darq 0.1.0

darq CLI + TUI — autonomous issue → PR pipeline with SAT and a learning loop.
Documentation
//! Daemon socket client.
//!
//! Connects to daemon via Unix socket, sends commands, receives responses.
//! Supports event streaming via a dedicated event connection.

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

use super::lifecycle;
use super::protocol::{Method, Request, Response};
use uuid::Uuid;

/// A received event from the daemon.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct DaemonEvent {
    pub event: String,
    pub data: serde_json::Value,
}

/// Client for communicating with the daemon over Unix socket.
pub struct DaemonClient {
    reader: BufReader<tokio::net::unix::OwnedReadHalf>,
    writer: tokio::net::unix::OwnedWriteHalf,
}

impl DaemonClient {
    /// Connect to the daemon. Fails if daemon is not running.
    pub async fn connect() -> anyhow::Result<Self> {
        let sock_path = lifecycle::socket_path();
        let stream = UnixStream::connect(&sock_path).await.map_err(|e| {
            anyhow::anyhow!("cannot connect to daemon at {}: {e}", sock_path.display())
        })?;
        let (reader, writer) = stream.into_split();
        Ok(Self {
            reader: BufReader::new(reader),
            writer,
        })
    }

    /// Connect to daemon, auto-starting it if not running.
    pub async fn connect_or_start(config_path: &str) -> anyhow::Result<Self> {
        // Retry connection a few times — daemon may still be starting
        for _attempt in 0..10 {
            if let Ok(client) = Self::connect().await {
                return Ok(client);
            }
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }

        tracing::info!("daemon not running, starting...");
        lifecycle::spawn_daemon(config_path).await?;

        Self::connect().await
    }

    /// Send a request and wait for a response.
    pub async fn send(
        &mut self,
        method: Method,
        params: serde_json::Value,
    ) -> anyhow::Result<Response> {
        let request = Request {
            id: Uuid::new_v4().to_string(),
            method,
            params,
        };

        let json = serde_json::to_string(&request)?;

        // Write request
        self.writer.write_all(json.as_bytes()).await?;
        self.writer.write_all(b"\n").await?;
        self.writer.flush().await?;

        // Read response
        let mut line = String::new();
        self.reader.read_line(&mut line).await?;

        if line.trim().is_empty() {
            anyhow::bail!("daemon closed connection");
        }

        let response: Response = serde_json::from_str(line.trim())?;
        Ok(response)
    }

    /// Convenience: send a request with no params.
    pub async fn call(&mut self, method: Method) -> anyhow::Result<Response> {
        self.send(method, serde_json::Value::Object(Default::default()))
            .await
    }

    /// Subscribe to daemon events. Returns an EventStream that yields events.
    ///
    /// Opens a new connection to the daemon and sends a subscribe command.
    /// The connection stays open and receives pushed events in real-time.
    pub async fn subscribe_events() -> anyhow::Result<EventStream> {
        let sock_path = lifecycle::socket_path();
        let mut stream = UnixStream::connect(&sock_path)
            .await
            .map_err(|e| anyhow::anyhow!("cannot connect to daemon for events: {e}"))?;

        // Send subscribe request
        let request = Request {
            id: Uuid::new_v4().to_string(),
            method: Method::Subscribe,
            params: serde_json::Value::Object(Default::default()),
        };
        let json = serde_json::to_string(&request)?;
        stream.write_all(json.as_bytes()).await?;
        stream.write_all(b"\n").await?;
        stream.flush().await?;

        // Read the ack response
        let mut reader = BufReader::new(&mut stream);
        let mut line = String::new();
        reader.read_line(&mut line).await?;

        // Now in streaming mode — return the stream
        Ok(EventStream {
            reader: BufReader::new(stream),
        })
    }

    /// Check if a response is an error and return the error message.
    pub fn extract_error(response: &Response) -> Option<&str> {
        match response {
            Response::Error { error, .. } => Some(error.as_str()),
            _ => None,
        }
    }

    /// Extract the result value from a success response.
    pub fn extract_result(response: &Response) -> Option<&serde_json::Value> {
        match response {
            Response::Success { result, .. } => Some(result),
            _ => None,
        }
    }
}

/// A streaming connection that receives pushed events from the daemon.
pub struct EventStream {
    reader: BufReader<UnixStream>,
}

impl EventStream {
    /// Receive the next event from the daemon. Returns None if the connection closes.
    pub async fn recv(&mut self) -> Option<DaemonEvent> {
        let mut line = String::new();
        match self.reader.read_line(&mut line).await {
            Ok(0) => None,
            Ok(_) => {
                let trimmed = line.trim();
                if trimmed.is_empty() {
                    return None;
                }
                serde_json::from_str::<DaemonEvent>(trimmed).ok()
            }
            Err(_) => None,
        }
    }
}