Skip to main content

rgx/
server.rs

//! The per-project daemon: holds the index resident in RAM, keeps it fresh, and answers queries over
//! a local IPC endpoint (an AF_UNIX socket on Unix, a loopback-TCP port on Windows — see
//! [`crate::transport`]). Owning that endpoint *is* the single-instance lock — a second daemon that
//! loses the race exits. The daemon serves immediately: a warm start loads the snapshot and answers
//! at once; a cold start answers via a full ripgrep scan (the correct fallback) until the first
//! build finishes. See `docs/indexing.md` and `docs/index-and-storage.md`.
7
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Condvar, Mutex, RwLock};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
13use anyhow::Result;
14use notify_debouncer_full::new_debouncer;
15use notify_debouncer_full::notify::RecursiveMode;
16
17use crate::config::Config;
18use crate::confirm::SearchOptions;
19use crate::index::{self, Index};
20use crate::pagination::{self, PaginationStore};
21use crate::paths;
22use crate::proto::{self, Request};
23use crate::transport::{self, Stream};
24
/// Repaint interval for a `watch` subscriber when nothing has changed: keeps the build-progress
/// count and the snapshot age current, and is how a disconnected client gets noticed.
const WATCH_HEARTBEAT: Duration = Duration::from_millis(250);

/// Ceiling on the idle reaper's wake-up interval; timeouts shorter than this are checked at their
/// own pace.
const IDLE_CHECK_MAX: Duration = Duration::from_secs(15);
31
32struct Shared {
33    index: RwLock<Index>,
34    ready: AtomicBool,
35    root: PathBuf,
36    snapshot: PathBuf,
37    /// Cold-build progress (files processed / total to process); only meaningful while building.
38    indexed: AtomicUsize,
39    total: AtomicUsize,
40    /// Posting-list footprint, cached so `status`/`watch` need not re-walk all postings each render.
41    index_bytes: AtomicU64,
42    /// A change counter + condvar so `watch` wakes immediately on any transition.
43    seq: Mutex<u64>,
44    seq_cv: Condvar,
45    /// A cold build at least this long earns an on-disk snapshot; below it the index stays RAM-only.
46    persist_threshold: Duration,
47    /// Whether the resident index is backed by a snapshot. Set once the cold build's duration is
48    /// known (warm starts keep the default `true`, since a snapshot already exists).
49    persist: AtomicBool,
50    /// Exit after this long with no client request, or `None` to stay resident.
51    idle_timeout: Option<Duration>,
52    /// Last time a request finished (or arrived); drives the idle reaper.
53    last_active: Mutex<Instant>,
54    /// Requests currently being served; any in-flight request (search, find, status, or a long-lived
55    /// watch) keeps the idle reaper from exiting. Held via [`ActiveRequest`] so a panicking handler
56    /// can't leak the count.
57    in_flight: AtomicUsize,
58    /// Short-lived store mapping pagination tokens to cursor blobs, so the printed `--cursor` is a
59    /// tiny id instead of a base64 blob.
60    pagination: Mutex<PaginationStore>,
61}
62
63/// Marks a request in flight for its whole lifetime. Drop decrements and stamps `last_active`, so the
64/// reaper never exits mid-request and the idle clock resets when the request finishes — panic-safe,
65/// since Drop runs even when a handler unwinds.
66struct ActiveRequest<'a>(&'a Shared);
67
68impl<'a> ActiveRequest<'a> {
69    fn new(shared: &'a Shared) -> Self {
70        shared.in_flight.fetch_add(1, Ordering::SeqCst);
71        shared.touch();
72        ActiveRequest(shared)
73    }
74}
75
76impl Drop for ActiveRequest<'_> {
77    fn drop(&mut self) {
78        // Stamp before decrementing so the reaper, on seeing the count hit zero, reads a fresh time.
79        self.0.touch();
80        self.0.in_flight.fetch_sub(1, Ordering::SeqCst);
81    }
82}
83
84impl Shared {
85    /// Read/write the index, recovering a poisoned lock rather than cascading panics across every
86    /// handler: the index is a rebuildable over-approximation, so continuing (and letting the next
87    /// reconcile repair it) is safer than wedging the daemon if one operation ever panics.
88    fn read_index(&self) -> std::sync::RwLockReadGuard<'_, Index> {
89        self.index.read().unwrap_or_else(|e| e.into_inner())
90    }
91    fn write_index(&self) -> std::sync::RwLockWriteGuard<'_, Index> {
92        self.index.write().unwrap_or_else(|e| e.into_inner())
93    }
94
95    /// Signal a state change (build done, reconcile applied) to wake watchers; refresh the cached
96    /// posting footprint from the current index.
97    fn bump(&self) {
98        self.index_bytes
99            .store(self.read_index().memory_bytes(), Ordering::Relaxed);
100        *self.seq.lock().unwrap_or_else(|e| e.into_inner()) += 1;
101        self.seq_cv.notify_all();
102    }
103
104    /// The pagination store, recovering a poisoned lock (the store is best-effort: a lost token just
105    /// makes the client re-run its search).
106    fn pagination(&self) -> std::sync::MutexGuard<'_, PaginationStore> {
107        self.pagination.lock().unwrap_or_else(|e| e.into_inner())
108    }
109
110    /// Reset the idle clock to now.
111    fn touch(&self) {
112        *self.last_active.lock().unwrap_or_else(|e| e.into_inner()) = Instant::now();
113    }
114
115    /// Mark the index ready to serve and start the idle clock from now, so a daemon that just
116    /// finished a long cold build gets a full idle window before the reaper can exit it.
117    fn mark_ready(&self) {
118        self.touch();
119        self.ready.store(true, Ordering::SeqCst);
120    }
121
122    /// Persist the index to its snapshot unless this index is RAM-only (`persist` is false).
123    fn maybe_save(&self, idx: &Index) {
124        if self.persist.load(Ordering::SeqCst) {
125            let _ = idx.save(&self.snapshot);
126        }
127    }
128
129    /// Block until the change counter moves past `last` or the heartbeat elapses; return the latest.
130    fn wait_change(&self, last: u64) -> u64 {
131        let g = self.seq.lock().unwrap_or_else(|e| e.into_inner());
132        let (g, _) = self
133            .seq_cv
134            .wait_timeout_while(g, WATCH_HEARTBEAT, |s| *s == last)
135            .unwrap_or_else(|e| e.into_inner());
136        *g
137    }
138}
139
/// Per-process seed stamped onto pagination tokens, so tokens minted by a previous daemon miss
/// cleanly after a restart. Only the low 32 bits end up in the token, and the low 32 bits of a
/// nanosecond clock wrap roughly every 4.3 s; XOR-ing the high half into the low half makes those
/// bits depend on the whole timestamp. A clock set before the Unix epoch yields a seed of 0.
fn session_seed() -> u64 {
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };
    (nanos >> 32) ^ nanos
}
150
151/// Run the daemon for `root` in the foreground. Returns once the socket can't be owned (another
152/// daemon is already running) or on a fatal error.
153pub fn run(root: PathBuf) -> Result<()> {
154    let dir = paths::state_dir(&root);
155    std::fs::create_dir_all(&dir)?;
156    let listener = match transport::bind(&root)? {
157        Some(l) => l,
158        None => return Ok(()), // another daemon owns this root
159    };
160
161    let cfg = Config::get();
162    let shared = Arc::new(Shared {
163        index: RwLock::new(Index::default()),
164        ready: AtomicBool::new(false),
165        root: root.clone(),
166        snapshot: paths::snapshot_path(&root),
167        indexed: AtomicUsize::new(0),
168        total: AtomicUsize::new(0),
169        index_bytes: AtomicU64::new(0),
170        seq: Mutex::new(0),
171        seq_cv: Condvar::new(),
172        persist_threshold: cfg.persist_threshold(),
173        persist: AtomicBool::new(true),
174        idle_timeout: cfg.idle_timeout(),
175        last_active: Mutex::new(Instant::now()),
176        in_flight: AtomicUsize::new(0),
177        pagination: Mutex::new(PaginationStore::new(
178            session_seed(),
179            pagination::DEFAULT_TTL,
180        )),
181    });
182
183    spawn_indexer(shared.clone());
184    spawn_watcher(shared.clone());
185    spawn_idle_reaper(shared.clone());
186
187    loop {
188        let conn = match transport::accept(&listener) {
189            Ok(conn) => conn,
190            Err(_) => continue,
191        };
192        let shared = shared.clone();
193        std::thread::spawn(move || {
194            // Count the whole connection as in flight (covering the blocking read), so the reaper
195            // can't exit out from under a request — including one accepted but not yet parsed.
196            let _active = ActiveRequest::new(&shared);
197            let _ = handle(conn, &shared);
198        });
199    }
200}
201
202/// Warm-start from the snapshot if present (serve immediately), then build/reconcile in the
203/// background so the resident index reflects the real tree.
204fn spawn_indexer(shared: Arc<Shared>) {
205    std::thread::spawn(move || {
206        if let Ok(idx) = Index::load(&shared.snapshot) {
207            *shared.write_index() = idx;
208            shared.mark_ready();
209            shared.bump();
210            // catch changes made while the daemon was down
211            let mut idx = shared.write_index();
212            idx.reconcile(&shared.root);
213            shared.maybe_save(&idx);
214            drop(idx);
215            shared.bump();
216        } else {
217            // Cold build: publish total, then index reporting per-file progress for watchers.
218            let started = Instant::now();
219            let paths = index::walk_files(&shared.root);
220            shared.total.store(paths.len(), Ordering::Relaxed);
221            shared.bump();
222            let built = Index::from_paths(&paths, &shared.indexed);
223            // A build cheap enough to redo on the next start stays RAM-only: skip the snapshot (and
224            // its per-reconcile rewrites), trading a sub-threshold cold rebuild for the disk.
225            shared.persist.store(
226                started.elapsed() >= shared.persist_threshold,
227                Ordering::SeqCst,
228            );
229            shared.maybe_save(&built);
230            *shared.write_index() = built;
231            shared.mark_ready();
232            shared.bump();
233        }
234    });
235}
236
237/// Watch the tree; on a debounced change burst, reconcile the resident index and persist. The
238/// reconcile re-walks ignore-aware, so freshly-created ignored files never leak into results.
239fn spawn_watcher(shared: Arc<Shared>) {
240    std::thread::spawn(move || {
241        let (tx, rx) = std::sync::mpsc::channel();
242        let mut debouncer = match new_debouncer(Duration::from_millis(300), None, move |res| {
243            let _ = tx.send(res);
244        }) {
245            Ok(d) => d,
246            Err(_) => return,
247        };
248        if debouncer
249            .watch(&shared.root, RecursiveMode::Recursive)
250            .is_err()
251        {
252            return;
253        }
254        for res in rx {
255            if res.is_err() || !shared.ready.load(Ordering::SeqCst) {
256                continue;
257            }
258            let mut idx = shared.write_index();
259            if idx.reconcile(&shared.root) > 0 {
260                shared.maybe_save(&idx);
261                drop(idx);
262                shared.bump();
263            }
264        }
265    });
266}
267
268/// Exit the daemon once it has gone `idle_timeout` without a request (freeing its RAM; the next
269/// search respawns it). Never exits before the first build is ready or while a request is in flight.
270/// No-op when disabled.
271fn spawn_idle_reaper(shared: Arc<Shared>) {
272    let Some(timeout) = shared.idle_timeout else {
273        return;
274    };
275    let tick = timeout.min(IDLE_CHECK_MAX);
276    std::thread::spawn(move || {
277        loop {
278            std::thread::sleep(tick);
279            // Don't reap a daemon that is still doing its first build, or one mid-request.
280            if !shared.ready.load(Ordering::SeqCst) || shared.in_flight.load(Ordering::SeqCst) > 0 {
281                continue;
282            }
283            let idle = shared
284                .last_active
285                .lock()
286                .unwrap_or_else(|e| e.into_inner())
287                .elapsed();
288            if idle >= timeout {
289                transport::cleanup(&shared.root);
290                std::process::exit(0);
291            }
292        }
293    });
294}
295
296fn handle(mut conn: Stream, shared: &Shared) -> Result<()> {
297    let req = proto::read_request(&mut conn)?;
298    match req {
299        // Errors here are essentially "client went away mid-stream"; ignore so we still attempt the
300        // stream terminator below — a request that produced N frames then errored should not look
301        // different to the client than a clean finish.
302        Request::Search { opts, pattern } => {
303            let _ = content_search(shared, &pattern, opts, &mut conn);
304        }
305        Request::Find {
306            needle,
307            after,
308            limit,
309        } => {
310            let out = find(shared, &needle, after.as_deref(), limit as usize);
311            let _ = proto::write_data(&mut conn, &out);
312        }
313        Request::Status => {
314            let _ = proto::write_data(&mut conn, &status(shared));
315        }
316        Request::Watch => return watch(shared, &mut conn),
317        Request::Shutdown => {
318            let _ = proto::write_data(&mut conn, b"ok\n");
319            let _ = proto::end_stream(&mut conn);
320            transport::cleanup(&shared.root);
321            std::process::exit(0);
322        }
323        Request::CursorStore { blob } => {
324            let token = shared.pagination().store(blob, Instant::now());
325            let _ = proto::write_data(&mut conn, token.as_bytes());
326        }
327        Request::CursorTake { token } => {
328            // An empty reply means "expired or already used"; the client re-runs the search.
329            let blob = shared.pagination().take(&token, Instant::now());
330            let _ = proto::write_data(&mut conn, &blob.unwrap_or_default());
331        }
332    }
333    let _ = proto::end_stream(&mut conn);
334    Ok(())
335}
336
337/// Stream a fresh status frame on every change (and on a heartbeat), until the client disconnects.
338/// The blocking wait holds no index lock, and rendering only touches the (cheap-while-building)
339/// resident index, so an attached watcher does not slow indexing.
340fn watch(shared: &Shared, conn: &mut Stream) -> Result<()> {
341    // The connection's ActiveRequest guard (held in the accept loop) keeps the daemon alive for the
342    // whole subscription and resets the idle clock when it ends, so nothing extra is needed here.
343    let mut last = 0;
344    loop {
345        if proto::write_data(conn, &status(shared)).is_err() {
346            return Ok(()); // client went away
347        }
348        last = shared.wait_change(last);
349    }
350}
351
352/// Stream content-search results straight to the socket so huge result sets aren't buffered.
353fn content_search(
354    shared: &Shared,
355    pattern: &str,
356    opts: SearchOptions,
357    conn: &mut Stream,
358) -> Result<()> {
359    if shared.ready.load(Ordering::SeqCst) {
360        // Resolve candidates while holding the read lock, then RELEASE it before streaming: ripgrep
361        // confirm + blocking socket writes must never run under the index lock, or a slow client
362        // would block the watcher's write lock and freeze indexing.
363        let paths = crate::candidate_paths(&shared.read_index(), pattern, opts);
364        let effective = crate::effective_pattern(pattern, opts);
365        let refs: Vec<&Path> = paths.iter().map(PathBuf::as_path).collect();
366        crate::confirm::search_streaming(&effective, &refs, &shared.root, opts, |chunk| {
367            proto::write_data(&mut *conn, chunk)
368        })
369    } else {
370        // Cold start only: pipelined full scan until the first build finishes. The sink is shared
371        // across walk threads, so guard the socket with a mutex.
372        let conn = std::sync::Mutex::new(conn);
373        crate::stream_full_scan(&shared.root, pattern, opts, |chunk| {
374            if let Ok(mut c) = conn.lock() {
375                let _ = proto::write_data(&mut **c, chunk);
376            }
377        })
378    }
379}
380
381/// File-name lookup, keyset-paginated. The response leads with a `proto::format_find_header` line so
382/// the client reports the true total (not just the page) and can resume via `next_after`.
383fn find(shared: &Shared, needle: &str, after: Option<&str>, limit: usize) -> Vec<u8> {
384    let (lines, total, start): (Vec<String>, usize, usize) = if shared.ready.load(Ordering::SeqCst)
385    {
386        let idx = shared.read_index();
387        let (hits, total, start) = idx.find(needle, after, limit);
388        let lines = hits
389            .iter()
390            .map(|p| p.to_string_lossy().into_owned())
391            .collect();
392        (lines, total, start)
393    } else {
394        let mut all: Vec<String> = index::walk_files(&shared.root)
395            .iter()
396            .filter(|p| p.to_string_lossy().contains(needle))
397            .map(|p| p.to_string_lossy().into_owned())
398            .collect();
399        all.sort_unstable();
400        let total = all.len();
401        let start = after.map_or(0, |a| all.partition_point(|p| p.as_str() <= a));
402        let lines = all.into_iter().skip(start).take(limit).collect();
403        (lines, total, start)
404    };
405    // Offer a resume key only when matches genuinely remain past this page (we know the true total
406    // and the keyset offset), so following `next_after` never lands on an empty page.
407    let next_after = (start + lines.len() < total)
408        .then(|| lines.last().map(String::as_str))
409        .flatten();
410    let mut out = proto::format_find_header(total, start, lines.len(), next_after);
411    for l in &lines {
412        out.push_str(l);
413        out.push('\n');
414    }
415    out.into_bytes()
416}
417
418fn status(shared: &Shared) -> Vec<u8> {
419    let idx = shared.read_index();
420    let state = if shared.ready.load(Ordering::SeqCst) {
421        "ready".to_string()
422    } else {
423        let done = shared.indexed.load(Ordering::Relaxed) as u64;
424        let total = shared.total.load(Ordering::Relaxed) as u64;
425        if total > 0 {
426            format!(
427                "building {} / {} files",
428                crate::status::human_count(done),
429                crate::status::human_count(total)
430            )
431        } else {
432            "building (scanning tree...)".to_string()
433        }
434    };
435    // RAM-only once a cold build decided not to persist; while building, the decision isn't made yet.
436    let ram_only = shared.ready.load(Ordering::SeqCst) && !shared.persist.load(Ordering::SeqCst);
437    crate::status::Status {
438        root: &shared.root,
439        snapshot: &shared.snapshot,
440        running: true,
441        ram_only,
442        state: Some(state),
443        files: Some(idx.file_count()),
444        trigrams: Some(idx.trigram_count()),
445        memory_bytes: Some(shared.index_bytes.load(Ordering::Relaxed)),
446    }
447    .render()
448    .into_bytes()
449}