Skip to main content

ix/
daemon_sock.rs

1//! Unix domain socket interface for the ixd daemon.
2//!
3//! Provides real-time file-change notifications and status queries over a
4//! local Unix domain socket using NDJSON (newline-delimited JSON) framing.
5//!
6//! # Socket Path Resolution
7//!
8//! The socket path is derived from the canonical watched root:
9//!
10//! ```text
11//! $XDG_RUNTIME_DIR/ixd/{hash}.sock        # preferred (systemd, modern Linux)
12//! ~/.local/run/ixd/{hash}.sock             # fallback
13//! /tmp/ixd-{uid}-{hash}.sock              # last resort
14//! ```
15//!
16//! Where `hash` = first 16 hex chars of `XXH64(canonical_path, seed=0)`.
17//!
18//! # Wire Protocol (NDJSON)
19//!
20//! Each line is a valid JSON object terminated by `\\n`.
21//!
22//! **Server → Client (push):**
23//!
24//! ```json
25//! {"t":"status","pid":1234,"status":"idle","files":1523}
26//! {"t":"files_changed","batch":[{"p":"src/main.rs","m":1776468629,"o":"modify"}],"ts":1776468629}
27//! ```
28//!
29//! **Client → Server (query):**
30//!
31//! ```json
32//! {"t":"status_query"}
33//! {"t":"history_query","since":1776468000,"id":1}
34//! ```
35//!
36//! **Server → Client (query response):**
37//!
38//! ```json
39//! {"t":"query_result","id":1,"status":"idle","files":1523,"changes_since":[...]}
40//! ```
41
42use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
50/// Maximum number of change batches retained for history queries.
51const HISTORY_CAPACITY: usize = 1024;
52
53/// File change operation kind.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(rename_all = "lowercase")]
56pub enum FileOp {
57    /// File was created.
58    Create,
59    /// File was modified.
60    Modify,
61    /// File was deleted.
62    Delete,
63    /// File was renamed.
64    Rename,
65}
66
67impl FileOp {
68    /// Convert from the notify crate's event kind to our serializable enum.
69    #[must_use]
70    pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71        match kind {
72            notify::EventKind::Create(_) => Self::Create,
73            notify::EventKind::Remove(_) => Self::Delete,
74            _ => Self::Modify,
75        }
76    }
77}
78
79/// Typed daemon status enum for structured status tracking.
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(tag = "state", rename_all = "snake_case")]
82pub enum DaemonStatus {
83    /// Daemon is idle, no active rebuild.
84    Idle,
85    /// Active index rebuild in progress.
86    Indexing {
87        /// Current entropy reading.
88        entropy: u16,
89    },
90    /// Rebuild deferred due to high entropy.
91    Deferred {
92        /// Current entropy reading.
93        entropy: u16,
94    },
95    /// Safety escalation triggered.
96    Escalated {
97        /// Current entropy reading.
98        entropy: u16,
99    },
100    /// Safety warning issued.
101    Warned {
102        /// Warning reason.
103        reason: String,
104    },
105    /// Critical safety halt — daemon stopped.
106    SafetyHalt,
107    /// Unrecoverable safety exit.
108    SafetyExit,
109}
110
111impl std::fmt::Display for DaemonStatus {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self {
114            Self::Idle { .. } => write!(f, "idle"),
115            Self::Indexing { entropy } => write!(f, "indexing (entropy: {entropy})"),
116            Self::Deferred { entropy } => write!(f, "deferred (entropy: {entropy})"),
117            Self::Escalated { entropy } => write!(f, "escalated (entropy: {entropy})"),
118            Self::Warned { reason } => write!(f, "warned: {reason}"),
119            Self::SafetyHalt => write!(f, "safety halt"),
120            Self::SafetyExit => write!(f, "safety exit"),
121        }
122    }
123}
124
125/// A single file change record broadcast to connected clients.
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct FileChange {
128    /// Path of the changed file (relative to watched root when possible).
129    #[serde(rename = "p")]
130    pub path: PathBuf,
131    /// Modification timestamp (Unix seconds).
132    #[serde(rename = "m")]
133    pub mtime: u64,
134    /// Operation performed on the file.
135    #[serde(rename = "o")]
136    pub op: FileOp,
137}
138
139/// Messages sent from the server to connected clients.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141#[serde(tag = "t", rename_all = "snake_case")]
142pub enum ServerMessage {
143    /// Periodic or on-change daemon status update.
144    Status {
145        /// PID of the daemon process.
146        pid: u32,
147        /// Human-readable status string (e.g. "idle", "indexing").
148        status: String,
149        /// Number of files currently in the index.
150        files: usize,
151        /// Typed daemon status (present when daemon is running).
152        #[serde(skip_serializing_if = "Option::is_none")]
153        daemon_status: Option<DaemonStatus>,
154    },
155    /// Batch of file changes detected by the watcher.
156    FilesChanged {
157        /// The changed files in this batch.
158        batch: Vec<FileChange>,
159        /// Timestamp of this event batch (Unix seconds).
160        #[serde(rename = "ts")]
161        timestamp: u64,
162    },
163    /// Response to a client query.
164    QueryResult {
165        /// Query ID (matches the `id` field from the request).
166        id: u64,
167        /// Current daemon status at query time.
168        status: String,
169        /// Number of files in the index.
170        files: usize,
171        /// Changes since the requested timestamp (for history queries).
172        #[serde(default, skip_serializing_if = "Vec::is_empty")]
173        changes_since: Vec<FileChange>,
174        /// Typed daemon status (present when daemon is running).
175        #[serde(skip_serializing_if = "Option::is_none")]
176        daemon_status: Option<DaemonStatus>,
177        /// Timestamp of the last successful rebuild completion.
178        #[serde(skip_serializing_if = "Option::is_none")]
179        last_rebuild_at: Option<u64>,
180    },
181}
182
183/// Messages sent from connected clients to the daemon server.
184#[derive(Debug, Clone, Serialize, Deserialize)]
185#[serde(tag = "t", rename_all = "snake_case")]
186pub enum ClientMessage {
187    /// Request current daemon status.
188    StatusQuery {
189        /// Client-assigned query ID (echoed back in the response).
190        #[serde(default)]
191        id: u64,
192    },
193    /// Request all changes since the given timestamp.
194    HistoryQuery {
195        /// Return changes with timestamps strictly after this value.
196        since: u64,
197        /// Client-assigned query ID (echoed back in the response).
198        id: u64,
199    },
200}
201
202/// Errors specific to the daemon socket subsystem.
203#[derive(Debug, thiserror::Error)]
204pub enum DaemonSockError {
205    /// I/O error on the socket.
206    #[error("daemon socket I/O: {0}")]
207    Io(#[from] std::io::Error),
208    /// JSON serialization or deserialization error.
209    #[error("daemon socket JSON: {0}")]
210    Json(#[from] serde_json::Error),
211    /// Could not resolve a suitable socket path.
212    #[error("daemon socket path resolution failed")]
213    PathResolution,
214}
215
216type Result<T> = std::result::Result<T, DaemonSockError>;
217
218/// Resolves the socket path for a given watched root directory.
219///
220/// Tries in order:
221/// 1. `$XDG_RUNTIME_DIR/ixd/{hash}.sock`
222/// 2. `$HOME/.local/run/ixd/{hash}.sock`
223/// 3. `/tmp/ixd-{uid}-{hash}.sock`
224///
225/// Where `hash` is the first 16 hex characters of `XXH64(canonical_root, 0)`.
226#[must_use]
227pub fn socket_path(root: &Path) -> PathBuf {
228    let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
229    let hash = format!(
230        "{:016x}",
231        xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
232    );
233
234    if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
235        let dir = PathBuf::from(xdg).join("ixd");
236        return dir.join(format!("{hash}.sock"));
237    }
238
239    if let Ok(home) = std::env::var("HOME") {
240        let dir = PathBuf::from(home).join(".local/run/ixd");
241        return dir.join(format!("{hash}.sock"));
242    }
243
244    let uid = unsafe { libc::getuid() };
245    PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
246}
247
248/// Ensure the parent directory of a socket path exists.
249fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
250    if let Some(parent) = path.parent() {
251        std::fs::create_dir_all(parent)?;
252    }
253    Ok(())
254}
255
256/// Circular buffer of recent file-change batches for history queries.
257struct History {
258    entries: VecDeque<(u64, Vec<FileChange>)>,
259}
260
261impl History {
262    fn new() -> Self {
263        Self {
264            entries: VecDeque::with_capacity(HISTORY_CAPACITY),
265        }
266    }
267
268    fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
269        if self.entries.len() >= HISTORY_CAPACITY {
270            self.entries.pop_front();
271        }
272        self.entries.push_back((timestamp, changes));
273    }
274
275    fn since(&self, cutoff: u64) -> Vec<FileChange> {
276        self.entries
277            .iter()
278            .filter(|(ts, _)| *ts > cutoff)
279            .flat_map(|(_, changes)| changes.iter().cloned())
280            .collect()
281    }
282}
283
284/// State shared between the accept loop and broadcast callers.
285struct Shared {
286    clients: Vec<ClientConn>,
287    history: History,
288    status: String,
289    daemon_status: Option<DaemonStatus>,
290    last_rebuild_at: Option<u64>,
291    files_count: usize,
292}
293
294struct ClientConn {
295    stream: UnixStream,
296}
297
298impl ClientConn {
299    fn send(&mut self, msg: &ServerMessage) -> bool {
300        let Ok(mut line) = serde_json::to_string(msg) else {
301            return false;
302        };
303        line.push('\n');
304        self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
305    }
306}
307
308/// Daemon-side socket server.
309///
310/// Binds a Unix domain socket, accepts client connections, and broadcasts
311/// file-change events and status updates to all connected clients.
312pub struct DaemonServer {
313    shared: Arc<Mutex<Shared>>,
314    listener: UnixListener,
315    socket_path: PathBuf,
316    accept_handle: Option<std::thread::JoinHandle<()>>,
317    running: Arc<std::sync::atomic::AtomicBool>,
318}
319
320impl DaemonServer {
321    /// Create and bind a new daemon socket server for the given watched root.
322    ///
323    /// The socket path is derived from the canonical root (see [`socket_path`]).
324    /// Any existing socket file at the path is removed before binding.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the parent directory cannot be created or the
329    /// socket cannot be bound.
330    pub fn new(root: &Path) -> Result<Self> {
331        let sp = socket_path(root);
332        ensure_socket_dir(&sp)?;
333
334        if sp.exists() || sp.is_symlink() {
335            let msg = if sp.is_symlink() {
336                format!("symlink attack detected at {}", sp.display())
337            } else {
338                format!("socket file already exists at {}", sp.display())
339            };
340            return Err(DaemonSockError::Io(std::io::Error::new(
341                std::io::ErrorKind::AddrInUse,
342                msg,
343            )));
344        }
345
346        let listener = UnixListener::bind(&sp)?;
347
348        let shared = Arc::new(Mutex::new(Shared {
349            clients: Vec::new(),
350            history: History::new(),
351            status: "idle".to_string(),
352            daemon_status: Some(DaemonStatus::Idle),
353            last_rebuild_at: None,
354            files_count: 0,
355        }));
356        let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
357
358        Ok(Self {
359            shared,
360            listener,
361            socket_path: sp,
362            accept_handle: None,
363            running,
364        })
365    }
366
367    /// Return the filesystem path of the bound socket.
368    #[must_use]
369    pub fn path(&self) -> &Path {
370        &self.socket_path
371    }
372
373    /// Start the accept-and-read loop in a background thread.
374    ///
375    /// After calling `start()`, the server will accept new connections and
376    /// respond to client queries automatically. Call [`DaemonServer::broadcast`]
377    /// from the main loop to push events to all connected clients.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the listener cannot be cloned, the accept thread
382    /// cannot be spawned, or file descriptor operations fail.
383    pub fn start(&mut self) -> Result<()> {
384        let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
385        let shared = Arc::clone(&self.shared);
386        let running = Arc::clone(&self.running);
387
388        let handle = std::thread::Builder::new()
389            .name("ixd-sock-accept".to_string())
390            .spawn(move || {
391                if let Err(e) = listener.set_nonblocking(true) {
392                    tracing::error!("ixd: cannot set nonblocking: {e}");
393                    return;
394                }
395
396                while running.load(std::sync::atomic::Ordering::SeqCst) {
397                    match listener.accept() {
398                        Ok((stream, _)) => {
399                            if let Err(e) = stream.set_nonblocking(false) {
400                                tracing::warn!("ixd: cannot set blocking on client: {e}");
401                                continue;
402                            }
403                            let _ =
404                                stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
405                            let read_stream = match stream.try_clone() {
406                                Ok(s) => s,
407                                Err(e) => {
408                                    tracing::warn!("ixd: cannot clone stream: {e}");
409                                    continue;
410                                }
411                            };
412                            let shared_clone = Arc::clone(&shared);
413                            let running_clone = Arc::clone(&running);
414                            if let Err(e) = std::thread::Builder::new()
415                                .name("ixd-sock-client".to_string())
416                                .spawn(move || {
417                                    client_read_loop(&read_stream, &shared_clone, &running_clone);
418                                })
419                            {
420                                tracing::warn!("ixd: failed to spawn client thread: {e}");
421                                continue;
422                            }
423                            let conn = ClientConn { stream };
424                            if let Ok(mut s) = shared.lock() {
425                                s.clients.push(conn);
426                            }
427                        }
428                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
429                            std::thread::sleep(std::time::Duration::from_millis(100));
430                        }
431                        Err(e) => {
432                            tracing::warn!("ixd: accept error: {e}");
433                            std::thread::sleep(std::time::Duration::from_millis(200));
434                        }
435                    }
436                }
437            })
438            .map_err(DaemonSockError::Io)?;
439
440        self.accept_handle = Some(handle);
441        Ok(())
442    }
443
444    /// Broadcast a server message to all connected clients.
445    ///
446    /// Disconnected clients are automatically removed. The message is
447    /// serialized once and written to each client with a short write
448    /// timeout to prevent a slow consumer from blocking the daemon.
449    pub fn broadcast(&self, msg: &ServerMessage) {
450        let Ok(mut s) = self.shared.lock() else {
451            return;
452        };
453        s.clients.retain_mut(|c| c.send(msg));
454    }
455
456    /// Update the daemon status and file count (reflected in subsequent
457    /// broadcasts and query responses).
458    pub fn set_status(&self, daemon_status: &DaemonStatus, files_count: usize) {
459        if let Ok(mut s) = self.shared.lock() {
460            s.status = daemon_status.to_string();
461            s.daemon_status = Some(daemon_status.clone());
462            s.files_count = files_count;
463        }
464    }
465
466    /// Record a file-change batch in the history buffer and broadcast it.
467    pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
468        let timestamp = now_secs();
469        if let Ok(mut s) = self.shared.lock() {
470            s.history.push(timestamp, changes.clone());
471            s.files_count = files_count;
472            if matches!(s.daemon_status, Some(DaemonStatus::Idle)) {
473                s.status = "idle".to_string();
474                s.daemon_status = Some(DaemonStatus::Idle);
475                s.last_rebuild_at = Some(timestamp);
476            }
477            let msg = ServerMessage::FilesChanged {
478                batch: changes,
479                timestamp,
480            };
481            s.clients.retain_mut(|c| c.send(&msg));
482        }
483    }
484}
485fn client_read_loop(
486    stream: &UnixStream,
487    shared: &Arc<Mutex<Shared>>,
488    running: &Arc<std::sync::atomic::AtomicBool>,
489) {
490    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
491    let mut reader = BufReader::new(stream);
492    let mut line_buf = String::new();
493
494    loop {
495        if !running.load(std::sync::atomic::Ordering::SeqCst) {
496            break;
497        }
498        line_buf.clear();
499        match reader.read_line(&mut line_buf) {
500            Ok(0) => break,
501            Ok(_) => {
502                let msg: ClientMessage = match serde_json::from_str(&line_buf) {
503                    Ok(m) => m,
504                    Err(e) => {
505                        tracing::debug!("ixd: malformed client message: {e}");
506                        continue;
507                    }
508                };
509
510                let response = match msg {
511                    ClientMessage::StatusQuery { id } => {
512                        let Ok(s) = shared.lock() else {
513                            tracing::warn!("ixd: shared lock poisoned in status query");
514                            continue;
515                        };
516                        ServerMessage::QueryResult {
517                            id,
518                            status: s.status.clone(),
519                            files: s.files_count,
520                            changes_since: Vec::new(),
521                            daemon_status: s.daemon_status.clone(),
522                            last_rebuild_at: s.last_rebuild_at,
523                        }
524                    }
525                    ClientMessage::HistoryQuery { since, id } => {
526                        let Ok(s) = shared.lock() else {
527                            tracing::warn!("ixd: shared lock poisoned in history query");
528                            continue;
529                        };
530                        let changes = s.history.since(since);
531                        ServerMessage::QueryResult {
532                            id,
533                            status: s.status.clone(),
534                            files: s.files_count,
535                            changes_since: changes,
536                            daemon_status: s.daemon_status.clone(),
537                            last_rebuild_at: s.last_rebuild_at,
538                        }
539                    }
540                };
541
542                if let Ok(mut write_stream) = stream.try_clone() {
543                    match serde_json::to_string(&response) {
544                        Ok(mut line) => {
545                            line.push('\n');
546                            if write_stream.write_all(line.as_bytes()).is_err()
547                                || write_stream.flush().is_err()
548                            {
549                                break;
550                            }
551                        }
552                        Err(e) => {
553                            tracing::warn!("ixd: failed to serialize query response: {e}");
554                            break;
555                        }
556                    }
557                }
558            }
559            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
560            Err(_) => break,
561        }
562    }
563}
564
565/// Client-side connection to an ixd daemon socket.
566pub struct DaemonClient {
567    stream: BufReader<UnixStream>,
568}
569
570impl DaemonClient {
571    /// Connect to the daemon socket for the given watched root.
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if the socket does not exist or the connection fails.
576    pub fn connect(root: &Path) -> Result<Self> {
577        let sp = socket_path(root);
578        let stream = UnixStream::connect(&sp)?;
579        stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
580        stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
581        Ok(Self {
582            stream: BufReader::new(stream),
583        })
584    }
585
586    /// Receive the next message from the daemon (blocking).
587    ///
588    /// # Errors
589    ///
590    /// Returns an error on I/O failure, timeout (5s), or malformed JSON.
591    pub fn recv(&mut self) -> Result<ServerMessage> {
592        let mut line = String::new();
593        let bytes = self.stream.read_line(&mut line).map_err(|e| {
594            if e.kind() == std::io::ErrorKind::TimedOut {
595                DaemonSockError::Io(std::io::Error::new(
596                    std::io::ErrorKind::TimedOut,
597                    "recv timed out after 5s",
598                ))
599            } else {
600                DaemonSockError::Io(e)
601            }
602        })?;
603        if bytes == 0 {
604            return Err(DaemonSockError::Io(std::io::Error::new(
605                std::io::ErrorKind::UnexpectedEof,
606                "daemon closed connection",
607            )));
608        }
609        let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
610            DaemonSockError::Io(std::io::Error::new(
611                std::io::ErrorKind::InvalidData,
612                format!("invalid JSON: {e}"),
613            ))
614        })?;
615        Ok(msg)
616    }
617
618    /// Send a query message to the daemon.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error on I/O failure or serialization error.
623    pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
624        let stream = self.stream.get_mut();
625        let mut line = serde_json::to_string(msg)?;
626        line.push('\n');
627        stream.write_all(line.as_bytes())?;
628        stream.flush()?;
629        Ok(())
630    }
631}
632
633/// Current Unix timestamp in seconds.
634fn now_secs() -> u64 {
635    SystemTime::now()
636        .duration_since(UNIX_EPOCH)
637        .unwrap_or_default()
638        .as_secs()
639}
640
641impl Drop for DaemonServer {
642    fn drop(&mut self) {
643        self.running
644            .store(false, std::sync::atomic::Ordering::SeqCst);
645        if let Some(handle) = self.accept_handle.take() {
646            let _ = handle.join();
647        }
648        let _ = std::fs::remove_file(&self.socket_path);
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use std::path::PathBuf;
656
657    #[test]
658    fn socket_path_deterministic() {
659        let root = PathBuf::from("/tmp/test-project");
660        let p1 = socket_path(&root);
661        let p2 = socket_path(&root);
662        assert_eq!(p1, p2, "same root must produce same socket path");
663    }
664
665    #[test]
666    fn socket_path_different_roots() {
667        let r1 = PathBuf::from("/tmp/project-a");
668        let r2 = PathBuf::from("/tmp/project-b");
669        assert_ne!(socket_path(&r1), socket_path(&r2));
670    }
671
672    #[test]
673    fn socket_path_uses_xdg() {
674        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
675        let p = socket_path(Path::new("/tmp/some-project"));
676        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
677        assert!(p.extension().is_some_and(|e| e == "sock"));
678        unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
679    }
680
681    #[test]
682    fn server_message_ndjson_roundtrip() {
683        let msg = ServerMessage::Status {
684            pid: 1234,
685            status: "idle".to_string(),
686            files: 42,
687            daemon_status: None,
688        };
689        let json = serde_json::to_string(&msg).expect("serialize");
690        assert!(json.contains("\"t\":\"status\""), "tag field present");
691        assert!(
692            !json.contains("daemon_status"),
693            "daemon_status should be omitted when None"
694        );
695
696        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
697        if let ServerMessage::Status {
698            pid,
699            status,
700            files,
701            daemon_status,
702        } = back
703        {
704            assert_eq!(pid, 1234);
705            assert_eq!(status, "idle");
706            assert_eq!(files, 42);
707            assert_eq!(daemon_status, None);
708        } else {
709            panic!("wrong variant after roundtrip");
710        }
711    }
712
713    #[test]
714    fn files_changed_roundtrip() {
715        let msg = ServerMessage::FilesChanged {
716            batch: vec![FileChange {
717                path: PathBuf::from("src/main.rs"),
718                mtime: 1_776_468_629,
719                op: FileOp::Modify,
720            }],
721            timestamp: 1_776_468_629,
722        };
723        let json = serde_json::to_string(&msg).expect("serialize");
724        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
725        if let ServerMessage::FilesChanged { batch, timestamp } = back {
726            assert_eq!(batch.len(), 1);
727            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
728            assert_eq!(timestamp, 1_776_468_629);
729        } else {
730            panic!("wrong variant");
731        }
732    }
733
734    #[test]
735    fn client_message_roundtrip() {
736        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
737        let json = serde_json::to_string(&msg).expect("serialize");
738        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
739        if let ClientMessage::HistoryQuery { since, id } = back {
740            assert_eq!(since, 1000);
741            assert_eq!(id, 7);
742        } else {
743            panic!("wrong variant");
744        }
745    }
746
747    #[test]
748    fn history_since() {
749        let mut h = History::new();
750        h.push(
751            100,
752            vec![FileChange {
753                path: PathBuf::from("a.rs"),
754                mtime: 100,
755                op: FileOp::Create,
756            }],
757        );
758        h.push(
759            200,
760            vec![FileChange {
761                path: PathBuf::from("b.rs"),
762                mtime: 200,
763                op: FileOp::Modify,
764            }],
765        );
766        h.push(
767            300,
768            vec![FileChange {
769                path: PathBuf::from("c.rs"),
770                mtime: 300,
771                op: FileOp::Delete,
772            }],
773        );
774
775        let changes = h.since(150);
776        assert_eq!(changes.len(), 2);
777        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
778        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
779    }
780
781    #[test]
782    fn history_capacity() {
783        let mut h = History::new();
784        for i in 0..=HISTORY_CAPACITY {
785            h.push(
786                i as u64,
787                vec![FileChange {
788                    path: PathBuf::from(format!("f{i}")),
789                    mtime: i as u64,
790                    op: FileOp::Modify,
791                }],
792            );
793        }
794        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
795        // Oldest entry (ts=0) should have been evicted
796        assert_eq!(h.entries.front().expect("non-empty").0, 1);
797    }
798
799    #[test]
800    fn server_client_connect_and_broadcast() {
801        let tmp = tempfile::tempdir().expect("tempdir");
802        let root = tmp.path().to_path_buf();
803
804        let mut server = DaemonServer::new(&root).expect("create server");
805        let sp = server.path().to_path_buf();
806        let _ = server.start();
807
808        // Connect a client
809        let stream = UnixStream::connect(&sp).expect("connect");
810        let mut client = DaemonClient {
811            stream: BufReader::new(stream),
812        };
813
814        // Give the accept thread time to register the client
815        std::thread::sleep(std::time::Duration::from_millis(200));
816
817        server.set_status(&DaemonStatus::Idle, 10);
818
819        // Broadcast a status message
820        server.broadcast(&ServerMessage::Status {
821            pid: 1234,
822            status: "idle".to_string(),
823            files: 10,
824            daemon_status: Some(DaemonStatus::Idle),
825        });
826
827        // Client should receive the message
828        // Use a timeout to avoid hanging forever
829        client
830            .stream
831            .get_mut()
832            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
833            .expect("set timeout");
834
835        match client.recv() {
836            Ok(ServerMessage::Status {
837                pid,
838                status,
839                files,
840                daemon_status,
841            }) => {
842                assert_eq!(pid, 1234);
843                assert_eq!(status, "idle");
844                assert_eq!(files, 10);
845                assert!(daemon_status.is_some());
846            }
847            Ok(other) => panic!("expected Status, got {other:?}"),
848            Err(e) => panic!("recv failed: {e}"),
849        }
850    }
851
852    #[test]
853    fn client_query_status() {
854        let tmp = tempfile::tempdir().expect("tempdir");
855        let root = tmp.path().to_path_buf();
856
857        let mut server = DaemonServer::new(&root).expect("create server");
858        let sp = server.path().to_path_buf();
859        let _ = server.start();
860        server.set_status(&DaemonStatus::Indexing { entropy: 42 }, 99);
861
862        let stream = UnixStream::connect(&sp).expect("connect");
863        let mut client = DaemonClient {
864            stream: BufReader::new(stream),
865        };
866
867        std::thread::sleep(std::time::Duration::from_millis(200));
868
869        client
870            .send(&ClientMessage::StatusQuery { id: 123 })
871            .expect("send query");
872
873        client
874            .stream
875            .get_mut()
876            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
877            .expect("set timeout");
878
879        match client.recv() {
880            Ok(ServerMessage::QueryResult {
881                id,
882                status,
883                files,
884                changes_since,
885                daemon_status,
886                last_rebuild_at,
887            }) => {
888                eprintln!(
889                    "[JSON] id={}, status={}, files={}, daemon_status={:?}, last_rebuild_at={:?}",
890                    id, status, files, daemon_status, last_rebuild_at
891                );
892                assert_eq!(id, 123);
893                assert_eq!(status, "indexing (entropy: 42)");
894                assert_eq!(files, 99);
895                assert!(changes_since.is_empty());
896                assert_eq!(daemon_status, Some(DaemonStatus::Indexing { entropy: 42 }));
897                assert_eq!(last_rebuild_at, None);
898            }
899            Ok(other) => panic!("expected QueryResult, got {other:?}"),
900            Err(e) => panic!("recv failed: {e}"),
901        }
902    }
903}