1use std::{
13 path::{Path, PathBuf},
14 sync::{
15 Arc,
16 atomic::{AtomicBool, Ordering},
17 mpsc::RecvTimeoutError,
18 },
19 thread::JoinHandle,
20 time::{Duration, Instant},
21};
22
23use notify::{Event, RecursiveMode, Watcher as _, recommended_watcher};
24
25use crate::{
26 config::Config,
27 fleet,
28 index::{IndexDatabase, ai::ReconcileOptions, target_for_path},
29 locks::{self, FileLock},
30};
31
/// The watcher loop asks for garbage collection on every pass whose running
/// count is a multiple of this value.
const GC_EVERY_PASSES: u64 = 20;

/// Value handed to `ReconcileOptions::max_seconds` for each maintenance pass.
const PASS_RECONCILE_MAX_SECONDS: u64 = 60;

/// Timeout given to `FileLock::acquire_timeout` by the entry points that give
/// up (returning `Ok(false)`) instead of blocking on the write lock.
const SKIP_TIMEOUT: Duration = Duration::from_secs(3);

/// Quiet window after the most recent fleet-binary event before the fleet
/// trigger fires.
const FLEET_DEBOUNCE: Duration = Duration::from_millis(500);

/// Upper bound, measured from the first fleet-binary event of a burst, on how
/// long the fleet trigger may be postponed by further events.
const FLEET_MAX_LATENCY: Duration = Duration::from_secs(2);
44
/// Collapses a burst of events into one deadline.
///
/// The deadline is the earlier of "`debounce` after the most recent event" and
/// "`max_latency` after the first event of the burst", so a continuous stream
/// of events cannot postpone it indefinitely.
#[derive(Debug)]
struct Debounce {
    /// Quiet period required after the latest event.
    debounce: Duration,
    /// Maximum delay counted from the first event of the burst.
    max_latency: Duration,
    /// Time of the first event since the last reset, if any.
    first: Option<Instant>,
    /// Time of the most recent event since the last reset, if any.
    last: Option<Instant>,
}

impl Debounce {
    /// Creates an idle debouncer (no pending events) with the given timings.
    fn new(debounce: Duration, max_latency: Duration) -> Self {
        Self { first: None, last: None, debounce, max_latency }
    }

    /// Records an event observed at `now`; only the first event of a burst
    /// sets the burst start.
    fn on_event(&mut self, now: Instant) {
        if self.first.is_none() {
            self.first = Some(now);
        }
        self.last = Some(now);
    }

    /// Forgets all pending events, returning to the idle state.
    fn reset(&mut self) {
        *self = Self::new(self.debounce, self.max_latency);
    }

    /// Returns the instant at which the pending burst should be acted on, or
    /// `None` when no event has been recorded since the last reset.
    fn fire_at(&self) -> Option<Instant> {
        let burst_start = self.first?;
        let latest = self.last?;
        let quiet_deadline = latest + self.debounce;
        let latency_deadline = burst_start + self.max_latency;
        Some(std::cmp::min(quiet_deadline, latency_deadline))
    }

    /// Time remaining until the deadline as seen from `now` (zero if already
    /// passed), or `None` when idle.
    fn due_in(&self, now: Instant) -> Option<Duration> {
        let deadline = self.fire_at()?;
        Some(deadline.saturating_duration_since(now))
    }

    /// Whether a deadline exists and `now` has reached it.
    fn should_fire(&self, now: Instant) -> bool {
        match self.fire_at() {
            Some(deadline) => deadline <= now,
            None => false,
        }
    }
}
85
86pub fn maintenance_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
88 let lock_path = locks::write_lock_path(&config.database);
89 let _lock = FileLock::acquire_blocking(&lock_path)?;
90 run_pass(config, run_gc)
91}
92
93pub fn maintenance_pass_or_skip(config: &Config, run_gc: bool) -> anyhow::Result<bool> {
96 let lock_path = locks::write_lock_path(&config.database);
97 match FileLock::acquire_timeout(&lock_path, SKIP_TIMEOUT)? {
98 Some(_lock) => {
99 run_pass(config, run_gc)?;
100 Ok(true)
101 },
102 None => Ok(false),
103 }
104}
105
106fn run_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
107 let db = IndexDatabase::index_discover(config)?;
108 let runtime = &config.local_ai.embedding.runtime;
109 let options = ReconcileOptions {
110 batch_size: Some(runtime.batch_size),
111 changed_first: true,
112 max_seconds: Some(PASS_RECONCILE_MAX_SECONDS),
113 max_embedding_chars: runtime.max_embedding_chars,
114 intra_threads: runtime.ort_threads.map(|n| n as usize),
115 ..ReconcileOptions::default()
116 };
117 db.reconcile_with_options_progress(options, |_| {})?;
118 if run_gc {
119 let _ = db.gc();
120 }
121 let _ = db.memory_validate();
122 Ok(())
123}
124
125fn event_is_relevant(config: &Config, event: &Event) -> bool {
129 if event.need_rescan() {
130 return true;
131 }
132 event.paths.iter().any(|path| {
133 path.strip_prefix(&config.root)
134 .ok()
135 .is_some_and(|relative| target_for_path(config, relative).is_some())
136 })
137}
138
139fn event_targets_binary(fleet_bin: Option<&Path>, event: &Event) -> bool {
143 let Some(bin) = fleet_bin else {
144 return false;
145 };
146 event.paths.iter().any(|path| path == bin)
147}
148
149#[derive(Debug)]
151pub struct Watcher {
152 stop: Arc<AtomicBool>,
153 handle: Option<JoinHandle<()>>,
154}
155
156impl Watcher {
157 pub fn spawn(config: Config) -> Option<Watcher> {
160 Self::spawn_with_fleet(config, None)
161 }
162
163 pub fn spawn_with_fleet(config: Config, fleet_bin: Option<PathBuf>) -> Option<Watcher> {
168 if !config.watch.enabled || std::env::var_os("RAG_RAT_NO_WATCH").is_some() {
169 return None;
170 }
171 let stop = Arc::new(AtomicBool::new(false));
172 let handle = std::thread::Builder::new()
173 .name("rag-rat-watch".to_string())
174 .spawn({
175 let stop = Arc::clone(&stop);
176 move || watcher_main(config, fleet_bin, &stop)
177 })
178 .ok()?;
179 Some(Watcher { stop, handle: Some(handle) })
180 }
181}
182
183impl Drop for Watcher {
184 fn drop(&mut self) {
185 self.stop.store(true, Ordering::Relaxed);
186 if let Some(handle) = self.handle.take() {
187 let _ = handle.join();
188 }
189 }
190}
191
/// Sleeps for `total`, in slices of at most 200 ms, returning early as soon as
/// `stop` is observed set at the start of a slice.
fn sleep_checking_stop(total: Duration, stop: &AtomicBool) {
    const SLICE: Duration = Duration::from_millis(200);

    let mut remaining = total;
    while !remaining.is_zero() {
        if stop.load(Ordering::Relaxed) {
            return;
        }
        let nap = remaining.min(SLICE);
        std::thread::sleep(nap);
        remaining -= nap;
    }
}
203
204fn watcher_main(config: Config, fleet_bin: Option<PathBuf>, stop: &AtomicBool) {
205 let base_dir =
206 config.database.parent().map(Path::to_path_buf).unwrap_or_else(|| config.root.clone());
207 let election_path = locks::election_lock_path(&base_dir, &config.root);
208
209 let _election = loop {
211 if stop.load(Ordering::Relaxed) {
212 return;
213 }
214 match FileLock::try_acquire(&election_path) {
215 Ok(Some(lock)) => break lock,
216 _ => sleep_checking_stop(Duration::from_secs(5), stop),
217 }
218 };
219
220 let _ = maintenance_pass(&config, true);
222
223 let (tx, rx) = std::sync::mpsc::channel();
224 let Ok(mut notify_watcher) = recommended_watcher(move |res| {
225 let _ = tx.send(res);
226 }) else {
227 return;
228 };
229 for target in &config.targets {
230 for dir in &target.directories {
231 let _ = notify_watcher.watch(&config.root.join(dir), RecursiveMode::Recursive);
232 }
233 }
234 let fleet_dir = fleet_bin.as_ref().and_then(|bin| bin.parent());
238 if let Some(dir) = fleet_dir {
239 let _ = notify_watcher.watch(dir, RecursiveMode::NonRecursive);
240 }
241
242 let mut debounce = Debounce::new(
243 Duration::from_millis(config.watch.debounce_ms),
244 Duration::from_millis(config.watch.max_latency_ms),
245 );
246 let mut fleet_debounce = Debounce::new(FLEET_DEBOUNCE, FLEET_MAX_LATENCY);
247 let periodic = (config.watch.periodic_sweep_secs > 0)
249 .then(|| Duration::from_secs(config.watch.periodic_sweep_secs));
250 let mut passes: u64 = 0;
251 let mut last_pass = Instant::now(); loop {
253 if stop.load(Ordering::Relaxed) {
254 break;
255 }
256 let now = Instant::now();
257 let periodic_wait = periodic.map(|p| (last_pass + p).saturating_duration_since(now));
258 let wait = [debounce.due_in(now), fleet_debounce.due_in(now), periodic_wait]
259 .into_iter()
260 .flatten()
261 .min()
262 .unwrap_or(Duration::from_millis(500));
263 match rx.recv_timeout(wait) {
264 Ok(Ok(event)) => {
265 let now = Instant::now();
266 if event_is_relevant(&config, &event) {
267 debounce.on_event(now);
268 }
269 if event_targets_binary(fleet_bin.as_deref(), &event) {
270 fleet_debounce.on_event(now);
271 }
272 },
273 Ok(_) => {},
274 Err(RecvTimeoutError::Timeout) => {},
275 Err(RecvTimeoutError::Disconnected) => break,
276 }
277 let now = Instant::now();
278 let periodic_due = periodic.is_some_and(|p| now >= last_pass + p);
279 if debounce.should_fire(now) || periodic_due {
280 passes += 1;
281 let _ = maintenance_pass(&config, passes.is_multiple_of(GC_EVERY_PASSES));
282 debounce.reset();
283 last_pass = Instant::now();
284 }
285 if fleet_debounce.should_fire(now)
286 && let Some(bin) = fleet_bin.as_deref()
287 {
288 fleet::trigger(bin);
290 fleet_debounce.reset();
291 }
292 }
293
294 if debounce.fire_at().is_some() {
298 let _ = shutdown_discover(&config);
299 }
300}
301
302fn shutdown_discover(config: &Config) -> anyhow::Result<bool> {
304 let lock_path = locks::write_lock_path(&config.database);
305 match FileLock::acquire_timeout(&lock_path, SKIP_TIMEOUT)? {
306 Some(_lock) => {
307 IndexDatabase::index_discover(config)?;
308 Ok(true)
309 },
310 None => Ok(false),
311 }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Duration` of `millis` milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// A single event fires exactly when the quiet window has elapsed.
    #[test]
    fn debounce_fires_after_quiet_window() {
        let start = Instant::now();
        let mut deb = Debounce::new(ms(400), ms(2500));
        deb.on_event(start);
        assert!(!deb.should_fire(start + ms(399)), "fires before quiet window");
        assert!(deb.should_fire(start + ms(400)), "fires at quiet window");
    }

    /// Events arriving faster than the quiet window must not postpone the
    /// deadline past the max-latency cap.
    #[test]
    fn debounce_max_latency_cap_beats_sustained_events() {
        let quiet = ms(400);
        let max_latency = ms(2500);
        let start = Instant::now();
        let cap = start + max_latency;
        let mut deb = Debounce::new(quiet, max_latency);
        deb.on_event(start);

        let mut tick = start;
        for _ in 0..100 {
            tick += ms(200);
            deb.on_event(tick);
            if tick >= cap {
                break;
            }
            assert!(!deb.should_fire(tick), "should not fire mid-stream before the cap");
        }
        assert!(
            deb.should_fire(cap),
            "max-latency cap must force a pass under sustained writes"
        );
    }

    /// Without any recorded event there is neither a deadline nor a firing.
    #[test]
    fn debounce_idle_has_no_deadline() {
        let idle = Debounce::new(ms(400), ms(2500));
        assert!(idle.due_in(Instant::now()).is_none());
        assert!(!idle.should_fire(Instant::now()));
    }
}