Skip to main content

syncor_core/daemon/
server.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::path::Path;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use tokio::net::{UnixListener, UnixStream};
6use tokio::sync::{mpsc, oneshot};
7
8/// A request sent from an IPC client to the daemon.
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct IpcRequest {
11    pub cmd: String,
12    pub args: Value,
13}
14
15/// A response sent from the daemon back to the IPC client.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct IpcResponse {
18    pub ok: bool,
19    pub data: Option<Value>,
20    pub error: Option<String>,
21}
22
23impl IpcResponse {
24    pub fn ok(data: Value) -> Self {
25        IpcResponse {
26            ok: true,
27            data: Some(data),
28            error: None,
29        }
30    }
31
32    pub fn err(msg: impl Into<String>) -> Self {
33        IpcResponse {
34            ok: false,
35            data: None,
36            error: Some(msg.into()),
37        }
38    }
39}
40
41/// Type alias for the channel that sends IPC commands to the daemon command loop.
42pub type CommandSender = mpsc::Sender<(IpcRequest, oneshot::Sender<IpcResponse>)>;
43
44/// Handle to the IPC server. Drop or call `stop()` to shut it down.
45pub struct IpcServer {
46    shutdown_tx: oneshot::Sender<()>,
47}
48
49impl IpcServer {
50    /// Start the IPC server, binding to `sock_path` and forwarding parsed requests
51    /// through `cmd_sender`.
52    pub async fn start(
53        sock_path: impl AsRef<Path>,
54        cmd_sender: CommandSender,
55    ) -> std::io::Result<Self> {
56        let sock_path = sock_path.as_ref();
57
58        // Remove stale socket if present.
59        if sock_path.exists() {
60            std::fs::remove_file(sock_path)?;
61        }
62
63        let listener = UnixListener::bind(sock_path)?;
64        let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();
65
66        tokio::spawn(async move {
67            loop {
68                tokio::select! {
69                    _ = &mut shutdown_rx => break,
70                    result = listener.accept() => {
71                        match result {
72                            Ok((stream, _addr)) => {
73                                let sender = cmd_sender.clone();
74                                tokio::spawn(handle_connection(stream, sender));
75                            }
76                            Err(e) => {
77                                tracing::error!("IPC accept error: {e}");
78                                break;
79                            }
80                        }
81                    }
82                }
83            }
84        });
85
86        Ok(IpcServer { shutdown_tx })
87    }
88
89    /// Signal the server to stop accepting new connections.
90    pub async fn stop(self) {
91        let _ = self.shutdown_tx.send(());
92    }
93}
94
95async fn handle_connection(stream: UnixStream, cmd_sender: CommandSender) {
96    let (read_half, mut write_half) = stream.into_split();
97    let mut reader = BufReader::new(read_half);
98    let mut line = String::new();
99
100    loop {
101        line.clear();
102        match reader.read_line(&mut line).await {
103            Ok(0) => break, // EOF
104            Ok(_) => {
105                let trimmed = line.trim_end();
106                if trimmed.is_empty() {
107                    continue;
108                }
109                let req: IpcRequest = match serde_json::from_str(trimmed) {
110                    Ok(r) => r,
111                    Err(e) => {
112                        let resp = IpcResponse::err(format!("parse error: {e}"));
113                        let _ = send_response(&mut write_half, &resp).await;
114                        continue;
115                    }
116                };
117
118                let (reply_tx, reply_rx) = oneshot::channel::<IpcResponse>();
119                if cmd_sender.send((req, reply_tx)).await.is_err() {
120                    let resp = IpcResponse::err("daemon unavailable");
121                    let _ = send_response(&mut write_half, &resp).await;
122                    break;
123                }
124
125                match reply_rx.await {
126                    Ok(resp) => {
127                        if send_response(&mut write_half, &resp).await.is_err() {
128                            break;
129                        }
130                    }
131                    Err(_) => {
132                        let resp = IpcResponse::err("no reply from daemon");
133                        let _ = send_response(&mut write_half, &resp).await;
134                        break;
135                    }
136                }
137            }
138            Err(e) => {
139                tracing::error!("IPC read error: {e}");
140                break;
141            }
142        }
143    }
144}
145
146async fn send_response(
147    write_half: &mut tokio::net::unix::OwnedWriteHalf,
148    resp: &IpcResponse,
149) -> std::io::Result<()> {
150    let mut bytes = serde_json::to_vec(resp).map_err(std::io::Error::other)?;
151    bytes.push(b'\n');
152    write_half.write_all(&bytes).await
153}
154
155/// IPC client that connects to a daemon over a Unix domain socket.
156pub struct IpcClient {
157    stream: UnixStream,
158}
159
160impl IpcClient {
161    /// Connect to the Unix domain socket at `sock_path`.
162    pub async fn connect(sock_path: impl AsRef<Path>) -> std::io::Result<Self> {
163        let stream = UnixStream::connect(sock_path).await?;
164        Ok(IpcClient { stream })
165    }
166
167    /// Send a request and wait for the response.
168    pub async fn send(self, request: IpcRequest) -> std::io::Result<IpcResponse> {
169        // We consume `self` to keep borrowing simple; a production client would
170        // keep the stream around for multiple requests.
171        let mut stream = self.stream;
172
173        let mut payload = serde_json::to_vec(&request).map_err(std::io::Error::other)?;
174        payload.push(b'\n');
175        stream.write_all(&payload).await?;
176
177        let mut reader = BufReader::new(stream);
178        let mut line = String::new();
179        reader.read_line(&mut line).await?;
180
181        let resp: IpcResponse = serde_json::from_str(line.trim_end())
182            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
183        Ok(resp)
184    }
185}