#![allow(dead_code)]
use anyhow::{Context, Result};
use clawgarden_proto::Envelope;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
const DEFAULT_SOCK_PATH: &str = "/tmp/clawgarden.sock";
fn sock_path() -> String {
std::env::var("CLAWGARDEN_SOCKET").unwrap_or_else(|_| DEFAULT_SOCK_PATH.to_string())
}
pub struct BusClient {
read_half: Option<tokio::io::ReadHalf<UnixStream>>,
write_half: Option<tokio::io::WriteHalf<UnixStream>>,
}
impl BusClient {
pub fn new() -> Self {
Self {
read_half: None,
write_half: None,
}
}
pub async fn connect(&mut self) -> Result<()> {
let path = sock_path();
let stream = UnixStream::connect(&path)
.await
.with_context(|| format!("Failed to connect to bus socket at {}", path))?;
let (rh, wh) = tokio::io::split(stream);
self.read_half = Some(rh);
self.write_half = Some(wh);
log::info!("Connected to bus at {}", sock_path());
Ok(())
}
pub async fn send(&mut self, envelope: &Envelope) -> Result<()> {
let wh = self.write_half.as_mut().context("Not connected to bus")?;
let data = serde_json::to_vec(envelope)?;
let len = data.len() as u32;
let mut buf = Vec::with_capacity(4 + data.len());
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(&data);
wh.write_all(&buf).await?;
wh.flush().await?;
Ok(())
}
pub async fn recv(&mut self) -> Result<Envelope> {
let rh = self.read_half.as_mut().context("Not connected to bus")?;
let mut len_buf = [0u8; 4];
rh.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
let mut payload_buf = vec![0u8; len];
rh.read_exact(&mut payload_buf).await?;
let envelope: Envelope = serde_json::from_slice(&payload_buf)?;
Ok(envelope)
}
pub fn is_connected(&self) -> bool {
self.read_half.is_some() && self.write_half.is_some()
}
pub fn disconnect(&mut self) {
self.read_half = None;
self.write_half = None;
log::warn!("Disconnected from bus");
}
}
impl Default for BusClient {
fn default() -> Self {
Self::new()
}
}
#[allow(dead_code)]
pub async fn connect_with_retry(max_retries: usize) -> Result<BusClient> {
let mut client = BusClient::new();
let mut retries = 0;
loop {
match client.connect().await {
Ok(_) => return Ok(client),
Err(e) => {
retries += 1;
if retries > max_retries {
return Err(e).context("Max retry attempts reached");
}
let delay = Duration::from_millis(500 * (2_u64.pow(retries.min(5) as u32)));
log::warn!("Connection failed, retrying in {:?}: {}", delay, e);
tokio::time::sleep(delay).await;
}
}
}
}