Skip to main content

ix/
daemon_sock.rs

1//! Unix domain socket interface for the ixd daemon.
2//!
3//! Provides real-time file-change notifications and status queries over a
4//! local Unix domain socket using NDJSON (newline-delimited JSON) framing.
5//!
6//! # Socket Path Resolution
7//!
8//! The socket path is derived from the canonical watched root:
9//!
10//! ```text
11//! $XDG_RUNTIME_DIR/ixd/{hash}.sock        # preferred (systemd, modern Linux)
12//! ~/.local/run/ixd/{hash}.sock             # fallback
13//! /tmp/ixd-{uid}-{hash}.sock              # last resort
14//! ```
15//!
16//! Where `hash` = first 16 hex chars of `XXH64(canonical_path, seed=0)`.
17//!
18//! # Wire Protocol (NDJSON)
19//!
20//! Each line is a single valid JSON object terminated by `\n`.
21//!
22//! **Server → Client (push):**
23//!
24//! ```json
25//! {"t":"status","pid":1234,"status":"idle","files":1523}
26//! {"t":"files_changed","batch":[{"p":"src/main.rs","m":1776468629,"o":"modify"}],"ts":1776468629}
27//! ```
28//!
29//! **Client → Server (query):**
30//!
31//! ```json
32//! {"t":"status_query"}
33//! {"t":"history_query","since":1776468000,"id":1}
34//! ```
35//!
36//! **Server → Client (query response):**
37//!
38//! ```json
39//! {"t":"query_result","id":1,"status":"idle","files":1523,"changes_since":[...]}
40//! ```
41
42use serde::{Deserialize, Serialize};
43use std::collections::VecDeque;
44use std::io::{BufRead, BufReader, Write};
45use std::os::unix::net::{UnixListener, UnixStream};
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::{SystemTime, UNIX_EPOCH};
49
/// Upper bound on the number of change batches kept for history queries.
/// Once the buffer holds this many batches, the oldest one is evicted for
/// every new batch recorded.
const HISTORY_CAPACITY: usize = 1024;
52
53/// File change operation kind.
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(rename_all = "lowercase")]
56pub enum FileOp {
57    /// File was created.
58    Create,
59    /// File was modified.
60    Modify,
61    /// File was deleted.
62    Delete,
63    /// File was renamed.
64    Rename,
65}
66
67impl FileOp {
68    /// Convert from the notify crate's event kind to our serializable enum.
69    #[must_use]
70    pub fn from_notify_kind(kind: notify::EventKind) -> Self {
71        match kind {
72            notify::EventKind::Create(_) => Self::Create,
73            notify::EventKind::Remove(_) => Self::Delete,
74            _ => Self::Modify,
75        }
76    }
77}
78
79/// A single file change record broadcast to connected clients.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct FileChange {
82    /// Path of the changed file (relative to watched root when possible).
83    #[serde(rename = "p")]
84    pub path: PathBuf,
85    /// Modification timestamp (Unix seconds).
86    #[serde(rename = "m")]
87    pub mtime: u64,
88    /// Operation performed on the file.
89    #[serde(rename = "o")]
90    pub op: FileOp,
91}
92
93/// Messages sent from the server to connected clients.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95#[serde(tag = "t", rename_all = "snake_case")]
96pub enum ServerMessage {
97    /// Periodic or on-change daemon status update.
98    Status {
99        /// PID of the daemon process.
100        pid: u32,
101        /// Human-readable status string (e.g. "idle", "indexing").
102        status: String,
103        /// Number of files currently in the index.
104        files: usize,
105    },
106    /// Batch of file changes detected by the watcher.
107    FilesChanged {
108        /// The changed files in this batch.
109        batch: Vec<FileChange>,
110        /// Timestamp of this event batch (Unix seconds).
111        #[serde(rename = "ts")]
112        timestamp: u64,
113    },
114    /// Response to a client query.
115    QueryResult {
116        /// Query ID (matches the `id` field from the request).
117        id: u64,
118        /// Current daemon status at query time.
119        status: String,
120        /// Number of files in the index.
121        files: usize,
122        /// Changes since the requested timestamp (for history queries).
123        #[serde(default, skip_serializing_if = "Vec::is_empty")]
124        changes_since: Vec<FileChange>,
125    },
126}
127
128/// Messages sent from connected clients to the daemon server.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130#[serde(tag = "t", rename_all = "snake_case")]
131pub enum ClientMessage {
132    /// Request current daemon status.
133    StatusQuery,
134    /// Request all changes since the given timestamp.
135    HistoryQuery {
136        /// Return changes with timestamps strictly after this value.
137        since: u64,
138        /// Client-assigned query ID (echoed back in the response).
139        id: u64,
140    },
141}
142
143/// Errors specific to the daemon socket subsystem.
144#[derive(Debug, thiserror::Error)]
145pub enum DaemonSockError {
146    /// I/O error on the socket.
147    #[error("daemon socket I/O: {0}")]
148    Io(#[from] std::io::Error),
149    /// JSON serialization or deserialization error.
150    #[error("daemon socket JSON: {0}")]
151    Json(#[from] serde_json::Error),
152    /// Could not resolve a suitable socket path.
153    #[error("daemon socket path resolution failed")]
154    PathResolution,
155}
156
157type Result<T> = std::result::Result<T, DaemonSockError>;
158
159/// Resolves the socket path for a given watched root directory.
160///
161/// Tries in order:
162/// 1. `$XDG_RUNTIME_DIR/ixd/{hash}.sock`
163/// 2. `$HOME/.local/run/ixd/{hash}.sock`
164/// 3. `/tmp/ixd-{uid}-{hash}.sock`
165///
166/// Where `hash` is the first 16 hex characters of `XXH64(canonical_root, 0)`.
167#[must_use]
168pub fn socket_path(root: &Path) -> PathBuf {
169    let canonical = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
170    let hash = format!(
171        "{:016x}",
172        xxhash_rust::xxh64::xxh64(canonical.to_string_lossy().as_bytes(), 0,)
173    );
174
175    if let Ok(xdg) = std::env::var("XDG_RUNTIME_DIR") {
176        let dir = PathBuf::from(xdg).join("ixd");
177        return dir.join(format!("{hash}.sock"));
178    }
179
180    if let Ok(home) = std::env::var("HOME") {
181        let dir = PathBuf::from(home).join(".local/run/ixd");
182        return dir.join(format!("{hash}.sock"));
183    }
184
185    let uid = unsafe { libc::getuid() };
186    PathBuf::from(format!("/tmp/ixd-{uid}-{hash}.sock"))
187}
188
/// Create the directory that will contain `path`, along with any missing
/// ancestors. A path with no parent component (such as `/`) needs nothing
/// created and succeeds immediately.
fn ensure_socket_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(dir) => std::fs::create_dir_all(dir),
        None => Ok(()),
    }
}
196
197/// Circular buffer of recent file-change batches for history queries.
198struct History {
199    entries: VecDeque<(u64, Vec<FileChange>)>,
200}
201
202impl History {
203    fn new() -> Self {
204        Self {
205            entries: VecDeque::with_capacity(HISTORY_CAPACITY),
206        }
207    }
208
209    fn push(&mut self, timestamp: u64, changes: Vec<FileChange>) {
210        if self.entries.len() >= HISTORY_CAPACITY {
211            self.entries.pop_front();
212        }
213        self.entries.push_back((timestamp, changes));
214    }
215
216    fn since(&self, cutoff: u64) -> Vec<FileChange> {
217        self.entries
218            .iter()
219            .filter(|(ts, _)| *ts > cutoff)
220            .flat_map(|(_, changes)| changes.iter().cloned())
221            .collect()
222    }
223}
224
225/// State shared between the accept loop and broadcast callers.
226struct Shared {
227    clients: Vec<ClientConn>,
228    history: History,
229    pid: u32,
230    status: String,
231    files_count: usize,
232}
233
234struct ClientConn {
235    stream: UnixStream,
236}
237
238impl ClientConn {
239    fn send(&mut self, msg: &ServerMessage) -> bool {
240        let Ok(mut line) = serde_json::to_string(msg) else {
241            return false;
242        };
243        line.push('\n');
244        self.stream.write_all(line.as_bytes()).is_ok() && self.stream.flush().is_ok()
245    }
246}
247
248/// Daemon-side socket server.
249///
250/// Binds a Unix domain socket, accepts client connections, and broadcasts
251/// file-change events and status updates to all connected clients.
252pub struct DaemonServer {
253    shared: Arc<Mutex<Shared>>,
254    listener: UnixListener,
255    socket_path: PathBuf,
256    accept_handle: Option<std::thread::JoinHandle<()>>,
257    running: Arc<std::sync::atomic::AtomicBool>,
258}
259
260impl DaemonServer {
261    /// Create and bind a new daemon socket server for the given watched root.
262    ///
263    /// The socket path is derived from the canonical root (see [`socket_path`]).
264    /// Any existing socket file at the path is removed before binding.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if the parent directory cannot be created or the
269    /// socket cannot be bound.
270    pub fn new(root: &Path) -> Result<Self> {
271        let sp = socket_path(root);
272        ensure_socket_dir(&sp)?;
273
274        // Remove stale socket from a previous run
275        let _ = std::fs::remove_file(&sp);
276
277        let listener = UnixListener::bind(&sp)?;
278
279        let pid = std::process::id();
280        let shared = Arc::new(Mutex::new(Shared {
281            clients: Vec::new(),
282            history: History::new(),
283            pid,
284            status: "idle".to_string(),
285            files_count: 0,
286        }));
287        let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
288
289        Ok(Self {
290            shared,
291            listener,
292            socket_path: sp,
293            accept_handle: None,
294            running,
295        })
296    }
297
298    /// Return the filesystem path of the bound socket.
299    #[must_use]
300    pub fn path(&self) -> &Path {
301        &self.socket_path
302    }
303
304    /// Start the accept-and-read loop in a background thread.
305    ///
306    /// After calling `start()`, the server will accept new connections and
307    /// respond to client queries automatically. Call [`DaemonServer::broadcast`]
308    /// from the main loop to push events to all connected clients.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if the listener cannot be cloned, the accept thread
313    /// cannot be spawned, or file descriptor operations fail.
314    pub fn start(&mut self) -> Result<()> {
315        let listener = self.listener.try_clone().map_err(DaemonSockError::Io)?;
316        let shared = Arc::clone(&self.shared);
317        let running = Arc::clone(&self.running);
318
319        let handle = std::thread::Builder::new()
320            .name("ixd-sock-accept".to_string())
321            .spawn(move || {
322                if let Err(e) = listener.set_nonblocking(true) {
323                    tracing::error!("ixd: cannot set nonblocking: {e}");
324                    return;
325                }
326
327                while running.load(std::sync::atomic::Ordering::SeqCst) {
328                    match listener.accept() {
329                        Ok((stream, _)) => {
330                            if let Err(e) = stream.set_nonblocking(false) {
331                                tracing::warn!("ixd: cannot set blocking on client: {e}");
332                                continue;
333                            }
334                            let _ = stream.set_write_timeout(Some(std::time::Duration::from_secs(5)));
335                            let read_stream = match stream.try_clone() {
336                                Ok(s) => s,
337                                Err(e) => {
338                                    tracing::warn!("ixd: cannot clone stream: {e}");
339                                    continue;
340                                }
341                            };
342                            let shared_clone = Arc::clone(&shared);
343                            let running_clone = Arc::clone(&running);
344                            if let Err(e) = std::thread::Builder::new()
345                                .name("ixd-sock-client".to_string())
346                                .spawn(move || {
347                                    client_read_loop(&read_stream, &shared_clone, &running_clone);
348                                })
349                            {
350                                tracing::warn!("ixd: failed to spawn client thread: {e}");
351                                continue;
352                            }
353                            let conn = ClientConn { stream };
354                            if let Ok(mut s) = shared.lock() {
355                                s.clients.push(conn);
356                            }
357                        }
358                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
359                            std::thread::sleep(std::time::Duration::from_millis(100));
360                        }
361                        Err(e) => {
362                            tracing::warn!("ixd: accept error: {e}");
363                            std::thread::sleep(std::time::Duration::from_millis(200));
364                        }
365                    }
366                }
367            })
368            .map_err(DaemonSockError::Io)?;
369
370        self.accept_handle = Some(handle);
371        Ok(())
372    }
373
374    /// Broadcast a server message to all connected clients.
375    ///
376    /// Disconnected clients are automatically removed. The message is
377    /// serialized once and written to each client with a short write
378    /// timeout to prevent a slow consumer from blocking the daemon.
379    pub fn broadcast(&self, msg: &ServerMessage) {
380        let Ok(mut s) = self.shared.lock() else {
381            return;
382        };
383        s.clients.retain_mut(|c| c.send(msg));
384    }
385
386    /// Update the daemon status and file count (reflected in subsequent
387    /// broadcasts and query responses).
388    pub fn set_status(&self, status: &str, files_count: usize) {
389        if let Ok(mut s) = self.shared.lock() {
390            s.status = status.to_string();
391            s.files_count = files_count;
392        }
393    }
394
395    /// Record a file-change batch in the history buffer and broadcast it.
396    pub fn notify_changes(&self, changes: Vec<FileChange>, files_count: usize) {
397        let timestamp = now_secs();
398        if let Ok(mut s) = self.shared.lock() {
399            s.history.push(timestamp, changes.clone());
400            s.files_count = files_count;
401        }
402        let msg = ServerMessage::FilesChanged {
403            batch: changes,
404            timestamp,
405        };
406        self.broadcast(&msg);
407    }
408}
409
410fn client_read_loop(
411    stream: &UnixStream,
412    shared: &Arc<Mutex<Shared>>,
413    running: &Arc<std::sync::atomic::AtomicBool>,
414) {
415    let reader = BufReader::new(stream);
416    for line in reader.lines() {
417        if !running.load(std::sync::atomic::Ordering::SeqCst) {
418            break;
419        }
420        let Ok(line) = line else { break };
421
422        let msg: ClientMessage = match serde_json::from_str(&line) {
423            Ok(m) => m,
424            Err(e) => {
425                tracing::debug!("ixd: malformed client message: {e}");
426                continue;
427            }
428        };
429
430        let response = match msg {
431            ClientMessage::StatusQuery => {
432                let Ok(s) = shared.lock() else {
433                    tracing::warn!("ixd: shared lock poisoned in status query");
434                    continue;
435                };
436                ServerMessage::QueryResult {
437                    id: u64::from(s.pid),
438                    status: s.status.clone(),
439                    files: s.files_count,
440                    changes_since: Vec::new(),
441                }
442            }
443            ClientMessage::HistoryQuery { since, id } => {
444                let Ok(s) = shared.lock() else {
445                    tracing::warn!("ixd: shared lock poisoned in history query");
446                    continue;
447                };
448                let changes = s.history.since(since);
449                ServerMessage::QueryResult {
450                    id,
451                    status: s.status.clone(),
452                    files: s.files_count,
453                    changes_since: changes,
454                }
455            }
456        };
457
458        if let Ok(mut write_stream) = stream.try_clone() {
459            let mut line = serde_json::to_string(&response).unwrap_or_default();
460            line.push('\n');
461            let _ = write_stream.write_all(line.as_bytes());
462            let _ = write_stream.flush();
463        }
464    }
465}
466
467/// Client-side connection to an ixd daemon socket.
468pub struct DaemonClient {
469    stream: BufReader<UnixStream>,
470}
471
472impl DaemonClient {
473    /// Connect to the daemon socket for the given watched root.
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if the socket does not exist or the connection fails.
478    pub fn connect(root: &Path) -> Result<Self> {
479        let sp = socket_path(root);
480        let stream = UnixStream::connect(&sp)?;
481        Ok(Self {
482            stream: BufReader::new(stream),
483        })
484    }
485
486    /// Receive the next message from the daemon (blocking).
487    ///
488    /// # Errors
489    ///
490    /// Returns an error on I/O failure or malformed JSON.
491    pub fn recv(&mut self) -> Result<ServerMessage> {
492        let mut line = String::new();
493        let bytes = self.stream.read_line(&mut line)?;
494        if bytes == 0 {
495            return Err(DaemonSockError::Io(std::io::Error::new(
496                std::io::ErrorKind::UnexpectedEof,
497                "daemon closed connection",
498            )));
499        }
500        let msg: ServerMessage = serde_json::from_str(line.trim_end())?;
501        Ok(msg)
502    }
503
504    /// Send a query message to the daemon.
505    ///
506    /// # Errors
507    ///
508    /// Returns an error on I/O failure or serialization error.
509    pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
510        let stream = self.stream.get_mut();
511        let mut line = serde_json::to_string(msg)?;
512        line.push('\n');
513        stream.write_all(line.as_bytes())?;
514        stream.flush()?;
515        Ok(())
516    }
517}
518
/// Whole seconds elapsed since the Unix epoch, or `0` if the system clock
/// reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
526
527impl Drop for DaemonServer {
528    fn drop(&mut self) {
529        self.running
530            .store(false, std::sync::atomic::Ordering::SeqCst);
531        if let Some(handle) = self.accept_handle.take() {
532            let _ = handle.join();
533        }
534        let _ = std::fs::remove_file(&self.socket_path);
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Serializes the tests that depend on, or change, the environment
    /// variables consulted by `socket_path`. The test harness runs tests on
    /// several threads, and the environment is process-global.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Take `ENV_LOCK`, ignoring poisoning left behind by a failed test.
    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    #[test]
    fn socket_path_deterministic() {
        // Hold the lock so the XDG override test cannot change the
        // environment between the two calls.
        let _env = env_guard();
        let root = PathBuf::from("/tmp/test-project");
        let p1 = socket_path(&root);
        let p2 = socket_path(&root);
        assert_eq!(p1, p2, "same root must produce same socket path");
    }

    #[test]
    fn socket_path_different_roots() {
        let _env = env_guard();
        let r1 = PathBuf::from("/tmp/project-a");
        let r2 = PathBuf::from("/tmp/project-b");
        assert_ne!(socket_path(&r1), socket_path(&r2));
    }

    #[test]
    fn socket_path_uses_xdg() {
        let _env = env_guard();
        let previous = std::env::var_os("XDG_RUNTIME_DIR");

        // SAFETY: environment mutation in this module happens only while
        // ENV_LOCK is held; the other tests only read the environment
        // through `std::env`.
        unsafe { std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg-test-runtime") };
        let p = socket_path(Path::new("/tmp/some-project"));

        // Restore the previous value before asserting, so a failure cannot
        // leak the override into other tests.
        // SAFETY: as above.
        unsafe {
            match previous {
                Some(value) => std::env::set_var("XDG_RUNTIME_DIR", value),
                None => std::env::remove_var("XDG_RUNTIME_DIR"),
            }
        }

        assert!(p.starts_with("/tmp/xdg-test-runtime/ixd/"));
        assert!(p.extension().is_some_and(|e| e == "sock"));
    }

    #[test]
    fn server_message_ndjson_roundtrip() {
        let msg = ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 42,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        assert!(json.contains("\"t\":\"status\""), "tag field present");

        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::Status { pid, status, files } = back {
            assert_eq!(pid, 1234);
            assert_eq!(status, "idle");
            assert_eq!(files, 42);
        } else {
            panic!("wrong variant after roundtrip");
        }
    }

    #[test]
    fn files_changed_roundtrip() {
        let msg = ServerMessage::FilesChanged {
            batch: vec![FileChange {
                path: PathBuf::from("src/main.rs"),
                mtime: 1_776_468_629,
                op: FileOp::Modify,
            }],
            timestamp: 1_776_468_629,
        };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ServerMessage = serde_json::from_str(&json).expect("deserialize");
        if let ServerMessage::FilesChanged { batch, timestamp } = back {
            assert_eq!(batch.len(), 1);
            assert_eq!(batch[0].path, PathBuf::from("src/main.rs"));
            assert_eq!(timestamp, 1_776_468_629);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn client_message_roundtrip() {
        let msg = ClientMessage::HistoryQuery { since: 1000, id: 7 };
        let json = serde_json::to_string(&msg).expect("serialize");
        let back: ClientMessage = serde_json::from_str(&json).expect("deserialize");
        if let ClientMessage::HistoryQuery { since, id } = back {
            assert_eq!(since, 1000);
            assert_eq!(id, 7);
        } else {
            panic!("wrong variant");
        }
    }

    #[test]
    fn history_since() {
        let mut h = History::new();
        h.push(
            100,
            vec![FileChange {
                path: PathBuf::from("a.rs"),
                mtime: 100,
                op: FileOp::Create,
            }],
        );
        h.push(
            200,
            vec![FileChange {
                path: PathBuf::from("b.rs"),
                mtime: 200,
                op: FileOp::Modify,
            }],
        );
        h.push(
            300,
            vec![FileChange {
                path: PathBuf::from("c.rs"),
                mtime: 300,
                op: FileOp::Delete,
            }],
        );

        let changes = h.since(150);
        assert_eq!(changes.len(), 2);
        assert_eq!(changes[0].path, PathBuf::from("b.rs"));
        assert_eq!(changes[1].path, PathBuf::from("c.rs"));
    }

    #[test]
    fn history_capacity() {
        let mut h = History::new();
        for i in 0..=HISTORY_CAPACITY {
            h.push(
                i as u64,
                vec![FileChange {
                    path: PathBuf::from(format!("f{i}")),
                    mtime: i as u64,
                    op: FileOp::Modify,
                }],
            );
        }
        assert_eq!(h.entries.len(), HISTORY_CAPACITY);
        // Oldest entry (ts=0) should have been evicted
        assert_eq!(h.entries.front().expect("non-empty").0, 1);
    }

    #[test]
    fn server_client_connect_and_broadcast() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        // A failed start would otherwise only show up later as a confusing
        // receive timeout.
        server.start().expect("start server");

        // Connect a client
        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        // Give the accept thread time to register the client
        std::thread::sleep(std::time::Duration::from_millis(200));

        server.set_status("idle", 10);

        // Broadcast a status message
        server.broadcast(&ServerMessage::Status {
            pid: 1234,
            status: "idle".to_string(),
            files: 10,
        });

        // Client should receive the message
        // Use a timeout to avoid hanging forever
        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::Status { pid, status, files }) => {
                assert_eq!(pid, 1234);
                assert_eq!(status, "idle");
                assert_eq!(files, 10);
            }
            Ok(other) => panic!("expected Status, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }

    #[test]
    fn client_query_status() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().to_path_buf();

        let mut server = DaemonServer::new(&root).expect("create server");
        let sp = server.path().to_path_buf();
        server.start().expect("start server");
        server.set_status("indexing", 99);

        let stream = UnixStream::connect(&sp).expect("connect");
        let mut client = DaemonClient {
            stream: BufReader::new(stream),
        };

        std::thread::sleep(std::time::Duration::from_millis(200));

        client
            .send(&ClientMessage::StatusQuery)
            .expect("send query");

        client
            .stream
            .get_mut()
            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
            .expect("set timeout");

        match client.recv() {
            Ok(ServerMessage::QueryResult {
                id: _,
                status,
                files,
                changes_since,
            }) => {
                assert_eq!(status, "indexing");
                assert_eq!(files, 99);
                assert!(changes_since.is_empty());
            }
            Ok(other) => panic!("expected QueryResult, got {other:?}"),
            Err(e) => panic!("recv failed: {e}"),
        }
    }
}