use std::{
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
},
thread::JoinHandle,
time::{Duration, Instant},
};
use notify::{Event, RecursiveMode, Watcher as _, recommended_watcher};
use crate::{
config::Config,
fleet,
index::{IndexDatabase, ai::ReconcileOptions, target_for_path},
locks::{self, FileLock},
};
/// The watch loop asks `maintenance_pass` to run GC on every pass whose 1-based count is a
/// multiple of this value (the initial pass after winning the election always runs GC).
const GC_EVERY_PASSES: u64 = 20;
/// Value passed as `ReconcileOptions::max_seconds` for every maintenance pass.
const PASS_RECONCILE_MAX_SECONDS: u64 = 60;
/// How long `maintenance_pass_or_skip` and `shutdown_discover` wait for the write lock
/// before giving up.
const SKIP_TIMEOUT: Duration = Duration::from_millis(3_000);
/// Quiet window used to debounce changes to the fleet binary before `fleet::trigger`.
const FLEET_DEBOUNCE: Duration = Duration::from_millis(500);
/// Upper bound, measured from the first change, on delaying `fleet::trigger`.
const FLEET_MAX_LATENCY: Duration = Duration::from_secs(2);
/// Coalesces a burst of events into a single firing deadline.
///
/// The deadline is `last_event + debounce` (a quiet window that restarts on every event),
/// but never later than `first_event + max_latency`, so a continuous stream of events
/// still fires eventually. With no recorded events there is no deadline.
#[derive(Debug)]
struct Debounce {
    /// Quiet period required after the most recent event.
    debounce: Duration,
    /// Cap on the deadline, measured from the first event of the burst.
    max_latency: Duration,
    /// Time of the first event since the last reset.
    first: Option<Instant>,
    /// Time of the most recent event since the last reset.
    last: Option<Instant>,
}

impl Debounce {
    /// Creates an idle debouncer (no pending deadline).
    fn new(debounce: Duration, max_latency: Duration) -> Self {
        Debounce { first: None, last: None, debounce, max_latency }
    }

    /// Records an event observed at `now`; only the first event of a burst anchors the cap.
    fn on_event(&mut self, now: Instant) {
        if self.first.is_none() {
            self.first = Some(now);
        }
        self.last = Some(now);
    }

    /// Forgets the current burst, returning to the idle state.
    fn reset(&mut self) {
        (self.first, self.last) = (None, None);
    }

    /// The instant at which the pending burst should fire, or `None` when idle.
    fn fire_at(&self) -> Option<Instant> {
        match (self.first, self.last) {
            (Some(first), Some(last)) => {
                let quiet_deadline = last + self.debounce;
                let latency_cap = first + self.max_latency;
                Some(if quiet_deadline < latency_cap { quiet_deadline } else { latency_cap })
            },
            _ => None,
        }
    }

    /// Time remaining until the deadline (zero once it has passed), or `None` when idle.
    fn due_in(&self, now: Instant) -> Option<Duration> {
        let deadline = self.fire_at()?;
        Some(deadline.saturating_duration_since(now))
    }

    /// `true` once a pending deadline has been reached at `now`.
    fn should_fire(&self, now: Instant) -> bool {
        match self.fire_at() {
            Some(deadline) => deadline <= now,
            None => false,
        }
    }
}
/// Runs one maintenance pass, waiting as long as necessary for the database write lock.
///
/// The lock guard is kept alive for the whole pass. Errors from acquiring the lock or from
/// the pass itself are propagated to the caller.
pub fn maintenance_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
    let write_lock = locks::write_lock_path(&config.database);
    let _guard = FileLock::acquire_blocking(&write_lock)?;
    run_pass(config, run_gc)
}
/// Like `maintenance_pass`, but waits at most `SKIP_TIMEOUT` for the write lock.
///
/// Returns `Ok(true)` when a pass ran, `Ok(false)` when `acquire_timeout` yielded no lock
/// (presumably because another holder kept it past the timeout), and `Err` if lock
/// acquisition or the pass failed.
pub fn maintenance_pass_or_skip(config: &Config, run_gc: bool) -> anyhow::Result<bool> {
    let write_lock = locks::write_lock_path(&config.database);
    let Some(_guard) = FileLock::acquire_timeout(&write_lock, SKIP_TIMEOUT)? else {
        return Ok(false);
    };
    run_pass(config, run_gc)?;
    Ok(true)
}
/// Performs one maintenance pass. It takes no lock itself; callers in this module hold
/// the database write lock around it.
///
/// The pass does the following:
/// - Opens the index via `IndexDatabase::index_discover`.
/// - Reconciles it using the embedding runtime settings, with `changed_first` set and
///   `max_seconds` capped at `PASS_RECONCILE_MAX_SECONDS`.
/// - Optionally runs GC.
/// - Validates memory.
///
/// Discovery and reconcile errors are propagated. GC and validation errors are ignored.
fn run_pass(config: &Config, run_gc: bool) -> anyhow::Result<()> {
    let db = IndexDatabase::index_discover(config)?;
    let embed_rt = &config.local_ai.embedding.runtime;
    db.reconcile_with_options_progress(
        ReconcileOptions {
            batch_size: Some(embed_rt.batch_size),
            changed_first: true,
            max_seconds: Some(PASS_RECONCILE_MAX_SECONDS),
            max_embedding_chars: embed_rt.max_embedding_chars,
            intra_threads: embed_rt.ort_threads.map(|threads| threads as usize),
            ..ReconcileOptions::default()
        },
        |_| {},
    )?;
    // Best-effort housekeeping: failures here never fail the pass.
    if run_gc {
        let _ = db.gc();
    }
    let _ = db.memory_validate();
    Ok(())
}
fn event_is_relevant(config: &Config, event: &Event) -> bool {
if event.need_rescan() {
return true;
}
event.paths.iter().any(|path| {
path.strip_prefix(&config.root)
.ok()
.is_some_and(|relative| target_for_path(config, relative).is_some())
})
}
/// Returns `true` when one of the event's paths is exactly `fleet_bin`.
///
/// Always `false` when no fleet binary is monitored. Comparison is plain `Path`
/// equality, with no canonicalization.
fn event_targets_binary(fleet_bin: Option<&Path>, event: &Event) -> bool {
    fleet_bin.is_some_and(|bin| event.paths.iter().any(|candidate| candidate == bin))
}
/// Handle to the background `rag-rat-watch` thread.
///
/// Dropping the handle sets the stop flag and then joins the thread.
#[derive(Debug)]
pub struct Watcher {
    /// Shutdown flag shared with, and polled by, the watcher thread.
    stop: Arc<AtomicBool>,
    /// Join handle of the thread; taken (left `None`) during `drop`.
    handle: Option<JoinHandle<()>>,
}

impl Watcher {
    /// Spawns the watcher without fleet-binary monitoring.
    ///
    /// Same as [`Watcher::spawn_with_fleet`] with `fleet_bin = None`.
    pub fn spawn(config: Config) -> Option<Watcher> {
        Watcher::spawn_with_fleet(config, None)
    }

    /// Spawns the `rag-rat-watch` thread.
    ///
    /// Returns `None` in any of these cases:
    /// - `config.watch.enabled` is false.
    /// - The `RAG_RAT_NO_WATCH` environment variable is set (to any value).
    /// - The thread could not be spawned.
    ///
    /// When `fleet_bin` is `Some`, changes to that file additionally drive `fleet::trigger`.
    pub fn spawn_with_fleet(config: Config, fleet_bin: Option<PathBuf>) -> Option<Watcher> {
        let disabled = !config.watch.enabled || std::env::var_os("RAG_RAT_NO_WATCH").is_some();
        if disabled {
            return None;
        }
        let stop = Arc::new(AtomicBool::new(false));
        let thread_stop = Arc::clone(&stop);
        let handle = std::thread::Builder::new()
            .name("rag-rat-watch".to_string())
            .spawn(move || watcher_main(config, fleet_bin, &thread_stop))
            .ok()?;
        Some(Watcher { stop, handle: Some(handle) })
    }
}

impl Drop for Watcher {
    /// Requests shutdown and blocks until the thread exits.
    ///
    /// This may take as long as a maintenance pass that is already in progress.
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        let Some(handle) = self.handle.take() else {
            return;
        };
        // A panic inside the watcher thread is deliberately not propagated out of `drop`.
        let _ = handle.join();
    }
}
/// Sleeps for roughly `total` in slices of at most 200 ms.
///
/// Returns early as soon as `stop` is observed set before one of the slices.
fn sleep_checking_stop(total: Duration, stop: &AtomicBool) {
    const SLICE: Duration = Duration::from_millis(200);
    let mut remaining = total;
    while !remaining.is_zero() && !stop.load(Ordering::Relaxed) {
        let chunk = remaining.min(SLICE);
        std::thread::sleep(chunk);
        remaining -= chunk;
    }
}
/// Body of the `rag-rat-watch` thread started by `Watcher::spawn_with_fleet`.
///
/// 1. Polls every 5 s for the election lock derived from the database directory and
///    `config.root`; this can be aborted through `stop`. Only the holder proceeds, and it
///    keeps the lock until this function returns.
/// 2. Runs one maintenance pass with GC enabled.
/// 3. Watches every target directory recursively and, if `fleet_bin` is set, its parent
///    directory non-recursively.
/// 4. Runs the event loop:
///    - debounces relevant index events into maintenance passes, with GC on every
///      `GC_EVERY_PASSES`-th pass;
///    - debounces changes to `fleet_bin` into `fleet::trigger`;
///    - runs a periodic pass every `watch.periodic_sweep_secs` seconds when that is non-zero.
///
/// The loop ends when `stop` is set or the notify channel disconnects. If index events are
/// still pending at that point, `shutdown_discover` runs instead of a full pass. Errors from
/// passes, watch registration and the notify backend are ignored.
fn watcher_main(config: Config, fleet_bin: Option<PathBuf>, stop: &AtomicBool) {
    // Longest a single `recv_timeout` may block. Without this cap the thread could sleep
    // until the next periodic sweep (potentially many minutes) before noticing `stop`,
    // and `Watcher::drop`, which joins this thread, would hang for that long.
    const STOP_POLL: Duration = Duration::from_millis(500);

    let base_dir =
        config.database.parent().map(Path::to_path_buf).unwrap_or_else(|| config.root.clone());
    let election_path = locks::election_lock_path(&base_dir, &config.root);
    // Bound to a named variable (not `_`) so the election lock stays held until we return.
    let _election = loop {
        if stop.load(Ordering::Relaxed) {
            return;
        }
        match FileLock::try_acquire(&election_path) {
            Ok(Some(lock)) => break lock,
            // Lock held elsewhere, or acquisition failed: retry later.
            _ => sleep_checking_stop(Duration::from_secs(5), stop),
        }
    };

    let _ = maintenance_pass(&config, true);

    let (tx, rx) = std::sync::mpsc::channel();
    let Ok(mut notify_watcher) = recommended_watcher(move |res| {
        let _ = tx.send(res);
    }) else {
        return;
    };
    for target in &config.targets {
        for dir in &target.directories {
            // Directories that cannot be watched are skipped.
            let _ = notify_watcher.watch(&config.root.join(dir), RecursiveMode::Recursive);
        }
    }
    if let Some(dir) = fleet_bin.as_ref().and_then(|bin| bin.parent()) {
        let _ = notify_watcher.watch(dir, RecursiveMode::NonRecursive);
    }

    let mut debounce = Debounce::new(
        Duration::from_millis(config.watch.debounce_ms),
        Duration::from_millis(config.watch.max_latency_ms),
    );
    let mut fleet_debounce = Debounce::new(FLEET_DEBOUNCE, FLEET_MAX_LATENCY);
    let periodic = (config.watch.periodic_sweep_secs > 0)
        .then(|| Duration::from_secs(config.watch.periodic_sweep_secs));
    let mut passes: u64 = 0;
    let mut last_pass = Instant::now();

    loop {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        let now = Instant::now();
        // `checked_add`: an absurdly large configured interval means "never" instead of an
        // `Instant` overflow panic.
        let periodic_deadline = periodic.and_then(|p| last_pass.checked_add(p));
        let periodic_wait = periodic_deadline.map(|at| at.saturating_duration_since(now));
        let wait = [debounce.due_in(now), fleet_debounce.due_in(now), periodic_wait]
            .into_iter()
            .flatten()
            .fold(STOP_POLL, |shortest, candidate| shortest.min(candidate));
        match rx.recv_timeout(wait) {
            Ok(Ok(event)) => {
                let now = Instant::now();
                if event_is_relevant(&config, &event) {
                    debounce.on_event(now);
                }
                if event_targets_binary(fleet_bin.as_deref(), &event) {
                    fleet_debounce.on_event(now);
                }
            },
            // Backend errors and plain timeouts need no handling here.
            Ok(Err(_)) | Err(RecvTimeoutError::Timeout) => {},
            Err(RecvTimeoutError::Disconnected) => break,
        }
        // Re-check before starting a potentially long pass. On shutdown, pending index
        // events are handled by `shutdown_discover` after the loop instead.
        if stop.load(Ordering::Relaxed) {
            break;
        }
        let now = Instant::now();
        let periodic_due = periodic_deadline.is_some_and(|at| now >= at);
        if debounce.should_fire(now) || periodic_due {
            passes += 1;
            let _ = maintenance_pass(&config, passes.is_multiple_of(GC_EVERY_PASSES));
            debounce.reset();
            last_pass = Instant::now();
        }
        if fleet_debounce.should_fire(now)
            && let Some(bin) = fleet_bin.as_deref()
        {
            fleet::trigger(bin);
            fleet_debounce.reset();
        }
    }

    if debounce.fire_at().is_some() {
        let _ = shutdown_discover(&config);
    }
}
/// Runs `IndexDatabase::index_discover` under the write lock.
///
/// The watcher thread calls this on exit when index events were still pending. It waits at
/// most `SKIP_TIMEOUT` for the lock. Returns `Ok(false)` if no lock was obtained, and
/// `Ok(true)` once discovery succeeded under the lock.
fn shutdown_discover(config: &Config) -> anyhow::Result<bool> {
    let write_lock = locks::write_lock_path(&config.database);
    let Some(_guard) = FileLock::acquire_timeout(&write_lock, SKIP_TIMEOUT)? else {
        return Ok(false);
    };
    // Only the discovery side effect matters; the returned database handle is dropped.
    IndexDatabase::index_discover(config)?;
    Ok(true)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A single event fires exactly once the quiet window has elapsed.
    #[test]
    fn debounce_fires_after_quiet_window() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        assert!(!d.should_fire(t0 + Duration::from_millis(399)), "fires before quiet window");
        assert!(d.should_fire(t0 + Duration::from_millis(400)), "fires at quiet window");
    }

    /// Events arriving faster than the quiet window cannot postpone firing past the cap.
    #[test]
    fn debounce_max_latency_cap_beats_sustained_events() {
        let debounce = Duration::from_millis(400);
        let max = Duration::from_millis(2500);
        let mut d = Debounce::new(debounce, max);
        let t0 = Instant::now();
        d.on_event(t0);
        let mut now = t0;
        for _ in 0..100 {
            now += Duration::from_millis(200);
            d.on_event(now);
            if now >= t0 + max {
                break;
            }
            assert!(!d.should_fire(now), "should not fire mid-stream before the cap");
        }
        assert!(
            d.should_fire(t0 + max),
            "max-latency cap must force a pass under sustained writes"
        );
    }

    /// An idle debouncer has no deadline and never fires.
    #[test]
    fn debounce_idle_has_no_deadline() {
        let d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        assert!(d.due_in(Instant::now()).is_none());
        assert!(!d.should_fire(Instant::now()));
    }

    /// Each event restarts the quiet window; `due_in` counts down and saturates at zero.
    #[test]
    fn debounce_quiet_window_restarts_on_each_event() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        d.on_event(t0 + Duration::from_millis(300));
        assert_eq!(d.fire_at(), Some(t0 + Duration::from_millis(700)));
        assert!(
            !d.should_fire(t0 + Duration::from_millis(400)),
            "a later event must extend the quiet window"
        );
        assert_eq!(d.due_in(t0 + Duration::from_millis(500)), Some(Duration::from_millis(200)));
        assert_eq!(d.due_in(t0 + Duration::from_millis(900)), Some(Duration::ZERO));
    }

    /// A cap shorter than the quiet window wins.
    #[test]
    fn debounce_max_latency_shorter_than_quiet_window() {
        let mut d = Debounce::new(Duration::from_millis(1000), Duration::from_millis(300));
        let t0 = Instant::now();
        d.on_event(t0);
        assert_eq!(d.fire_at(), Some(t0 + Duration::from_millis(300)));
    }

    /// `reset` clears the pending deadline; the next burst is anchored on its own first event.
    #[test]
    fn debounce_reset_clears_pending_deadline() {
        let mut d = Debounce::new(Duration::from_millis(400), Duration::from_millis(2500));
        let t0 = Instant::now();
        d.on_event(t0);
        d.reset();
        assert!(d.fire_at().is_none());
        assert!(!d.should_fire(t0 + Duration::from_secs(60)));
        let t1 = t0 + Duration::from_secs(10);
        d.on_event(t1);
        assert_eq!(d.fire_at(), Some(t1 + Duration::from_millis(400)));
    }

    /// A set stop flag makes `sleep_checking_stop` return without sleeping the full time.
    #[test]
    fn sleep_checking_stop_returns_immediately_when_stopped() {
        let stop = AtomicBool::new(true);
        let started = Instant::now();
        sleep_checking_stop(Duration::from_secs(5), &stop);
        assert!(started.elapsed() < Duration::from_secs(1));
    }
}