use std::path::Path;
use std::sync::Arc;
use crate::core::chunker::chunk_ast;
use crate::core::CodeIndexer;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::service::indexed_files::IndexedFiles;
use crate::service::watcher::{FileWatcher, WatchEvent};
/// Handle returned by [`spawn_watch_loop`], bundling the file watcher with
/// the background task that consumes its events.
///
/// Neither field is read anywhere in the visible code; they are stored only
/// so that they live as long as this value does.
pub struct WatcherTask {
/// The watcher that was given the sending half of the event channel.
/// NOTE(review): presumably dropping it stops watching and closes the
/// channel, which lets the consumer loop finish — confirm against
/// `FileWatcher`'s implementation.
_watcher: FileWatcher,
/// Join handle of the spawned event-consumer task. Dropping a tokio
/// `JoinHandle` detaches the task rather than cancelling it.
_join: JoinHandle<()>,
}
/// Starts watching `root_path` and spawns a background task that applies
/// every reported change to the index.
///
/// An unbounded channel connects the two halves: the sender is handed to
/// [`FileWatcher::start`], while the receiver is drained by a task spawned on
/// the current tokio runtime. `Modified` events are routed to
/// `handle_modified` and `Removed` events to `handle_removed`. The task ends
/// once the channel reports that no further events can arrive.
///
/// # Errors
///
/// Propagates the error from [`FileWatcher::start`] if the watcher cannot be
/// started; in that case no task is spawned.
pub fn spawn_watch_loop(
    root_path: &Path,
    indexer: Arc<RwLock<CodeIndexer>>,
    indexed_files: IndexedFiles,
) -> Result<WatcherTask> {
    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<WatchEvent>();
    let _watcher = FileWatcher::start(root_path.to_path_buf(), event_tx)?;

    // Consumer side: pull events one at a time until the channel closes.
    let consumer = async move {
        loop {
            let Some(event) = event_rx.recv().await else {
                break;
            };
            match event {
                WatchEvent::Removed(gone) => {
                    handle_removed(&gone, &indexer, &indexed_files).await;
                }
                WatchEvent::Modified(changed) => {
                    handle_modified(&changed, &indexer, &indexed_files).await;
                }
            }
        }
    };
    let _join = tokio::spawn(consumer);

    Ok(WatcherTask { _watcher, _join })
}
/// Re-indexes a file after it was created or modified.
///
/// Steps, in order:
/// 1. Directories are ignored.
/// 2. The file is read as UTF-8 text; if that fails the event is skipped
///    (logged at `debug`) and any previously tracked chunks are left alone.
/// 3. Chunk ids previously recorded for `path` are taken out of
///    `indexed_files` and removed from the index. Individual removal failures
///    are logged at `warn` and do not abort the pass.
/// 4. The new content is chunked to learn its chunk ids, then indexed.
/// 5. Only if indexing succeeded are the new ids recorded for `path`; on
///    failure the error is logged at `warn` and the path stays untracked.
///
/// * `path` - file reported by the watcher.
/// * `indexer` - shared indexer; only read locks are taken.
/// * `indexed_files` - tracker mapping paths to the chunk ids they produced.
async fn handle_modified(
    path: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    // Stat through tokio so the syscall does not block an async worker
    // thread. Like `Path::is_dir`, this follows symlinks and treats any
    // metadata error as "not a directory".
    if matches!(tokio::fs::metadata(path).await, Ok(meta) if meta.is_dir()) {
        return;
    }

    let content = match tokio::fs::read_to_string(path).await {
        Ok(s) => s,
        Err(err) => {
            tracing::debug!(?err, ?path, "skip unreadable file");
            return;
        }
    };

    // Drop whatever this path contributed before; the read guard is scoped to
    // this block so it is released before the chunking below.
    if let Some(stale_ids) = indexed_files.take(path).await {
        let idx = indexer.read().await;
        for id in stale_ids {
            if let Err(err) = idx.remove_chunk(&id).await {
                tracing::warn!(?err, %id, "remove_chunk failed");
            }
        }
    }

    let path_str = path.to_string_lossy().into_owned();
    // NOTE(review): assumes `index_file` stores chunks under the same ids
    // that `chunk_ast` yields for this content — confirm.
    let (chunks, _entities) = chunk_ast(&path_str, &content);
    let new_ids: Vec<String> = chunks.iter().map(|c| c.id.clone()).collect();

    let idx = indexer.read().await;
    if let Err(err) = idx.index_file(&path_str, &content).await {
        tracing::warn!(?err, ?path, "index_file failed");
        return;
    }
    // Release the index lock before awaiting on the tracker.
    drop(idx);

    indexed_files.record(path.to_path_buf(), new_ids).await;
}
/// Purges a deleted file's chunks from the index.
///
/// The chunk ids recorded for `path` are taken out of `indexed_files`; when
/// nothing was recorded the call is a no-op. Otherwise each id is removed
/// under a read lock on `indexer`. A failed removal is logged at `warn` and
/// the remaining ids are still processed.
async fn handle_removed(
    path: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    if let Some(tracked_ids) = indexed_files.take(path).await {
        let index = indexer.read().await;
        for chunk_id in tracked_ids {
            match index.remove_chunk(&chunk_id).await {
                Ok(_) => {}
                Err(err) => {
                    tracing::warn!(?err, id = %chunk_id, "remove_chunk failed");
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::sync::RwLock;

    /// End-to-end check: writing a file under the watched root must
    /// eventually produce indexed chunks and a tracked file entry.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn modified_file_triggers_indexing() {
        let workspace = tempfile::tempdir().expect("tempdir");
        let root = workspace.path();
        let shared_index = Arc::new(RwLock::new(CodeIndexer::new("test", root)));
        let files = IndexedFiles::new();

        // Keep the task handle bound for the whole test.
        let _task = spawn_watch_loop(root, Arc::clone(&shared_index), files.clone())
            .expect("watch loop starts");

        // Give the watcher a moment before touching the directory.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let target = root.join("lib.rs");
        tokio::fs::write(&target, "fn alpha() {}\nfn beta() {}\n")
            .await
            .expect("write file");

        // Poll until at least one chunk shows up, failing after two seconds.
        let poll_every = Duration::from_millis(50);
        let give_up_at = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            let has_chunks = shared_index.read().await.chunk_count() > 0;
            if has_chunks {
                break;
            }
            assert!(
                tokio::time::Instant::now() <= give_up_at,
                "chunk_count never grew above 0"
            );
            tokio::time::sleep(poll_every).await;
        }

        let tracked = files.len().await;
        assert!(tracked >= 1, "expected at least one tracked file");
    }
}