1use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
10use std::sync::{Arc, Condvar, Mutex, RwLock};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12
13use anyhow::Result;
14use notify_debouncer_full::new_debouncer;
15use notify_debouncer_full::notify::RecursiveMode;
16
17use crate::config::Config;
18use crate::confirm::SearchOptions;
19use crate::index::{self, Index};
20use crate::pagination::{self, PaginationStore};
21use crate::paths;
22use crate::proto::{self, Request};
23use crate::transport::{self, Stream};
24
/// Longest time `Shared::wait_change` blocks before returning even though the
/// change counter has not moved; `watch` re-sends the status after every
/// return, so this is also the interval at which an idle watch connection is
/// written to.
const WATCH_HEARTBEAT: Duration = Duration::from_millis(250);

/// Upper bound on the idle reaper's sleep between checks; the actual tick is
/// the smaller of this and the configured idle timeout.
const IDLE_CHECK_MAX: Duration = Duration::from_secs(15);
31
/// State shared (behind an `Arc`) between the accept loop, the per-connection
/// threads and the background indexer / watcher / idle-reaper threads.
struct Shared {
    /// The search index. Replaced wholesale by the indexer and reconciled in
    /// place by the indexer and the watcher.
    index: RwLock<Index>,
    /// Set by `mark_ready` once an index has been installed; until then
    /// `content_search` and `find` fall back to scanning the tree directly.
    ready: AtomicBool,
    /// Root directory passed to `run`; walked, watched and reconciled against.
    root: PathBuf,
    /// Snapshot file (`paths::snapshot_path(&root)`) the index is loaded from
    /// and saved to.
    snapshot: PathBuf,
    /// Progress counter handed to `Index::from_paths` during a fresh build and
    /// reported by `status` as the "done" count (presumably advanced by
    /// `from_paths` as files are indexed — confirm).
    indexed: AtomicUsize,
    /// Number of paths returned by the tree walk of a fresh build; reported by
    /// `status` as the total.
    total: AtomicUsize,
    /// Cached `Index::memory_bytes()`, refreshed by `bump`.
    index_bytes: AtomicU64,
    /// Change counter incremented by `bump`; `wait_change` compares against it.
    seq: Mutex<u64>,
    /// Notified by `bump` after `seq` is incremented.
    seq_cv: Condvar,
    /// From the config: a fresh build enables persistence only if it took at
    /// least this long.
    persist_threshold: Duration,
    /// Whether `maybe_save` writes the snapshot. Starts `true`; overwritten
    /// after a fresh build according to `persist_threshold`.
    persist: AtomicBool,
    /// From the config: idle period after which the reaper exits the process.
    /// `None` means no reaper thread is started.
    idle_timeout: Option<Duration>,
    /// Instant of the most recent `touch`.
    last_active: Mutex<Instant>,
    /// Number of live `ActiveRequest` guards.
    in_flight: AtomicUsize,
    /// Cursor store backing the `CursorStore` / `CursorTake` requests.
    pagination: Mutex<PaginationStore>,
}
62
63struct ActiveRequest<'a>(&'a Shared);
67
68impl<'a> ActiveRequest<'a> {
69 fn new(shared: &'a Shared) -> Self {
70 shared.in_flight.fetch_add(1, Ordering::SeqCst);
71 shared.touch();
72 ActiveRequest(shared)
73 }
74}
75
76impl Drop for ActiveRequest<'_> {
77 fn drop(&mut self) {
78 self.0.touch();
80 self.0.in_flight.fetch_sub(1, Ordering::SeqCst);
81 }
82}
83
84impl Shared {
85 fn read_index(&self) -> std::sync::RwLockReadGuard<'_, Index> {
89 self.index.read().unwrap_or_else(|e| e.into_inner())
90 }
91 fn write_index(&self) -> std::sync::RwLockWriteGuard<'_, Index> {
92 self.index.write().unwrap_or_else(|e| e.into_inner())
93 }
94
95 fn bump(&self) {
98 self.index_bytes
99 .store(self.read_index().memory_bytes(), Ordering::Relaxed);
100 *self.seq.lock().unwrap_or_else(|e| e.into_inner()) += 1;
101 self.seq_cv.notify_all();
102 }
103
104 fn pagination(&self) -> std::sync::MutexGuard<'_, PaginationStore> {
107 self.pagination.lock().unwrap_or_else(|e| e.into_inner())
108 }
109
110 fn touch(&self) {
112 *self.last_active.lock().unwrap_or_else(|e| e.into_inner()) = Instant::now();
113 }
114
115 fn mark_ready(&self) {
118 self.touch();
119 self.ready.store(true, Ordering::SeqCst);
120 }
121
122 fn maybe_save(&self, idx: &Index) {
124 if self.persist.load(Ordering::SeqCst) {
125 let _ = idx.save(&self.snapshot);
126 }
127 }
128
129 fn wait_change(&self, last: u64) -> u64 {
131 let g = self.seq.lock().unwrap_or_else(|e| e.into_inner());
132 let (g, _) = self
133 .seq_cv
134 .wait_timeout_while(g, WATCH_HEARTBEAT, |s| *s == last)
135 .unwrap_or_else(|e| e.into_inner());
136 *g
137 }
138}
139
/// Derives a per-process seed from the wall clock: nanoseconds since the Unix
/// epoch (truncated to 64 bits) with the upper half folded into the lower.
/// Yields `0` if the clock reads earlier than the epoch.
fn session_seed() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    };
    let upper = since_epoch >> 32;
    since_epoch ^ upper
}
150
151pub fn run(root: PathBuf) -> Result<()> {
154 let dir = paths::state_dir(&root);
155 std::fs::create_dir_all(&dir)?;
156 let listener = match transport::bind(&root)? {
157 Some(l) => l,
158 None => return Ok(()), };
160
161 let cfg = Config::get();
162 let shared = Arc::new(Shared {
163 index: RwLock::new(Index::default()),
164 ready: AtomicBool::new(false),
165 root: root.clone(),
166 snapshot: paths::snapshot_path(&root),
167 indexed: AtomicUsize::new(0),
168 total: AtomicUsize::new(0),
169 index_bytes: AtomicU64::new(0),
170 seq: Mutex::new(0),
171 seq_cv: Condvar::new(),
172 persist_threshold: cfg.persist_threshold(),
173 persist: AtomicBool::new(true),
174 idle_timeout: cfg.idle_timeout(),
175 last_active: Mutex::new(Instant::now()),
176 in_flight: AtomicUsize::new(0),
177 pagination: Mutex::new(PaginationStore::new(
178 session_seed(),
179 pagination::DEFAULT_TTL,
180 )),
181 });
182
183 spawn_indexer(shared.clone());
184 spawn_watcher(shared.clone());
185 spawn_idle_reaper(shared.clone());
186
187 loop {
188 let conn = match transport::accept(&listener) {
189 Ok(conn) => conn,
190 Err(_) => continue,
191 };
192 let shared = shared.clone();
193 std::thread::spawn(move || {
194 let _active = ActiveRequest::new(&shared);
197 let _ = handle(conn, &shared);
198 });
199 }
200}
201
202fn spawn_indexer(shared: Arc<Shared>) {
205 std::thread::spawn(move || {
206 if let Ok(idx) = Index::load(&shared.snapshot) {
207 *shared.write_index() = idx;
208 shared.mark_ready();
209 shared.bump();
210 let mut idx = shared.write_index();
212 idx.reconcile(&shared.root);
213 shared.maybe_save(&idx);
214 drop(idx);
215 shared.bump();
216 } else {
217 let started = Instant::now();
219 let paths = index::walk_files(&shared.root);
220 shared.total.store(paths.len(), Ordering::Relaxed);
221 shared.bump();
222 let built = Index::from_paths(&paths, &shared.indexed);
223 shared.persist.store(
226 started.elapsed() >= shared.persist_threshold,
227 Ordering::SeqCst,
228 );
229 shared.maybe_save(&built);
230 *shared.write_index() = built;
231 shared.mark_ready();
232 shared.bump();
233 }
234 });
235}
236
237fn spawn_watcher(shared: Arc<Shared>) {
240 std::thread::spawn(move || {
241 let (tx, rx) = std::sync::mpsc::channel();
242 let mut debouncer = match new_debouncer(Duration::from_millis(300), None, move |res| {
243 let _ = tx.send(res);
244 }) {
245 Ok(d) => d,
246 Err(_) => return,
247 };
248 if debouncer
249 .watch(&shared.root, RecursiveMode::Recursive)
250 .is_err()
251 {
252 return;
253 }
254 for res in rx {
255 if res.is_err() || !shared.ready.load(Ordering::SeqCst) {
256 continue;
257 }
258 let mut idx = shared.write_index();
259 if idx.reconcile(&shared.root) > 0 {
260 shared.maybe_save(&idx);
261 drop(idx);
262 shared.bump();
263 }
264 }
265 });
266}
267
268fn spawn_idle_reaper(shared: Arc<Shared>) {
272 let Some(timeout) = shared.idle_timeout else {
273 return;
274 };
275 let tick = timeout.min(IDLE_CHECK_MAX);
276 std::thread::spawn(move || {
277 loop {
278 std::thread::sleep(tick);
279 if !shared.ready.load(Ordering::SeqCst) || shared.in_flight.load(Ordering::SeqCst) > 0 {
281 continue;
282 }
283 let idle = shared
284 .last_active
285 .lock()
286 .unwrap_or_else(|e| e.into_inner())
287 .elapsed();
288 if idle >= timeout {
289 transport::cleanup(&shared.root);
290 std::process::exit(0);
291 }
292 }
293 });
294}
295
296fn handle(mut conn: Stream, shared: &Shared) -> Result<()> {
297 let req = proto::read_request(&mut conn)?;
298 match req {
299 Request::Search { opts, pattern } => {
303 let _ = content_search(shared, &pattern, opts, &mut conn);
304 }
305 Request::Find {
306 needle,
307 after,
308 limit,
309 } => {
310 let out = find(shared, &needle, after.as_deref(), limit as usize);
311 let _ = proto::write_data(&mut conn, &out);
312 }
313 Request::Status => {
314 let _ = proto::write_data(&mut conn, &status(shared));
315 }
316 Request::Watch => return watch(shared, &mut conn),
317 Request::Shutdown => {
318 let _ = proto::write_data(&mut conn, b"ok\n");
319 let _ = proto::end_stream(&mut conn);
320 transport::cleanup(&shared.root);
321 std::process::exit(0);
322 }
323 Request::CursorStore { blob } => {
324 let token = shared.pagination().store(blob, Instant::now());
325 let _ = proto::write_data(&mut conn, token.as_bytes());
326 }
327 Request::CursorTake { token } => {
328 let blob = shared.pagination().take(&token, Instant::now());
330 let _ = proto::write_data(&mut conn, &blob.unwrap_or_default());
331 }
332 }
333 let _ = proto::end_stream(&mut conn);
334 Ok(())
335}
336
337fn watch(shared: &Shared, conn: &mut Stream) -> Result<()> {
341 let mut last = 0;
344 loop {
345 if proto::write_data(conn, &status(shared)).is_err() {
346 return Ok(()); }
348 last = shared.wait_change(last);
349 }
350}
351
352fn content_search(
354 shared: &Shared,
355 pattern: &str,
356 opts: SearchOptions,
357 conn: &mut Stream,
358) -> Result<()> {
359 if shared.ready.load(Ordering::SeqCst) {
360 let paths = crate::candidate_paths(&shared.read_index(), pattern, opts);
364 let effective = crate::effective_pattern(pattern, opts);
365 let refs: Vec<&Path> = paths.iter().map(PathBuf::as_path).collect();
366 crate::confirm::search_streaming(&effective, &refs, &shared.root, opts, |chunk| {
367 proto::write_data(&mut *conn, chunk)
368 })
369 } else {
370 let conn = std::sync::Mutex::new(conn);
373 crate::stream_full_scan(&shared.root, pattern, opts, |chunk| {
374 if let Ok(mut c) = conn.lock() {
375 let _ = proto::write_data(&mut **c, chunk);
376 }
377 })
378 }
379}
380
381fn find(shared: &Shared, needle: &str, after: Option<&str>, limit: usize) -> Vec<u8> {
384 let (lines, total, start): (Vec<String>, usize, usize) = if shared.ready.load(Ordering::SeqCst)
385 {
386 let idx = shared.read_index();
387 let (hits, total, start) = idx.find(needle, after, limit);
388 let lines = hits
389 .iter()
390 .map(|p| p.to_string_lossy().into_owned())
391 .collect();
392 (lines, total, start)
393 } else {
394 let mut all: Vec<String> = index::walk_files(&shared.root)
395 .iter()
396 .filter(|p| p.to_string_lossy().contains(needle))
397 .map(|p| p.to_string_lossy().into_owned())
398 .collect();
399 all.sort_unstable();
400 let total = all.len();
401 let start = after.map_or(0, |a| all.partition_point(|p| p.as_str() <= a));
402 let lines = all.into_iter().skip(start).take(limit).collect();
403 (lines, total, start)
404 };
405 let next_after = (start + lines.len() < total)
408 .then(|| lines.last().map(String::as_str))
409 .flatten();
410 let mut out = proto::format_find_header(total, start, lines.len(), next_after);
411 for l in &lines {
412 out.push_str(l);
413 out.push('\n');
414 }
415 out.into_bytes()
416}
417
418fn status(shared: &Shared) -> Vec<u8> {
419 let idx = shared.read_index();
420 let state = if shared.ready.load(Ordering::SeqCst) {
421 "ready".to_string()
422 } else {
423 let done = shared.indexed.load(Ordering::Relaxed) as u64;
424 let total = shared.total.load(Ordering::Relaxed) as u64;
425 if total > 0 {
426 format!(
427 "building {} / {} files",
428 crate::status::human_count(done),
429 crate::status::human_count(total)
430 )
431 } else {
432 "building (scanning tree...)".to_string()
433 }
434 };
435 let ram_only = shared.ready.load(Ordering::SeqCst) && !shared.persist.load(Ordering::SeqCst);
437 crate::status::Status {
438 root: &shared.root,
439 snapshot: &shared.snapshot,
440 running: true,
441 ram_only,
442 state: Some(state),
443 files: Some(idx.file_count()),
444 trigrams: Some(idx.trigram_count()),
445 memory_bytes: Some(shared.index_bytes.load(Ordering::Relaxed)),
446 }
447 .render()
448 .into_bytes()
449}