Skip to main content

ix/
daemon_sock.rs

1//! Unix domain socket interface for the ixd daemon.
2//!
3//! Provides real-time file-change notifications and status queries over a
4//! local Unix domain socket using NDJSON (newline-delimited JSON) framing.
5//!
6//! # Socket Path Resolution
7//!
8//! The socket path is derived from the canonical watched root:
9//!
10//! ```text
11//! $XDG_RUNTIME_DIR/ixd/{hash}.sock        # preferred (systemd, modern Linux)
12//! ~/.local/run/ixd/{hash}.sock             # fallback
13//! /tmp/ixd-{uid}-{hash}.sock              # last resort
14//! ```
15//!
16//! Where `hash` = first 16 hex chars of `XXH64(canonical_path, seed=0)`.
17//!
18//! # Wire Protocol (NDJSON)
19//!
//! Each line is a valid JSON object terminated by `\n`.
21//!
22//! **Server → Client (push):**
23//!
24//! ```json
25//! {"t":"status","pid":1234,"status":"idle","files":1523}
26//! {"t":"files_changed","batch":[{"p":"src/main.rs","m":1776468629,"o":"modify"}],"ts":1776468629}
27//! ```
28//!
29//! **Client → Server (query):**
30//!
31//! ```json
32//! {"t":"status_query"}
33//! {"t":"history_query","since":1776468000,"id":1}
34//! ```
35//!
36//! **Server → Client (query response):**
37//!
38//! ```json
39//! {"t":"query_result","id":1,"status":"idle","files":1523,"changes_since":[...]}
40//! ```
41
42use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
/// Maximum number of change batches retained for history queries.
///
/// Also used as the initial capacity of the history buffer. Once this many
/// batches are stored, pushing a new batch evicts the oldest one.
const HISTORY_CAPACITY: usize = 1024;
52
/// File change operation kind.
///
/// Serialized as a lowercase string (`"create"`, `"modify"`, `"delete"`,
/// `"rename"`); it is carried in the `o` field of a [`FileChange`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FileOp {
    /// File was created.
    Create,
    /// File was modified.
    Modify,
    /// File was deleted.
    Delete,
    /// File was renamed.
    Rename,
}
66
67impl FileOp {
68    /// Convert from the notify crate's event kind to our serializable enum.
69    #[must_use]
70    pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71        match kind {
72            notify::EventKind::Create(_) => Self::Create,
73            notify::EventKind::Remove(_) => Self::Delete,
74            _ => Self::Modify,
75        }
76    }
77}
78
/// Typed daemon status enum for structured status tracking.
///
/// Serialized as an internally tagged object: the variant name goes into a
/// `state` field in snake_case and the variant's own fields sit next to it,
/// e.g. `{"state":"indexing","entropy":42}` or `{"state":"safety_halt"}`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum DaemonStatus {
    /// Daemon is idle, no active rebuild.
    Idle,
    /// Active index rebuild in progress.
    Indexing {
        /// Current entropy reading.
        entropy: u16,
    },
    /// Rebuild deferred due to high entropy.
    Deferred {
        /// Current entropy reading.
        entropy: u16,
    },
    /// Safety escalation triggered.
    Escalated {
        /// Current entropy reading.
        entropy: u16,
    },
    /// Safety warning issued.
    Warned {
        /// Warning reason.
        reason: String,
    },
    /// Critical safety halt — daemon stopped.
    SafetyHalt,
    /// Unrecoverable safety exit.
    SafetyExit,
}
110
111impl std::fmt::Display for DaemonStatus {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self {
114            Self::Idle { .. } => write!(f, "idle"),
115            Self::Indexing { entropy } => write!(f, "indexing (entropy: {entropy})"),
116            Self::Deferred { entropy } => write!(f, "deferred (entropy: {entropy})"),
117            Self::Escalated { entropy } => write!(f, "escalated (entropy: {entropy})"),
118            Self::Warned { reason } => write!(f, "warned: {reason}"),
119            Self::SafetyHalt => write!(f, "safety halt"),
120            Self::SafetyExit => write!(f, "safety exit"),
121        }
122    }
123}
124
/// A single file change record broadcast to connected clients.
///
/// Field names are shortened on the wire (`p`, `m`, `o`), e.g.
/// `{"p":"src/main.rs","m":1776468629,"o":"modify"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileChange {
    /// Path of the changed file (relative to watched root when possible).
    #[serde(rename = "p")]
    pub path: PathBuf,
    /// Modification timestamp (Unix seconds).
    #[serde(rename = "m")]
    pub mtime: u64,
    /// Operation performed on the file.
    #[serde(rename = "o")]
    pub op: FileOp,
}
138
/// Messages sent from the server to connected clients.
///
/// Serialized as an internally tagged object: the variant is named by the
/// `t` field in snake_case (`"status"`, `"files_changed"`, `"query_result"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ServerMessage {
    /// Periodic or on-change daemon status update.
    Status {
        /// PID of the daemon process.
        pid: u32,
        /// Human-readable status string (e.g. "idle", "indexing").
        status: String,
        /// Number of files currently in the index.
        files: usize,
        /// Typed daemon status (present when daemon is running).
        /// Omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
    },
    /// Batch of file changes detected by the watcher.
    FilesChanged {
        /// The changed files in this batch.
        batch: Vec<FileChange>,
        /// Timestamp of this event batch (Unix seconds).
        /// Appears as `ts` on the wire.
        #[serde(rename = "ts")]
        timestamp: u64,
    },
    /// Response to a client query.
    QueryResult {
        /// Query ID (matches the `id` field from the request).
        id: u64,
        /// Current daemon status at query time.
        status: String,
        /// Number of files in the index.
        files: usize,
        /// Changes since the requested timestamp (for history queries).
        /// Omitted from the JSON when empty; a missing field decodes as empty.
        #[serde(default, skip_serializing_if = "Vec::is_empty")]
        changes_since: Vec<FileChange>,
        /// Typed daemon status (present when daemon is running).
        /// Omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        daemon_status: Option<DaemonStatus>,
        /// Timestamp of the last successful rebuild completion.
        /// Omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        last_rebuild_at: Option<u64>,
    },
}
182
/// Messages sent from connected clients to the daemon server.
///
/// Serialized as an internally tagged object: the variant is named by the
/// `t` field in snake_case (`"status_query"`, `"history_query"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "t", rename_all = "snake_case")]
pub enum ClientMessage {
    /// Request current daemon status.
    StatusQuery {
        /// Client-assigned query ID (echoed back in the response).
        /// Optional on the wire; defaults to `0` when omitted.
        #[serde(default)]
        id: u64,
    },
    /// Request all changes since the given timestamp.
    HistoryQuery {
        /// Return changes with timestamps strictly after this value.
        since: u64,
        /// Client-assigned query ID (echoed back in the response).
        /// Required on the wire (no serde default is declared here).
        id: u64,
    },
}
201
/// Errors specific to the daemon socket subsystem.
///
/// `std::io::Error` and `serde_json::Error` convert into this type through
/// the `#[from]` attributes, so `?` works on both inside this module.
#[derive(Debug, thiserror::Error)]
pub enum DaemonSockError {
    /// I/O error on the socket.
    #[error("daemon socket I/O: {0}")]
    Io(#[from] std::io::Error),
    /// JSON serialization or deserialization error.
    #[error("daemon socket JSON: {0}")]
    Json(#[from] serde_json::Error),
    /// Could not resolve a suitable socket path.
    #[error("daemon socket path resolution failed")]
    PathResolution,
}
215
/// Module-local result alias whose error type is [`DaemonSockError`].
type Result<T> = std::result::Result<T, DaemonSockError>;
217
218/// Resolves the socket path for a given watched root directory.
219///
220/// Tries in order:
221/// 1. `$XDG_RUNTIME_DIR/ixd/{hash}.sock`
222/// 2. `$HOME/.local/run/ixd/{hash}.sock`
223/// 3. `/tmp/ixd-{uid}-{hash}.sock`
224///
225/// Where `hash` is the first 16 hex characters of `XXH64(canonical_root, 0)`.
226#[must_use]
227pub fn socket_path(root: &Path) -> PathBuf {
228    let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
229    let hash = format!(
230        "{:016x}",
231        xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
232    );
233
234    if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
235        let dir = PathBuf::from(xdg).join("ixd");
236        return dir.join(format!("{hash}.sock"));
237    }
238
239    if let Ok(home) = std::env::var("HOME") {
240        let dir = PathBuf::from(home).join(".local/run/ixd");
241        return dir.join(format!("{hash}.sock"));
242    }
243
244    let uid = unsafe { libc::getuid() };
245    PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
246}
247
/// Create the directory that will contain the socket at `path`, including any
/// missing ancestors. A path without a parent component is accepted as-is.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir),
        None => Ok(()),
    }
}
255
256/// Circular buffer of recent file-change batches for history queries.
257struct History {
258    entries: VecDeque<(u64, Vec<FileChange>)>,
259}
260
261impl History {
262    fn new() -> Self {
263        Self {
264            entries: VecDeque::with_capacity(HISTORY_CAPACITY),
265        }
266    }
267
268    fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
269        if self.entries.len() >= HISTORY_CAPACITY {
270            self.entries.pop_front();
271        }
272        self.entries.push_back((timestamp, changes));
273    }
274
275    fn since(&self, cutoff: u64) -> Vec<FileChange> {
276        self.entries
277            .iter()
278            .filter(|(ts, _)| *ts > cutoff)
279            .flat_map(|(_, changes)| changes.iter().cloned())
280            .collect()
281    }
282}
283
/// State shared between the accept loop and broadcast callers.
///
/// Held behind an `Arc<Mutex<_>>` by the server, the accept thread and every
/// per-client reader thread.
struct Shared {
    /// Connections of registered clients, used for pushing messages.
    clients: Vec<ClientConn>,
    /// Recent change batches served to history queries.
    history: History,
    /// Human-readable status text returned in query responses.
    status: String,
    /// Typed status returned alongside `status` in query responses.
    daemon_status: Option<DaemonStatus>,
    /// Value reported as `last_rebuild_at` in query responses (Unix seconds).
    last_rebuild_at: Option<u64>,
    /// File count reported in query responses.
    files_count: usize,
}
293
294struct ClientConn {
295    stream: UnixStream,
296}
297
298impl ClientConn {
299    fn send(&mut self, msg: &ServerMessage) -> bool {
300        let Ok(mut line) = serde_json::to_string(msg) else {
301            return false;
302        };
303        line.push('\n');
304        self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
305    }
306}
307
/// Daemon-side socket server.
///
/// Binds a Unix domain socket, accepts client connections, and broadcasts
/// file-change events and status updates to all connected clients.
///
/// Dropping the server stops the accept thread and removes the socket file.
pub struct DaemonServer {
    /// Client list, history and status, shared with the background threads.
    shared: Arc<Mutex<Shared>>,
    /// The bound listening socket; cloned into the accept thread by `start`.
    listener: UnixListener,
    /// Filesystem path the listener was bound to.
    socket_path: PathBuf,
    /// Join handle of the accept thread, `None` until `start` succeeds.
    accept_handle: Option<std::thread::JoinHandle<()>>,
    /// Cleared on drop to tell the background threads to exit.
    running: Arc<std::sync::atomic::AtomicBool>,
}
319
320impl DaemonServer {
321    /// Create and bind a new daemon socket server for the given watched root.
322    ///
323    /// The socket path is derived from the canonical root (see [`socket_path`]).
324    /// Any existing socket file at the path is removed before binding.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if the parent directory cannot be created or the
329    /// socket cannot be bound.
330    pub fn new(root: &Path) -> Result<Self> {
331        let sp = socket_path(root);
332        ensure_socket_dir(&sp)?;
333
334        if sp.exists() || sp.is_symlink() {
335            let msg = if sp.is_symlink() {
336                format!("symlink attack detected at {}", sp.display())
337            } else {
338                format!("socket file already exists at {}", sp.display())
339            };
340            return Err(DaemonSockError::Io(std::io::Error::new(
341                std::io::ErrorKind::AddrInUse,
342                msg,
343            )));
344        }
345
346        let listener = UnixListener::bind(&sp)?;
347
348        let shared = Arc::new(Mutex::new(Shared {
349            clients: Vec::new(),
350            history: History::new(),
351            status: "idle".to_string(),
352            daemon_status: Some(DaemonStatus::Idle),
353            last_rebuild_at: None,
354            files_count: 0,
355        }));
356        let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
357
358        Ok(Self {
359            shared,
360            listener,
361            socket_path: sp,
362            accept_handle: None,
363            running,
364        })
365    }
366
367    /// Return the filesystem path of the bound socket.
368    #[must_use]
369    pub fn path(&self) -> &Path {
370        &self.socket_path
371    }
372
373    /// Start the accept-and-read loop in a background thread.
374    ///
375    /// After calling `start()`, the server will accept new connections and
376    /// respond to client queries automatically. Call [`DaemonServer::broadcast`]
377    /// from the main loop to push events to all connected clients.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if the listener cannot be cloned, the accept thread
382    /// cannot be spawned, or file descriptor operations fail.
383    pub fn start(&mut self) -> Result<()> {
384        let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
385        let shared = Arc::clone(&self.shared);
386        let running = Arc::clone(&self.running);
387
388        let handle = std::thread::Builder::new()
389            .name("ixd-sock-accept".to_string())
390            .spawn(move || {
391                if let Err(e) = listener.set_nonblocking(true) {
392                    tracing::error!("ixd: cannot set nonblocking: {e}");
393                    return;
394                }
395
396                while running.load(std::sync::atomic::Ordering::SeqCst) {
397                    match listener.accept() {
398                        Ok((stream, _)) => {
399                            if let Err(e) = stream.set_nonblocking(false) {
400                                tracing::warn!("ixd: cannot set blocking on client: {e}");
401                                continue;
402                            }
403                            let _ =
404                                stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
405                            let read_stream = match stream.try_clone() {
406                                Ok(s) => s,
407                                Err(e) => {
408                                    tracing::warn!("ixd: cannot clone stream: {e}");
409                                    continue;
410                                }
411                            };
412                            let shared_clone = Arc::clone(&shared);
413                            let running_clone = Arc::clone(&running);
414                            if let Err(e) = std::thread::Builder::new()
415                                .name("ixd-sock-client".to_string())
416                                .spawn(move || {
417                                    client_read_loop(&read_stream, &shared_clone, &running_clone);
418                                })
419                            {
420                                tracing::warn!("ixd: failed to spawn client thread: {e}");
421                                continue;
422                            }
423                            let conn = ClientConn { stream };
424                            if let Ok(mut s) = shared.lock() {
425                                s.clients.push(conn);
426                            }
427                        }
428                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
429                            std::thread::sleep(std::time::Duration::from_millis(100));
430                        }
431                        Err(e) => {
432                            tracing::warn!("ixd: accept error: {e}");
433                            std::thread::sleep(std::time::Duration::from_millis(200));
434                        }
435                    }
436                }
437            })
438            .map_err(DaemonSockError::Io)?;
439
440        self.accept_handle = Some(handle);
441        Ok(())
442    }
443
444    /// Broadcast a server message to all connected clients.
445    ///
446    /// Disconnected clients are automatically removed. The message is
447    /// serialized once and written to each client with a short write
448    /// timeout to prevent a slow consumer from blocking the daemon.
449    pub fn broadcast(&self, msg: &ServerMessage) {
450        let Ok(mut s) = self.shared.lock() else {
451            return;
452        };
453        s.clients.retain_mut(|c| c.send(msg));
454    }
455
456    /// Update the daemon status and file count (reflected in subsequent
457    /// broadcasts and query responses).
458    pub fn set_status(&self, daemon_status: &DaemonStatus, files_count: usize) {
459        if let Ok(mut s) = self.shared.lock() {
460            s.status = daemon_status.to_string();
461            s.daemon_status = Some(daemon_status.clone());
462            s.files_count = files_count;
463        }
464    }
465
466    /// Record a file-change batch in the history buffer and broadcast it.
467    pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
468        let timestamp = now_secs();
469        if let Ok(mut s) = self.shared.lock() {
470            s.history.push(timestamp, changes.clone());
471            s.files_count = files_count;
472            if matches!(s.daemon_status, Some(DaemonStatus::Idle)) {
473                s.status = "idle".to_string();
474                s.daemon_status = Some(DaemonStatus::Idle);
475                s.last_rebuild_at = Some(timestamp);
476            }
477            let msg = ServerMessage::FilesChanged {
478                batch: changes,
479                timestamp,
480            };
481            s.clients.retain_mut(|c| c.send(&msg));
482        }
483    }
484}
485fn client_read_loop(
486    stream: &UnixStream,
487    shared: &Arc<Mutex<Shared>>,
488    running: &Arc<std::sync::atomic::AtomicBool>,
489) {
490    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
491    let mut reader = BufReader::new(stream);
492    let mut line_buf = String::new();
493
494    loop {
495        if !running.load(std::sync::atomic::Ordering::SeqCst) {
496            break;
497        }
498        line_buf.clear();
499        match reader.read_line(&mut line_buf) {
500            Ok(0) => break,
501            Ok(_) => {
502                let msg: ClientMessage = match serde_json::from_str(&line_buf) {
503                    Ok(m) => m,
504                    Err(e) => {
505                        tracing::debug!("ixd: malformed client message: {e}");
506                        continue;
507                    }
508                };
509
510                let response = match msg {
511                    ClientMessage::StatusQuery { id } => {
512                        let Ok(s) = shared.lock() else {
513                            tracing::warn!("ixd: shared lock poisoned in status query");
514                            continue;
515                        };
516                        ServerMessage::QueryResult {
517                            id,
518                            status: s.status.clone(),
519                            files: s.files_count,
520                            changes_since: Vec::new(),
521                            daemon_status: s.daemon_status.clone(),
522                            last_rebuild_at: s.last_rebuild_at,
523                        }
524                    }
525                    ClientMessage::HistoryQuery { since, id } => {
526                        let Ok(s) = shared.lock() else {
527                            tracing::warn!("ixd: shared lock poisoned in history query");
528                            continue;
529                        };
530                        let changes = s.history.since(since);
531                        ServerMessage::QueryResult {
532                            id,
533                            status: s.status.clone(),
534                            files: s.files_count,
535                            changes_since: changes,
536                            daemon_status: s.daemon_status.clone(),
537                            last_rebuild_at: s.last_rebuild_at,
538                        }
539                    }
540                };
541
542if let Ok(mut write_stream) = stream.try_clone() {
543        match serde_json::to_string(&response) {
544            Ok(mut line) => {
545                line.push('\n');
546                if write_stream.write_all(line.as_bytes()).is_err()
547                    || write_stream.flush().is_err()
548                {
549                    break;
550                }
551            }
552            Err(e) => {
553                tracing::warn!("ixd: failed to serialize query response: {e}");
554                break;
555            }
556        }
557    }
558            }
559            Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {}
560            Err(_) => break,
561        }
562    }
563}
564
/// Client-side connection to an ixd daemon socket.
pub struct DaemonClient {
    /// Buffered reader over the connected socket; writes go to the inner
    /// stream obtained through `get_mut()`.
    stream: BufReader<UnixStream>,
}
569
570impl DaemonClient {
571    /// Connect to the daemon socket for the given watched root.
572    ///
573    /// # Errors
574    ///
575    /// Returns an error if the socket does not exist or the connection fails.
576    pub fn connect(root: &Path) -> Result<Self> {
577        let sp = socket_path(root);
578        let stream = UnixStream::connect(&sp)?;
579        stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
580        stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
581        Ok(Self {
582            stream: BufReader::new(stream),
583        })
584    }
585
586    /// Receive the next message from the daemon (blocking).
587    ///
588    /// # Errors
589    ///
590    /// Returns an error on I/O failure, timeout (5s), or malformed JSON.
591    pub fn recv(&mut self) -> Result<ServerMessage> {
592        let mut line = String::new();
593        let bytes = self.stream.read_line(&mut line).map_err(|e| {
594            if e.kind() == std::io::ErrorKind::TimedOut {
595                DaemonSockError::Io(std::io::Error::new(
596                    std::io::ErrorKind::TimedOut,
597                    "recv timed out after 5s",
598                ))
599            } else {
600                DaemonSockError::Io(e)
601            }
602        })?;
603        if bytes == 0 {
604            return Err(DaemonSockError::Io(std::io::Error::new(
605                std::io::ErrorKind::UnexpectedEof,
606                "daemon closed connection",
607            )));
608        }
609        let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
610            DaemonSockError::Io(std::io::Error::new(
611                std::io::ErrorKind::InvalidData,
612                format!("invalid JSON: {e}"),
613            ))
614        })?;
615        Ok(msg)
616    }
617
618    /// Send a query message to the daemon.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error on I/O failure or serialization error.
623    pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
624        let stream = self.stream.get_mut();
625        let mut line = serde_json::to_string(msg)?;
626        line.push('\n');
627        stream.write_all(line.as_bytes())?;
628        stream.flush()?;
629        Ok(())
630    }
631}
632
/// Whole seconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
640
641impl Drop for DaemonServer {
642    fn drop(&mut self) {
643        self.running
644            .store(false, std::sync::atomic::Ordering::SeqCst);
645        if let Some(handle) = self.accept_handle.take() {
646            let _ = handle.join();
647        }
648        let _ = std::fs::remove_file(&self.socket_path);
649    }
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655    use std::path::PathBuf;
656
657    #[test]
658    fn socket_path_deterministic() {
659        let root = PathBuf::from("/tmp/test-project");
660        let p1 = socket_path(&root);
661        let p2 = socket_path(&root);
662        assert_eq!(p1, p2, "same root must produce same socket path");
663    }
664
665    #[test]
666    fn socket_path_different_roots() {
667        let r1 = PathBuf::from("/tmp/project-a");
668        let r2 = PathBuf::from("/tmp/project-b");
669        assert_ne!(socket_path(&r1), socket_path(&r2));
670    }
671
672    #[test]
673    fn socket_path_uses_xdg() {
674        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
675        let p = socket_path(Path::new("/tmp/some-project"));
676        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
677        assert!(p.extension().is_some_and(|e| e == "sock"));
678        unsafe { std::env::remove_var("XDG_RUNTIME_DIR") };
679    }
680
681    #[test]
682    fn server_message_ndjson_roundtrip() {
683        let msg = ServerMessage::Status {
684            pid: 1234,
685            status: "idle".to_string(),
686            files: 42,
687            daemon_status: None,
688        };
689        let json = serde_json::to_string(&msg).expect("serialize");
690        assert!(json.contains("\"t\":\"status\""), "tag field present");
691        assert!(!json.contains("daemon_status"), "daemon_status should be omitted when None");
692
693        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
694        if let ServerMessage::Status { pid, status, files, daemon_status } = back {
695            assert_eq!(pid, 1234);
696            assert_eq!(status, "idle");
697            assert_eq!(files, 42);
698            assert_eq!(daemon_status, None);
699        } else {
700            panic!("wrong variant after roundtrip");
701        }
702    }
703
704    #[test]
705    fn files_changed_roundtrip() {
706        let msg = ServerMessage::FilesChanged {
707            batch: vec![FileChange {
708                path: PathBuf::from("src/main.rs"),
709                mtime: 1_776_468_629,
710                op: FileOp::Modify,
711            }],
712            timestamp: 1_776_468_629,
713        };
714        let json = serde_json::to_string(&msg).expect("serialize");
715        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
716        if let ServerMessage::FilesChanged { batch, timestamp } = back {
717            assert_eq!(batch.len(), 1);
718            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
719            assert_eq!(timestamp, 1_776_468_629);
720        } else {
721            panic!("wrong variant");
722        }
723    }
724
725    #[test]
726    fn client_message_roundtrip() {
727        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
728        let json = serde_json::to_string(&msg).expect("serialize");
729        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
730        if let ClientMessage::HistoryQuery { since, id } = back {
731            assert_eq!(since, 1000);
732            assert_eq!(id, 7);
733        } else {
734            panic!("wrong variant");
735        }
736    }
737
738    #[test]
739    fn history_since() {
740        let mut h = History::new();
741        h.push(
742            100,
743            vec![FileChange {
744                path: PathBuf::from("a.rs"),
745                mtime: 100,
746                op: FileOp::Create,
747            }],
748        );
749        h.push(
750            200,
751            vec![FileChange {
752                path: PathBuf::from("b.rs"),
753                mtime: 200,
754                op: FileOp::Modify,
755            }],
756        );
757        h.push(
758            300,
759            vec![FileChange {
760                path: PathBuf::from("c.rs"),
761                mtime: 300,
762                op: FileOp::Delete,
763            }],
764        );
765
766        let changes = h.since(150);
767        assert_eq!(changes.len(), 2);
768        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
769        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
770    }
771
772    #[test]
773    fn history_capacity() {
774        let mut h = History::new();
775        for i in 0..=HISTORY_CAPACITY {
776            h.push(
777                i as u64,
778                vec![FileChange {
779                    path: PathBuf::from(format!("f{i}")),
780                    mtime: i as u64,
781                    op: FileOp::Modify,
782                }],
783            );
784        }
785        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
786        // Oldest entry (ts=0) should have been evicted
787        assert_eq!(h.entries.front().expect("non-empty").0, 1);
788    }
789
790    #[test]
791    fn server_client_connect_and_broadcast() {
792        let tmp = tempfile::tempdir().expect("tempdir");
793        let root = tmp.path().to_path_buf();
794
795        let mut server = DaemonServer::new(&root).expect("create server");
796        let sp = server.path().to_path_buf();
797        let _ = server.start();
798
799        // Connect a client
800        let stream = UnixStream::connect(&sp).expect("connect");
801        let mut client = DaemonClient {
802            stream: BufReader::new(stream),
803        };
804
805        // Give the accept thread time to register the client
806        std::thread::sleep(std::time::Duration::from_millis(200));
807
808        server.set_status(&DaemonStatus::Idle, 10);
809
810        // Broadcast a status message
811        server.broadcast(&ServerMessage::Status {
812            pid: 1234,
813            status: "idle".to_string(),
814            files: 10,
815            daemon_status: Some(DaemonStatus::Idle),
816        });
817
818        // Client should receive the message
819        // Use a timeout to avoid hanging forever
820        client
821            .stream
822            .get_mut()
823            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
824            .expect("set timeout");
825
826        match client.recv() {
827            Ok(ServerMessage::Status { pid, status, files, daemon_status }) => {
828                assert_eq!(pid, 1234);
829                assert_eq!(status, "idle");
830                assert_eq!(files, 10);
831                assert!(daemon_status.is_some());
832            }
833            Ok(other) => panic!("expected Status, got {other:?}"),
834            Err(e) => panic!("recv failed: {e}"),
835        }
836    }
837
838#[test]
839    fn client_query_status() {
840        let tmp = tempfile::tempdir().expect("tempdir");
841        let root = tmp.path().to_path_buf();
842
843        let mut server = DaemonServer::new(&root).expect("create server");
844        let sp = server.path().to_path_buf();
845        let _ = server.start();
846        server.set_status(&DaemonStatus::Indexing { entropy: 42 }, 99);
847
848        let stream = UnixStream::connect(&sp).expect("connect");
849        let mut client = DaemonClient {
850            stream: BufReader::new(stream),
851        };
852
853        std::thread::sleep(std::time::Duration::from_millis(200));
854
855        client
856            .send(&ClientMessage::StatusQuery { id: 123 })
857            .expect("send query");
858
859        client
860            .stream
861            .get_mut()
862            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
863            .expect("set timeout");
864
865        match client.recv() {
866            Ok(ServerMessage::QueryResult {
867                id,
868                status,
869                files,
870                changes_since,
871                daemon_status,
872                last_rebuild_at,
873            }) => {
874                eprintln!("[JSON] id={}, status={}, files={}, daemon_status={:?}, last_rebuild_at={:?}",
875                    id, status, files, daemon_status, last_rebuild_at);
876                assert_eq!(id, 123);
877                assert_eq!(status, "indexing (entropy: 42)");
878                assert_eq!(files, 99);
879                assert!(changes_since.is_empty());
880                assert_eq!(daemon_status, Some(DaemonStatus::Indexing { entropy: 42 }));
881                assert_eq!(last_rebuild_at, None);
882            }
883            Ok(other) => panic!("expected QueryResult, got {other:?}"),
884            Err(e) => panic!("recv failed: {e}"),
885        }
886    }
887}