syncor_core/daemon/
server.rs1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::path::Path;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use tokio::net::{UnixListener, UnixStream};
6use tokio::sync::{mpsc, oneshot};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct IpcRequest {
11 pub cmd: String,
12 pub args: Value,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct IpcResponse {
18 pub ok: bool,
19 pub data: Option<Value>,
20 pub error: Option<String>,
21}
22
23impl IpcResponse {
24 pub fn ok(data: Value) -> Self {
25 IpcResponse {
26 ok: true,
27 data: Some(data),
28 error: None,
29 }
30 }
31
32 pub fn err(msg: impl Into<String>) -> Self {
33 IpcResponse {
34 ok: false,
35 data: None,
36 error: Some(msg.into()),
37 }
38 }
39}
40
41pub type CommandSender = mpsc::Sender<(IpcRequest, oneshot::Sender<IpcResponse>)>;
43
44pub struct IpcServer {
46 shutdown_tx: oneshot::Sender<()>,
47}
48
49impl IpcServer {
50 pub async fn start(
53 sock_path: impl AsRef<Path>,
54 cmd_sender: CommandSender,
55 ) -> std::io::Result<Self> {
56 let sock_path = sock_path.as_ref();
57
58 if sock_path.exists() {
60 std::fs::remove_file(sock_path)?;
61 }
62
63 let listener = UnixListener::bind(sock_path)?;
64 let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
65
66 tokio::spawn(async move {
67 loop {
68 tokio::select! {
69 _ = &mut shutdown_rx => break,
70 result = listener.accept() => {
71 match result {
72 Ok((stream, _addr)) => {
73 let sender = cmd_sender.clone();
74 tokio::spawn(handle_connection(stream, sender));
75 }
76 Err(e) => {
77 tracing::error!("IPC accept error: {e}");
78 break;
79 }
80 }
81 }
82 }
83 }
84 });
85
86 Ok(IpcServer { shutdown_tx })
87 }
88
89 pub async fn stop(self) {
91 let _ = self.shutdown_tx.send(());
92 }
93}
94
95async fn handle_connection(stream: UnixStream, cmd_sender: CommandSender) {
96 let (read_half, mut write_half) = stream.into_split();
97 let mut reader = BufReader::new(read_half);
98 let mut line = String::new();
99
100 loop {
101 line.clear();
102 match reader.read_line(&mut line).await {
103 Ok(0) => break, Ok(_) => {
105 let trimmed = line.trim_end();
106 if trimmed.is_empty() {
107 continue;
108 }
109 let req: IpcRequest = match serde_json::from_str(trimmed) {
110 Ok(r) => r,
111 Err(e) => {
112 let resp = IpcResponse::err(format!("parse error: {e}"));
113 let _ = send_response(&mut write_half, &resp).await;
114 continue;
115 }
116 };
117
118 let (reply_tx, reply_rx) = oneshot::channel::<IpcResponse>();
119 if cmd_sender.send((req, reply_tx)).await.is_err() {
120 let resp = IpcResponse::err("daemon unavailable");
121 let _ = send_response(&mut write_half, &resp).await;
122 break;
123 }
124
125 match reply_rx.await {
126 Ok(resp) => {
127 if send_response(&mut write_half, &resp).await.is_err() {
128 break;
129 }
130 }
131 Err(_) => {
132 let resp = IpcResponse::err("no reply from daemon");
133 let _ = send_response(&mut write_half, &resp).await;
134 break;
135 }
136 }
137 }
138 Err(e) => {
139 tracing::error!("IPC read error: {e}");
140 break;
141 }
142 }
143 }
144}
145
146async fn send_response(
147 write_half: &mut tokio::net::unix::OwnedWriteHalf,
148 resp: &IpcResponse,
149) -> std::io::Result<()> {
150 let mut bytes = serde_json::to_vec(resp).map_err(std::io::Error::other)?;
151 bytes.push(b'\n');
152 write_half.write_all(&bytes).await
153}
154
155pub struct IpcClient {
157 stream: UnixStream,
158}
159
160impl IpcClient {
161 pub async fn connect(sock_path: impl AsRef<Path>) -> std::io::Result<Self> {
163 let stream = UnixStream::connect(sock_path).await?;
164 Ok(IpcClient { stream })
165 }
166
167 pub async fn send(self, request: IpcRequest) -> std::io::Result<IpcResponse> {
169 let mut stream = self.stream;
172
173 let mut payload = serde_json::to_vec(&request).map_err(std::io::Error::other)?;
174 payload.push(b'\n');
175 stream.write_all(&payload).await?;
176
177 let mut reader = BufReader::new(stream);
178 let mut line = String::new();
179 reader.read_line(&mut line).await?;
180
181 let resp: IpcResponse = serde_json::from_str(line.trim_end())
182 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
183 Ok(resp)
184 }
185}