#![allow(dead_code)]
use anyhow::{Context, Result};
use clawgarden_proto::Envelope;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;
/// Filesystem path of the Unix domain socket that `BusClient::connect` dials.
const SOCK_PATH: &str = "/tmp/clawgarden.sock";
/// Client handle for the bus reachable through the Unix socket at `SOCK_PATH`.
///
/// The handle starts out disconnected; a stream is only held after a
/// successful `connect` call and is released again by `disconnect`.
pub struct BusClient {
/// The open socket, or `None` while no connection is held.
stream: Option<UnixStream>,
}
impl BusClient {
pub fn new() -> Self {
Self { stream: None }
}
pub async fn connect(&mut self) -> Result<()> {
let stream = UnixStream::connect(SOCK_PATH)
.await
.context("Failed to connect to bus socket")?;
self.stream = Some(stream);
log::info!("Connected to bus at {}", SOCK_PATH);
Ok(())
}
pub async fn send(&mut self, envelope: &Envelope) -> Result<()> {
let stream = self.stream.as_mut().context("Not connected to bus")?;
let data = serde_json::to_vec(envelope)?;
let len = data.len() as u32;
let mut buf = Vec::new();
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(&data);
stream.write_all(&buf).await?;
stream.flush().await?;
log::debug!("Sent envelope: {:?}", envelope.id);
Ok(())
}
pub async fn recv(&mut self) -> Result<Envelope> {
let stream = self.stream.as_mut().context("Not connected to bus")?;
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
let mut payload_buf = vec![0u8; len];
stream.read_exact(&mut payload_buf).await?;
let envelope: Envelope = serde_json::from_slice(&payload_buf)?;
log::debug!("Received envelope: {:?}", envelope.id);
Ok(envelope)
}
pub fn is_connected(&self) -> bool {
self.stream.is_some()
}
pub fn disconnect(&mut self) {
self.stream = None;
log::warn!("Disconnected from bus");
}
}
impl Default for BusClient {
fn default() -> Self {
Self::new()
}
}
/// Connects a fresh [`BusClient`], retrying failed attempts with exponential back-off.
///
/// After the initial attempt, up to `max_retries` further attempts are made.
/// The wait before retry `k` is `500ms * 2^min(k, 5)`, i.e. 1s, 2s, 4s, ...
/// capped at 16s.
///
/// # Errors
/// Returns the last connection error, wrapped with
/// "Max retry attempts reached", once the retry budget is exhausted.
#[allow(dead_code)]
pub async fn connect_with_retry(max_retries: usize) -> Result<BusClient> {
    let mut client = BusClient::new();
    let mut failures: usize = 0;

    loop {
        let err = match client.connect().await {
            Ok(()) => return Ok(client),
            Err(err) => err,
        };

        failures += 1;
        if failures > max_retries {
            return Err(err).context("Max retry attempts reached");
        }

        // Exponent is capped at 5, so the shift equals 500 * 2^exponent.
        let exponent = failures.min(5) as u32;
        let delay = Duration::from_millis(500_u64 << exponent);
        log::warn!("Connection failed, retrying in {:?}: {}", delay, err);
        tokio::time::sleep(delay).await;
    }
}