use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
use super::lifecycle;
use super::protocol::{Method, Request, Response};
use uuid::Uuid;
#[derive(Debug, Clone, serde::Deserialize)]
pub struct DaemonEvent {
pub event: String,
pub data: serde_json::Value,
}
pub struct DaemonClient {
reader: BufReader<tokio::net::unix::OwnedReadHalf>,
writer: tokio::net::unix::OwnedWriteHalf,
}
impl DaemonClient {
pub async fn connect() -> anyhow::Result<Self> {
let sock_path = lifecycle::socket_path();
let stream = UnixStream::connect(&sock_path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to daemon at {}: {e}", sock_path.display())
})?;
let (reader, writer) = stream.into_split();
Ok(Self {
reader: BufReader::new(reader),
writer,
})
}
pub async fn connect_or_start(config_path: &str) -> anyhow::Result<Self> {
for _attempt in 0..10 {
if let Ok(client) = Self::connect().await {
return Ok(client);
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
tracing::info!("daemon not running, starting...");
lifecycle::spawn_daemon(config_path).await?;
Self::connect().await
}
pub async fn send(
&mut self,
method: Method,
params: serde_json::Value,
) -> anyhow::Result<Response> {
let request = Request {
id: Uuid::new_v4().to_string(),
method,
params,
};
let json = serde_json::to_string(&request)?;
self.writer.write_all(json.as_bytes()).await?;
self.writer.write_all(b"\n").await?;
self.writer.flush().await?;
let mut line = String::new();
self.reader.read_line(&mut line).await?;
if line.trim().is_empty() {
anyhow::bail!("daemon closed connection");
}
let response: Response = serde_json::from_str(line.trim())?;
Ok(response)
}
pub async fn call(&mut self, method: Method) -> anyhow::Result<Response> {
self.send(method, serde_json::Value::Object(Default::default()))
.await
}
pub async fn subscribe_events() -> anyhow::Result<EventStream> {
let sock_path = lifecycle::socket_path();
let mut stream = UnixStream::connect(&sock_path)
.await
.map_err(|e| anyhow::anyhow!("cannot connect to daemon for events: {e}"))?;
let request = Request {
id: Uuid::new_v4().to_string(),
method: Method::Subscribe,
params: serde_json::Value::Object(Default::default()),
};
let json = serde_json::to_string(&request)?;
stream.write_all(json.as_bytes()).await?;
stream.write_all(b"\n").await?;
stream.flush().await?;
let mut reader = BufReader::new(&mut stream);
let mut line = String::new();
reader.read_line(&mut line).await?;
Ok(EventStream {
reader: BufReader::new(stream),
})
}
pub fn extract_error(response: &Response) -> Option<&str> {
match response {
Response::Error { error, .. } => Some(error.as_str()),
_ => None,
}
}
pub fn extract_result(response: &Response) -> Option<&serde_json::Value> {
match response {
Response::Success { result, .. } => Some(result),
_ => None,
}
}
}
pub struct EventStream {
reader: BufReader<UnixStream>,
}
impl EventStream {
pub async fn recv(&mut self) -> Option<DaemonEvent> {
let mut line = String::new();
match self.reader.read_line(&mut line).await {
Ok(0) => None,
Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
return None;
}
serde_json::from_str::<DaemonEvent>(trimmed).ok()
}
Err(_) => None,
}
}
}