lore_cli/daemon/
server.rs

1//! Unix socket IPC server for daemon communication.
2//!
3//! Provides a simple request/response protocol over Unix domain sockets
4//! for communicating with the running daemon. Supports commands like
5//! status, stop, and stats.
6
7use anyhow::{Context, Result};
8use serde::{Deserialize, Serialize};
9use std::path::Path;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::{oneshot, RwLock};
14
15use super::state::DaemonStats;
16
17/// Commands that can be sent to the daemon via IPC.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19#[serde(tag = "command", rename_all = "snake_case")]
20pub enum DaemonCommand {
21    /// Request the daemon's current status.
22    Status,
23    /// Request the daemon to shut down gracefully.
24    Stop,
25    /// Request runtime statistics from the daemon.
26    Stats,
27    /// Ping to check if daemon is responsive.
28    Ping,
29}
30
/// Responses from the daemon to IPC commands.
///
/// On the wire each response is a JSON object whose `type` field holds the
/// snake_case variant name (e.g. `"type":"status"`); any variant data appears
/// as further fields of the same object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Status response indicating daemon is running.
    Status {
        /// Whether the daemon is running; the server in this module always
        /// reports `true`.
        running: bool,
        /// Process ID of the responding daemon process.
        pid: u32,
        /// Whole seconds elapsed since the daemon's recorded start time.
        uptime_seconds: u64,
    },
    /// Acknowledgment that stop command was received.
    Stopping,
    /// Runtime statistics: a snapshot of the daemon's shared [`DaemonStats`].
    Stats(DaemonStats),
    /// Ping response.
    Pong,
    /// Error response.
    Error {
        /// Description of what went wrong.
        message: String,
    },
}
50
51/// Runs the IPC server on the given Unix socket path.
52///
53/// The server listens for incoming connections and processes commands
54/// until a shutdown signal is received or the Stop command is sent.
55///
56/// # Arguments
57///
58/// * `socket_path` - Path for the Unix domain socket
59/// * `stats` - Shared statistics that can be read by clients
60/// * `shutdown_tx` - Sender to signal daemon shutdown when Stop is received
61/// * `mut shutdown_rx` - Receiver that signals when to stop the server
62///
63/// # Errors
64///
65/// Returns an error if the socket cannot be created or bound.
66pub async fn run_server(
67    socket_path: &Path,
68    stats: Arc<RwLock<DaemonStats>>,
69    shutdown_tx: Option<oneshot::Sender<()>>,
70    mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
71) -> Result<()> {
72    // Remove existing socket file if present
73    if socket_path.exists() {
74        std::fs::remove_file(socket_path).context("Failed to remove existing socket file")?;
75    }
76
77    let listener = UnixListener::bind(socket_path).context("Failed to bind Unix socket")?;
78
79    tracing::info!("IPC server listening on {:?}", socket_path);
80
81    // Wrap shutdown_tx in Arc<Mutex> so it can be moved into the handler
82    let shutdown_tx = Arc::new(std::sync::Mutex::new(shutdown_tx));
83
84    loop {
85        tokio::select! {
86            accept_result = listener.accept() => {
87                match accept_result {
88                    Ok((stream, _addr)) => {
89                        let stats_clone = stats.clone();
90                        let shutdown_tx_clone = shutdown_tx.clone();
91                        tokio::spawn(async move {
92                            if let Err(e) = handle_connection(stream, stats_clone, shutdown_tx_clone).await {
93                                tracing::warn!("Error handling IPC connection: {}", e);
94                            }
95                        });
96                    }
97                    Err(e) => {
98                        tracing::warn!("Failed to accept connection: {}", e);
99                    }
100                }
101            }
102            _ = shutdown_rx.recv() => {
103                tracing::info!("IPC server shutting down");
104                break;
105            }
106        }
107    }
108
109    Ok(())
110}
111
112/// Handles a single client connection.
113async fn handle_connection(
114    stream: UnixStream,
115    stats: Arc<RwLock<DaemonStats>>,
116    shutdown_tx: Arc<std::sync::Mutex<Option<oneshot::Sender<()>>>>,
117) -> Result<()> {
118    let (reader, mut writer) = stream.into_split();
119    let mut reader = BufReader::new(reader);
120    let mut line = String::new();
121
122    // Read a single line (one command per connection)
123    reader
124        .read_line(&mut line)
125        .await
126        .context("Failed to read from socket")?;
127
128    let command: DaemonCommand =
129        serde_json::from_str(line.trim()).context("Failed to parse command")?;
130
131    tracing::debug!("Received IPC command: {:?}", command);
132
133    let response = match command {
134        DaemonCommand::Status => {
135            let stats_guard = stats.read().await;
136            let uptime = chrono::Utc::now()
137                .signed_duration_since(stats_guard.started_at)
138                .num_seconds() as u64;
139            DaemonResponse::Status {
140                running: true,
141                pid: std::process::id(),
142                uptime_seconds: uptime,
143            }
144        }
145        DaemonCommand::Stop => {
146            // Signal the daemon to shut down
147            // If the lock is poisoned, we still want to try to shut down
148            let mut guard = shutdown_tx
149                .lock()
150                .unwrap_or_else(|poisoned| poisoned.into_inner());
151            if let Some(tx) = guard.take() {
152                let _ = tx.send(());
153            }
154            DaemonResponse::Stopping
155        }
156        DaemonCommand::Stats => {
157            let stats_guard = stats.read().await;
158            DaemonResponse::Stats(stats_guard.clone())
159        }
160        DaemonCommand::Ping => DaemonResponse::Pong,
161    };
162
163    let response_json = serde_json::to_string(&response).context("Failed to serialize response")?;
164
165    writer
166        .write_all(response_json.as_bytes())
167        .await
168        .context("Failed to write response")?;
169    writer
170        .write_all(b"\n")
171        .await
172        .context("Failed to write newline")?;
173    writer.flush().await.context("Failed to flush writer")?;
174
175    Ok(())
176}
177
178/// Sends a command to the daemon and returns the response.
179///
180/// Connects to the Unix socket, sends the command, and reads the response.
181///
182/// # Arguments
183///
184/// * `socket_path` - Path to the daemon's Unix socket
185/// * `command` - The command to send
186///
187/// # Errors
188///
189/// Returns an error if the connection fails, the command cannot be sent,
190/// or the response cannot be read or parsed.
191pub async fn send_command(socket_path: &Path, command: DaemonCommand) -> Result<DaemonResponse> {
192    let stream = UnixStream::connect(socket_path)
193        .await
194        .context("Failed to connect to daemon socket")?;
195
196    let (reader, mut writer) = stream.into_split();
197
198    // Send command
199    let command_json = serde_json::to_string(&command).context("Failed to serialize command")?;
200    writer
201        .write_all(command_json.as_bytes())
202        .await
203        .context("Failed to write command")?;
204    writer
205        .write_all(b"\n")
206        .await
207        .context("Failed to write newline")?;
208    writer.flush().await.context("Failed to flush")?;
209
210    // Read response
211    let mut reader = BufReader::new(reader);
212    let mut line = String::new();
213    reader
214        .read_line(&mut line)
215        .await
216        .context("Failed to read response")?;
217
218    let response: DaemonResponse =
219        serde_json::from_str(line.trim()).context("Failed to parse response")?;
220
221    Ok(response)
222}
223
224/// Synchronous wrapper for sending a command to the daemon.
225///
226/// Creates a temporary tokio runtime to send the command.
227/// Use this from non-async contexts like CLI commands.
228pub fn send_command_sync(socket_path: &Path, command: DaemonCommand) -> Result<DaemonResponse> {
229    let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
230    rt.block_on(send_command(socket_path, command))
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;
    use tokio::time::{sleep, timeout, Duration};

    /// Every command serializes to its tagged JSON form and survives a
    /// round trip through the parser.
    #[test]
    fn test_daemon_command_serialization() {
        let cases = [
            (DaemonCommand::Status, r#"{"command":"status"}"#),
            (DaemonCommand::Stop, r#"{"command":"stop"}"#),
            (DaemonCommand::Stats, r#"{"command":"stats"}"#),
            (DaemonCommand::Ping, r#"{"command":"ping"}"#),
        ];

        for (cmd, expected_json) in cases {
            let json = serde_json::to_string(&cmd).expect("Failed to serialize");
            assert_eq!(json, expected_json);

            // Compare via re-serialization so the check does not depend on
            // the command type implementing equality.
            let parsed: DaemonCommand = serde_json::from_str(&json).expect("Failed to parse");
            let round_tripped = serde_json::to_string(&parsed).expect("Failed to re-serialize");
            assert_eq!(round_tripped, expected_json);
        }
    }

    #[test]
    fn test_daemon_response_status_serialization() {
        let response = DaemonResponse::Status {
            running: true,
            pid: 12345,
            uptime_seconds: 3600,
        };

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"status\""));
        assert!(json.contains("\"running\":true"));
        assert!(json.contains("\"pid\":12345"));

        let parsed: DaemonResponse = serde_json::from_str(&json).expect("Failed to parse");
        match parsed {
            DaemonResponse::Status {
                running,
                pid,
                uptime_seconds,
            } => {
                assert!(running);
                assert_eq!(pid, 12345);
                assert_eq!(uptime_seconds, 3600);
            }
            _ => panic!("Expected Status response"),
        }
    }

    #[test]
    fn test_daemon_response_stopping_serialization() {
        let response = DaemonResponse::Stopping;
        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"stopping\""));
    }

    #[test]
    fn test_daemon_response_stats_serialization() {
        let stats = DaemonStats::default();
        let response = DaemonResponse::Stats(stats);

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"stats\""));
        assert!(json.contains("\"files_watched\""));
    }

    #[test]
    fn test_daemon_response_error_serialization() {
        let response = DaemonResponse::Error {
            message: "Something went wrong".to_string(),
        };

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"error\""));
        assert!(json.contains("Something went wrong"));
    }

    /// End-to-end exchange over a real socket: ping, status, then stop, and
    /// finally a broadcast shutdown that must end the server cleanly.
    #[tokio::test]
    async fn test_server_client_communication() {
        let dir = tempdir().expect("Failed to create temp dir");
        let socket_path = dir.path().join("test.sock");

        let stats = Arc::new(RwLock::new(DaemonStats::default()));
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(1);

        // Start server in background
        let socket_path_clone = socket_path.clone();
        let stats_clone = stats.clone();
        let server_handle = tokio::spawn(async move {
            run_server(
                &socket_path_clone,
                stats_clone,
                Some(shutdown_tx),
                broadcast_rx,
            )
            .await
        });

        // Wait until the server has bound the socket instead of sleeping for
        // a fixed time: the socket file appears as soon as `bind` succeeds.
        for _ in 0..200 {
            if socket_path.exists() {
                break;
            }
            sleep(Duration::from_millis(10)).await;
        }
        assert!(socket_path.exists(), "Server did not create the socket");

        // Send ping command
        let response = send_command(&socket_path, DaemonCommand::Ping)
            .await
            .expect("Failed to send command");

        match response {
            DaemonResponse::Pong => {}
            _ => panic!("Expected Pong response"),
        }

        // Send status command
        let response = send_command(&socket_path, DaemonCommand::Status)
            .await
            .expect("Failed to send command");

        match response {
            DaemonResponse::Status { running, .. } => {
                assert!(running);
            }
            _ => panic!("Expected Status response"),
        }

        // Send stop command
        let response = send_command(&socket_path, DaemonCommand::Stop)
            .await
            .expect("Failed to send command");

        match response {
            DaemonResponse::Stopping => {}
            _ => panic!("Expected Stopping response"),
        }

        // The Stop command must have fired the daemon shutdown sender.
        timeout(Duration::from_secs(1), shutdown_rx)
            .await
            .expect("Stop did not signal shutdown in time")
            .expect("Shutdown sender was dropped without signalling");

        // Signal broadcast shutdown and wait for server to stop
        broadcast_tx
            .send(())
            .expect("Server should still hold its shutdown receiver");
        let server_result = timeout(Duration::from_secs(1), server_handle)
            .await
            .expect("Server did not shut down in time")
            .expect("Server task panicked");
        server_result.expect("Server returned an error");
    }
}