clawgarden-agent 0.9.8

Agent runtime with persona/memory loader, judge, and pi RPC for ClawGarden
Documentation
#![allow(dead_code)]
//! UDS Bus Client for communicating with the ClawGarden message bus.
//!
//! Bidirectional: sends envelopes AND receives broadcast events from the bus
//! over a persistent UDS connection. Uses split read/write halves so that
//! send() and recv() can operate concurrently.

use anyhow::{Context, Result};
use clawgarden_proto::Envelope;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::UnixStream;

/// Fallback location of the ClawGarden UDS socket, used when the
/// `CLAWGARDEN_SOCKET` environment variable does not supply one.
const DEFAULT_SOCK_PATH: &str = "/tmp/clawgarden.sock";

/// Resolve the path of the bus socket.
///
/// Returns the value of `CLAWGARDEN_SOCKET` when it can be read from the
/// environment; on any lookup error the built-in default path is returned
/// instead.
fn sock_path() -> String {
    match std::env::var("CLAWGARDEN_SOCKET") {
        Ok(custom) => custom,
        Err(_) => String::from(DEFAULT_SOCK_PATH),
    }
}

/// Bus client using split read/write halves for concurrent send + recv
///
/// Both halves start out as `None`, are populated by `connect`, and are reset
/// to `None` by `disconnect`.
///
/// NOTE(review): `send` and `recv` both take `&mut self`, so a single
/// `BusClient` value cannot have both in flight at once even though the
/// stream is split — confirm how callers are expected to achieve concurrency.
pub struct BusClient {
    /// Read side of the split `UnixStream`; `None` while not connected.
    read_half: Option<tokio::io::ReadHalf<UnixStream>>,
    /// Write side of the split `UnixStream`; `None` while not connected.
    write_half: Option<tokio::io::WriteHalf<UnixStream>>,
}

impl BusClient {
    /// Create a new bus client
    pub fn new() -> Self {
        Self {
            read_half: None,
            write_half: None,
        }
    }

    /// Connect to the bus socket and split into read/write halves
    pub async fn connect(&mut self) -> Result<()> {
        let path = sock_path();
        let stream = UnixStream::connect(&path)
            .await
            .with_context(|| format!("Failed to connect to bus socket at {}", path))?;
        let (rh, wh) = tokio::io::split(stream);
        self.read_half = Some(rh);
        self.write_half = Some(wh);
        log::info!("Connected to bus at {}", sock_path());
        Ok(())
    }

    /// Send an envelope to the bus (fire-and-forget)
    pub async fn send(&mut self, envelope: &Envelope) -> Result<()> {
        let wh = self.write_half.as_mut().context("Not connected to bus")?;

        let data = serde_json::to_vec(envelope)?;
        let len = data.len() as u32;
        let mut buf = Vec::with_capacity(4 + data.len());
        buf.extend_from_slice(&len.to_be_bytes());
        buf.extend_from_slice(&data);

        wh.write_all(&buf).await?;
        wh.flush().await?;
        Ok(())
    }

    /// Receive an envelope from the bus (blocking until one arrives)
    pub async fn recv(&mut self) -> Result<Envelope> {
        let rh = self.read_half.as_mut().context("Not connected to bus")?;

        let mut len_buf = [0u8; 4];
        rh.read_exact(&mut len_buf).await?;
        let len = u32::from_be_bytes(len_buf) as usize;

        let mut payload_buf = vec![0u8; len];
        rh.read_exact(&mut payload_buf).await?;

        let envelope: Envelope = serde_json::from_slice(&payload_buf)?;
        Ok(envelope)
    }

    /// Check if connected
    pub fn is_connected(&self) -> bool {
        self.read_half.is_some() && self.write_half.is_some()
    }

    /// Handle disconnection
    pub fn disconnect(&mut self) {
        self.read_half = None;
        self.write_half = None;
        log::warn!("Disconnected from bus");
    }
}

impl Default for BusClient {
    fn default() -> Self {
        Self::new()
    }
}

/// Connect to the bus, retrying failed attempts with capped exponential backoff.
///
/// The first attempt is made immediately. After the k-th failure the function
/// sleeps for `500ms * 2^min(k, 5)` (1s, 2s, 4s, 8s, then 16s for every later
/// retry) before trying again, so at most `max_retries + 1` attempts are made.
///
/// # Errors
/// Once more than `max_retries` attempts have failed, the most recent
/// connection error is returned with the context "Max retry attempts reached".
#[allow(dead_code)]
pub async fn connect_with_retry(max_retries: usize) -> Result<BusClient> {
    let mut bus = BusClient::new();
    let mut failures: usize = 0;

    loop {
        // Success exits immediately; only the failure path continues below.
        let err = match bus.connect().await {
            Ok(()) => return Ok(bus),
            Err(err) => err,
        };

        failures += 1;
        if failures > max_retries {
            return Err(err).context("Max retry attempts reached");
        }

        // Cap the exponent so the wait never grows beyond 500ms * 2^5.
        let exponent = failures.min(5) as u32;
        let delay = Duration::from_millis(500 * 2_u64.pow(exponent));
        log::warn!("Connection failed, retrying in {:?}: {}", delay, err);
        tokio::time::sleep(delay).await;
    }
}