clawgarden-agent 0.2.4

Agent runtime with persona/memory loader, judge, and pi RPC for ClawGarden
Documentation
#![allow(dead_code)]
//! UDS Bus Client for communicating with the ClawGarden message bus

use anyhow::{Context, Result};
use clawgarden_proto::Envelope;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;

/// Path to the ClawGarden UDS socket
const SOCK_PATH: &str = "/tmp/clawgarden.sock";

/// Bus client for sending and receiving Envelopes via UDS
pub struct BusClient {
    stream: Option<UnixStream>,
}

impl BusClient {
    /// Create a new bus client
    pub fn new() -> Self {
        Self { stream: None }
    }

    /// Connect to the bus socket
    pub async fn connect(&mut self) -> Result<()> {
        let stream = UnixStream::connect(SOCK_PATH)
            .await
            .context("Failed to connect to bus socket")?;
        self.stream = Some(stream);
        log::info!("Connected to bus at {}", SOCK_PATH);
        Ok(())
    }

    /// Send an envelope to the bus
    pub async fn send(&mut self, envelope: &Envelope) -> Result<()> {
        let stream = self.stream.as_mut().context("Not connected to bus")?;

        let data = serde_json::to_vec(envelope)?;
        let len = data.len() as u32;
        let mut buf = Vec::new();
        buf.extend_from_slice(&len.to_be_bytes());
        buf.extend_from_slice(&data);

        stream.write_all(&buf).await?;
        stream.flush().await?;
        log::debug!("Sent envelope: {:?}", envelope.id);
        Ok(())
    }

    /// Receive an envelope from the bus
    pub async fn recv(&mut self) -> Result<Envelope> {
        let stream = self.stream.as_mut().context("Not connected to bus")?;

        // Read length prefix (4 bytes)
        let mut len_buf = [0u8; 4];
        stream.read_exact(&mut len_buf).await?;
        let len = u32::from_be_bytes(len_buf) as usize;

        // Read payload
        let mut payload_buf = vec![0u8; len];
        stream.read_exact(&mut payload_buf).await?;

        let envelope: Envelope = serde_json::from_slice(&payload_buf)?;
        log::debug!("Received envelope: {:?}", envelope.id);
        Ok(envelope)
    }

    /// Check if connected
    pub fn is_connected(&self) -> bool {
        self.stream.is_some()
    }

    /// Handle disconnection - mark stream as None for reconnect
    pub fn disconnect(&mut self) {
        self.stream = None;
        log::warn!("Disconnected from bus");
    }
}

impl Default for BusClient {
    fn default() -> Self {
        Self::new()
    }
}

/// Reconnect loop with exponential backoff
#[allow(dead_code)]
pub async fn connect_with_retry(max_retries: usize) -> Result<BusClient> {
    let mut client = BusClient::new();
    let mut retries = 0;

    loop {
        match client.connect().await {
            Ok(_) => return Ok(client),
            Err(e) => {
                retries += 1;
                if retries > max_retries {
                    return Err(e).context("Max retry attempts reached");
                }
                let delay = Duration::from_millis(500 * (2_u64.pow(retries.min(5) as u32)));
                log::warn!("Connection failed, retrying in {:?}: {}", delay, e);
                tokio::time::sleep(delay).await;
            }
        }
    }
}