use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::core::registry::{IndexHandle, IndexId};
use crate::service::indexed_files::IndexedFiles;
use crate::service::watch_loop::{spawn_watch_loop, WatcherTask};
const DISABLE_WATCHER_ENV: &str = "TRUSTY_DISABLE_WATCHER";
fn watcher_disabled_for_value(val: Option<&str>) -> bool {
val == Some("1")
}
fn watcher_disabled() -> bool {
watcher_disabled_for_value(std::env::var(DISABLE_WATCHER_ENV).ok().as_deref())
}
#[derive(Clone, Default)]
pub struct WatcherManager {
inner: Arc<Mutex<HashMap<IndexId, WatcherTask>>>,
}
impl WatcherManager {
pub fn new() -> Self {
Self::default()
}
pub async fn spawn_for_index(&self, handle: &Arc<IndexHandle>) {
if watcher_disabled() {
tracing::debug!(
index_id = %handle.id,
"file watcher disabled via {DISABLE_WATCHER_ENV}=1 — not watching",
);
return;
}
let indexed_files = IndexedFiles::new();
let task = match spawn_watch_loop(
&handle.root_path,
Arc::clone(&handle.indexer),
indexed_files,
) {
Ok(task) => task,
Err(e) => {
tracing::warn!(
index_id = %handle.id,
root = %handle.root_path.display(),
"could not start file watcher (incremental indexing disabled for this index): {e:#}",
);
return;
}
};
let mut guard = self.inner.lock().await;
if guard.contains_key(&handle.id) {
drop(guard);
task.stop();
return;
}
guard.insert(handle.id.clone(), task);
drop(guard);
tracing::info!(
index_id = %handle.id,
root = %handle.root_path.display(),
"file watcher active — saves trigger incremental indexing (issue #1621)",
);
}
pub async fn stop_for_index(&self, id: &IndexId) -> bool {
let task = self.inner.lock().await.remove(id);
match task {
Some(task) => {
task.stop();
tracing::debug!(index_id = %id, "file watcher stopped");
true
}
None => false,
}
}
pub async fn stop_all(&self) -> usize {
let mut guard = self.inner.lock().await;
let count = guard.len();
for (_id, task) in guard.drain() {
task.stop();
}
if count > 0 {
tracing::info!("stopped {count} file watcher(s) on shutdown");
}
count
}
pub async fn watched_count(&self) -> usize {
self.inner.lock().await.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::CodeIndexer;
use std::sync::Arc;
use tokio::sync::RwLock;
fn handle_for(id: &str, root: &std::path::Path) -> Arc<IndexHandle> {
let indexer = Arc::new(RwLock::new(CodeIndexer::new(id, root)));
Arc::new(IndexHandle::bare(
IndexId::new(id),
indexer,
root.to_path_buf(),
))
}
#[test]
fn disable_env_gate_only_matches_one() {
assert!(watcher_disabled_for_value(Some("1")));
assert!(!watcher_disabled_for_value(None)); assert!(!watcher_disabled_for_value(Some(""))); assert!(!watcher_disabled_for_value(Some("0")));
assert!(!watcher_disabled_for_value(Some("true")));
assert!(!watcher_disabled_for_value(Some("yes")));
assert!(!watcher_disabled_for_value(Some("on")));
assert!(!watcher_disabled_for_value(Some(" 1"))); assert!(!watcher_disabled_for_value(Some("1 ")));
assert!(!watcher_disabled_for_value(Some("11")));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_is_idempotent() {
let dir = tempfile::tempdir().expect("tempdir");
let mgr = WatcherManager::new();
let handle = handle_for("idx", dir.path());
mgr.spawn_for_index(&handle).await;
mgr.spawn_for_index(&handle).await;
assert_eq!(
mgr.watched_count().await,
1,
"second spawn for the same index must not add a second watcher"
);
mgr.stop_all().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stop_for_index_removes_entry() {
let dir = tempfile::tempdir().expect("tempdir");
let mgr = WatcherManager::new();
let handle = handle_for("idx", dir.path());
mgr.spawn_for_index(&handle).await;
assert_eq!(mgr.watched_count().await, 1);
let stopped = mgr.stop_for_index(&handle.id).await;
assert!(stopped, "stop_for_index must report it stopped a watcher");
assert_eq!(mgr.watched_count().await, 0);
assert!(!mgr.stop_for_index(&handle.id).await);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stop_all_clears_all() {
let dir_a = tempfile::tempdir().expect("tempdir a");
let dir_b = tempfile::tempdir().expect("tempdir b");
let mgr = WatcherManager::new();
mgr.spawn_for_index(&handle_for("a", dir_a.path())).await;
mgr.spawn_for_index(&handle_for("b", dir_b.path())).await;
assert_eq!(mgr.watched_count().await, 2);
let stopped = mgr.stop_all().await;
assert_eq!(stopped, 2, "stop_all must report every watcher it stopped");
assert_eq!(mgr.watched_count().await, 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_triggers_incremental_index_via_manager() {
use std::time::Duration;
let dir = tempfile::tempdir().expect("tempdir");
let handle = handle_for("live", dir.path());
let mgr = WatcherManager::new();
mgr.spawn_for_index(&handle).await;
tokio::time::sleep(Duration::from_millis(150)).await;
tokio::fs::write(dir.path().join("lib.rs"), "fn alpha() {}\nfn beta() {}\n")
.await
.expect("write file");
let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
loop {
if handle.indexer.read().await.chunk_count() > 0 {
break;
}
if tokio::time::Instant::now() > deadline {
panic!("chunk_count never grew — watcher did not index the save");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
mgr.stop_all().await;
}
}