lore_cli/daemon/
server.rs1use anyhow::{Context, Result};
8use serde::{Deserialize, Serialize};
9use std::path::Path;
10use std::sync::Arc;
11use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
12use tokio::net::{UnixListener, UnixStream};
13use tokio::sync::{oneshot, RwLock};
14
15use super::state::DaemonStats;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19#[serde(tag = "command", rename_all = "snake_case")]
20pub enum DaemonCommand {
21 Status,
23 Stop,
25 Stats,
27 Ping,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(tag = "type", rename_all = "snake_case")]
34pub enum DaemonResponse {
35 Status {
37 running: bool,
38 pid: u32,
39 uptime_seconds: u64,
40 },
41 Stopping,
43 Stats(DaemonStats),
45 Pong,
47 Error { message: String },
49}
50
51pub async fn run_server(
67 socket_path: &Path,
68 stats: Arc<RwLock<DaemonStats>>,
69 shutdown_tx: Option<oneshot::Sender<()>>,
70 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
71) -> Result<()> {
72 if socket_path.exists() {
74 std::fs::remove_file(socket_path).context("Failed to remove existing socket file")?;
75 }
76
77 let listener = UnixListener::bind(socket_path).context("Failed to bind Unix socket")?;
78
79 tracing::info!("IPC server listening on {:?}", socket_path);
80
81 let shutdown_tx = Arc::new(std::sync::Mutex::new(shutdown_tx));
83
84 loop {
85 tokio::select! {
86 accept_result = listener.accept() => {
87 match accept_result {
88 Ok((stream, _addr)) => {
89 let stats_clone = stats.clone();
90 let shutdown_tx_clone = shutdown_tx.clone();
91 tokio::spawn(async move {
92 if let Err(e) = handle_connection(stream, stats_clone, shutdown_tx_clone).await {
93 tracing::warn!("Error handling IPC connection: {}", e);
94 }
95 });
96 }
97 Err(e) => {
98 tracing::warn!("Failed to accept connection: {}", e);
99 }
100 }
101 }
102 _ = shutdown_rx.recv() => {
103 tracing::info!("IPC server shutting down");
104 break;
105 }
106 }
107 }
108
109 Ok(())
110}
111
112async fn handle_connection(
114 stream: UnixStream,
115 stats: Arc<RwLock<DaemonStats>>,
116 shutdown_tx: Arc<std::sync::Mutex<Option<oneshot::Sender<()>>>>,
117) -> Result<()> {
118 let (reader, mut writer) = stream.into_split();
119 let mut reader = BufReader::new(reader);
120 let mut line = String::new();
121
122 reader
124 .read_line(&mut line)
125 .await
126 .context("Failed to read from socket")?;
127
128 let command: DaemonCommand =
129 serde_json::from_str(line.trim()).context("Failed to parse command")?;
130
131 tracing::debug!("Received IPC command: {:?}", command);
132
133 let response = match command {
134 DaemonCommand::Status => {
135 let stats_guard = stats.read().await;
136 let uptime = chrono::Utc::now()
137 .signed_duration_since(stats_guard.started_at)
138 .num_seconds() as u64;
139 DaemonResponse::Status {
140 running: true,
141 pid: std::process::id(),
142 uptime_seconds: uptime,
143 }
144 }
145 DaemonCommand::Stop => {
146 let mut guard = shutdown_tx
149 .lock()
150 .unwrap_or_else(|poisoned| poisoned.into_inner());
151 if let Some(tx) = guard.take() {
152 let _ = tx.send(());
153 }
154 DaemonResponse::Stopping
155 }
156 DaemonCommand::Stats => {
157 let stats_guard = stats.read().await;
158 DaemonResponse::Stats(stats_guard.clone())
159 }
160 DaemonCommand::Ping => DaemonResponse::Pong,
161 };
162
163 let response_json = serde_json::to_string(&response).context("Failed to serialize response")?;
164
165 writer
166 .write_all(response_json.as_bytes())
167 .await
168 .context("Failed to write response")?;
169 writer
170 .write_all(b"\n")
171 .await
172 .context("Failed to write newline")?;
173 writer.flush().await.context("Failed to flush writer")?;
174
175 Ok(())
176}
177
178pub async fn send_command(socket_path: &Path, command: DaemonCommand) -> Result<DaemonResponse> {
192 let stream = UnixStream::connect(socket_path)
193 .await
194 .context("Failed to connect to daemon socket")?;
195
196 let (reader, mut writer) = stream.into_split();
197
198 let command_json = serde_json::to_string(&command).context("Failed to serialize command")?;
200 writer
201 .write_all(command_json.as_bytes())
202 .await
203 .context("Failed to write command")?;
204 writer
205 .write_all(b"\n")
206 .await
207 .context("Failed to write newline")?;
208 writer.flush().await.context("Failed to flush")?;
209
210 let mut reader = BufReader::new(reader);
212 let mut line = String::new();
213 reader
214 .read_line(&mut line)
215 .await
216 .context("Failed to read response")?;
217
218 let response: DaemonResponse =
219 serde_json::from_str(line.trim()).context("Failed to parse response")?;
220
221 Ok(response)
222}
223
224pub fn send_command_sync(socket_path: &Path, command: DaemonCommand) -> Result<DaemonResponse> {
229 let rt = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
230 rt.block_on(send_command(socket_path, command))
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Every command serializes to its tagged wire form and parses back to the
    /// same variant.
    #[test]
    fn test_daemon_command_serialization() {
        let cases = vec![
            (DaemonCommand::Status, r#"{"command":"status"}"#),
            (DaemonCommand::Stop, r#"{"command":"stop"}"#),
            (DaemonCommand::Stats, r#"{"command":"stats"}"#),
            (DaemonCommand::Ping, r#"{"command":"ping"}"#),
        ];

        for (cmd, expected_json) in cases {
            let json = serde_json::to_string(&cmd).expect("Failed to serialize");
            assert_eq!(json, expected_json);

            let parsed: DaemonCommand = serde_json::from_str(&json).expect("Failed to parse");
            // The enum has no `PartialEq`, so compare variants by discriminant.
            assert_eq!(
                std::mem::discriminant(&parsed),
                std::mem::discriminant(&cmd),
                "round trip changed the variant for {}",
                json
            );
        }
    }

    /// A status response keeps all of its fields across a JSON round trip.
    #[test]
    fn test_daemon_response_status_serialization() {
        let response = DaemonResponse::Status {
            running: true,
            pid: 12345,
            uptime_seconds: 3600,
        };

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"status\""));
        assert!(json.contains("\"running\":true"));
        assert!(json.contains("\"pid\":12345"));

        let parsed: DaemonResponse = serde_json::from_str(&json).expect("Failed to parse");
        match parsed {
            DaemonResponse::Status {
                running,
                pid,
                uptime_seconds,
            } => {
                assert!(running);
                assert_eq!(pid, 12345);
                assert_eq!(uptime_seconds, 3600);
            }
            _ => panic!("Expected Status response"),
        }
    }

    /// The stopping acknowledgement carries the expected type tag.
    #[test]
    fn test_daemon_response_stopping_serialization() {
        let response = DaemonResponse::Stopping;
        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"stopping\""));
    }

    /// A stats response carries the type tag together with the stats fields.
    #[test]
    fn test_daemon_response_stats_serialization() {
        let stats = DaemonStats::default();
        let response = DaemonResponse::Stats(stats);

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"stats\""));
        assert!(json.contains("\"files_watched\""));
    }

    /// An error response carries the type tag and its message.
    #[test]
    fn test_daemon_response_error_serialization() {
        let response = DaemonResponse::Error {
            message: "Something went wrong".to_string(),
        };

        let json = serde_json::to_string(&response).expect("Failed to serialize");
        assert!(json.contains("\"type\":\"error\""));
        assert!(json.contains("Something went wrong"));
    }

    /// End-to-end check: ping, status and stop over a real socket, followed by
    /// a clean server shutdown.
    #[tokio::test]
    async fn test_server_client_communication() {
        let dir = tempdir().expect("Failed to create temp dir");
        let socket_path = dir.path().join("test.sock");

        let stats = Arc::new(RwLock::new(DaemonStats::default()));
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(1);

        let socket_path_clone = socket_path.clone();
        let stats_clone = stats.clone();
        let server_handle = tokio::spawn(async move {
            run_server(
                &socket_path_clone,
                stats_clone,
                Some(shutdown_tx),
                broadcast_rx,
            )
            .await
        });

        // Poll with pings until the server answers instead of relying on a
        // fixed sleep, which is both slower and flaky on a loaded machine.
        let mut first_response = None;
        for _ in 0..100 {
            match send_command(&socket_path, DaemonCommand::Ping).await {
                Ok(response) => {
                    first_response = Some(response);
                    break;
                }
                Err(_) => {
                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                }
            }
        }
        let response = first_response.expect("Server did not become ready");
        assert!(
            matches!(response, DaemonResponse::Pong),
            "Expected Pong response"
        );

        let response = send_command(&socket_path, DaemonCommand::Status)
            .await
            .expect("Failed to send command");
        match response {
            DaemonResponse::Status { running, .. } => {
                assert!(running);
            }
            _ => panic!("Expected Status response"),
        }

        let response = send_command(&socket_path, DaemonCommand::Stop)
            .await
            .expect("Failed to send command");
        assert!(
            matches!(response, DaemonResponse::Stopping),
            "Expected Stopping response"
        );

        // The stop command must fire the one-shot shutdown signal.
        let signalled =
            tokio::time::timeout(tokio::time::Duration::from_secs(1), shutdown_rx).await;
        assert!(
            matches!(signalled, Ok(Ok(()))),
            "Stop command did not fire the shutdown signal"
        );

        // Tell the accept loop to exit and check that it does so cleanly.
        let _ = broadcast_tx.send(());
        let server_result =
            tokio::time::timeout(tokio::time::Duration::from_secs(1), server_handle)
                .await
                .expect("Server did not shut down in time")
                .expect("Server task panicked");
        server_result.expect("Server returned an error");
    }
}