use std::net::SocketAddr;
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead, LinesCodec};
use uuid::Uuid;
use super::protocol::{Event, EventKind, Op, Request};
use super::MAX_LINE_BYTES;
pub struct DaemonClient {
addr: SocketAddr,
}
impl DaemonClient {
pub fn new(port: u16) -> Self {
Self {
addr: SocketAddr::from(([127, 0, 0, 1], port)),
}
}
pub async fn is_running(&self) -> bool {
TcpStream::connect(self.addr).await.is_ok()
}
pub async fn send(&self, op: Op, mut on_event: impl FnMut(Event)) -> Result<()> {
let req = Request {
req_id: Uuid::new_v4(),
op,
};
let stream = TcpStream::connect(self.addr).await.with_context(|| {
format!(
"cannot connect to rdrop daemon at {} — start it with: rdrop daemon start",
self.addr
)
})?;
let (reader, mut writer) = stream.into_split();
let json_req = serde_json::to_string(&req)?;
writer.write_all(format!("{json_req}\n").as_bytes()).await?;
let mut framed = FramedRead::new(reader, LinesCodec::new_with_max_length(MAX_LINE_BYTES));
loop {
let line = framed
.next()
.await
.ok_or_else(|| {
anyhow::anyhow!("rdrop daemon closed connection before sending Done or Error")
})?
.context("framing error reading from daemon")?;
let event: Event = serde_json::from_str(&line)
.with_context(|| format!("malformed event from daemon: {line}"))?;
let is_eos = matches!(event.kind, EventKind::Done | EventKind::Error { .. });
on_event(event);
if is_eos {
break;
}
}
Ok(())
}
pub async fn run(&self, op: Op) -> Result<()> {
let mut err: Option<String> = None;
self.send(op, |event| match event.kind {
EventKind::Line { text } => println!("{text}"),
EventKind::Error { message } => err = Some(message),
EventKind::Done | EventKind::Progress { .. } | EventKind::Record { .. } => {}
})
.await?;
if let Some(msg) = err {
anyhow::bail!(msg);
}
Ok(())
}
}