use std::path::Path;
use std::sync::Arc;
use crate::core::chunker::chunk_ast;
use crate::core::CodeIndexer;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::service::indexed_files::IndexedFiles;
use crate::service::walker::{path_in_skipped_dir, should_skip_path};
use crate::service::watcher::{FileWatcher, WatchEvent};
/// Owner of a running watch loop, as returned by [`spawn_watch_loop`].
///
/// Neither field is read anywhere in this file; the struct exists purely to keep
/// both resources owned by the caller for as long as watching should continue.
pub struct WatcherTask {
/// The file watcher that was started with the sending half of the event channel.
/// NOTE(review): presumably dropping it stops event delivery and closes the
/// channel, which lets the event loop finish — confirm against `FileWatcher`.
_watcher: FileWatcher,
/// Handle of the Tokio task that drains the event channel. It is never awaited
/// here; dropping a Tokio `JoinHandle` detaches the task rather than cancelling it.
_join: JoinHandle<()>,
}
/// Starts watching `root_path` and spawns a Tokio task that keeps the index in sync.
///
/// A [`FileWatcher`] is started with the sending half of an unbounded channel; the
/// spawned task drains the receiving half and forwards every event to
/// `handle_modified` or `handle_removed`. The task ends once the channel reports
/// that no further events can arrive.
///
/// Both the root as given and its canonicalized form are handed to the handlers so
/// that event paths can be made relative whichever spelling the watcher reports.
/// If the root cannot be canonicalized, the root as given is used for both.
///
/// # Errors
///
/// Returns the error produced by `FileWatcher::start`.
///
/// # Panics
///
/// Calls `tokio::spawn`, which panics when invoked outside a Tokio runtime.
pub fn spawn_watch_loop(
    root_path: &Path,
    indexer: Arc<RwLock<CodeIndexer>>,
    indexed_files: IndexedFiles,
) -> Result<WatcherTask> {
    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<WatchEvent>();
    let watcher = FileWatcher::start(root_path.to_path_buf(), event_tx)?;

    let raw_root = root_path.to_path_buf();
    let canonical_root = match std::fs::canonicalize(root_path) {
        Ok(resolved) => resolved,
        // Fall back to the root exactly as the caller spelled it.
        Err(_) => raw_root.clone(),
    };

    let event_loop = async move {
        loop {
            let Some(event) = event_rx.recv().await else {
                break;
            };
            match event {
                WatchEvent::Modified(changed) => {
                    handle_modified(
                        &changed,
                        &canonical_root,
                        &raw_root,
                        &indexer,
                        &indexed_files,
                    )
                    .await;
                }
                WatchEvent::Removed(gone) => {
                    handle_removed(&gone, &canonical_root, &raw_root, &indexer, &indexed_files)
                        .await;
                }
            }
        }
    };
    let join = tokio::spawn(event_loop);

    Ok(WatcherTask {
        _watcher: watcher,
        _join: join,
    })
}
/// Derives the tracking key for a watcher event path, relative to the watched root
/// whenever possible.
///
/// Resolution order:
/// 1. If `event_path` can be canonicalized, the canonical form is made relative to
///    `canonical_root`. When it does not lie under that root, the canonical
///    absolute path is returned unchanged.
/// 2. If canonicalization fails (for example because the file no longer exists),
///    the path as reported is stripped of `canonical_root` first and `raw_root`
///    second; the first prefix that matches wins.
/// 3. If no prefix matches, `event_path` is returned as-is.
///
/// Path components that are not valid UTF-8 are converted lossily.
pub fn watcher_relative_path(canonical_root: &Path, raw_root: &Path, event_path: &Path) -> String {
    let to_key = |p: &Path| p.to_string_lossy().into_owned();

    match std::fs::canonicalize(event_path) {
        Ok(resolved) => to_key(
            resolved
                .strip_prefix(canonical_root)
                .unwrap_or(resolved.as_path()),
        ),
        Err(_) => to_key(
            [canonical_root, raw_root]
                .iter()
                .find_map(|root| event_path.strip_prefix(root).ok())
                .unwrap_or(event_path),
        ),
    }
}
/// Re-indexes a file after the watcher reported it as modified.
///
/// Steps, in order:
/// 1. Directories and paths rejected by `path_in_skipped_dir` / `should_skip_path`
///    are ignored.
/// 2. The file is read as a UTF-8 string; if that fails the event is dropped and
///    any existing index entries for the file are left untouched.
/// 3. Chunk ids previously recorded for the file's key are taken out of
///    `indexed_files` and removed from the index one by one (failures are logged).
/// 4. The file is indexed again; on success the ids reported by `chunk_ast` for
///    the new content are recorded under the same key. On failure nothing is
///    recorded.
///
/// The key is produced by `watcher_relative_path`.
async fn handle_modified(
    path: &Path,
    canonical_root: &Path,
    raw_root: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    if path.is_dir() {
        return;
    }

    let excluded = path_in_skipped_dir(path) || should_skip_path(path);
    if excluded {
        tracing::debug!(?path, "skip excluded file");
        return;
    }

    let content = match tokio::fs::read_to_string(path).await {
        Err(err) => {
            tracing::debug!(?err, ?path, "skip unreadable file");
            return;
        }
        Ok(text) => text,
    };

    let rel_key = watcher_relative_path(canonical_root, raw_root, path);
    let tracked_key = std::path::PathBuf::from(&rel_key);

    // Drop whatever was indexed for the previous version of this file.
    if let Some(stale_ids) = indexed_files.take(&tracked_key).await {
        let guard = indexer.read().await;
        for id in stale_ids {
            if let Err(err) = guard.remove_chunk(&id).await {
                tracing::warn!(?err, %id, "remove_chunk failed");
            }
        }
    }

    // Collect the ids the new content will produce so they can be tracked afterwards.
    let (chunks, _entities) = chunk_ast(&rel_key, &content);
    let new_ids: Vec<String> = chunks.iter().map(|chunk| chunk.id.clone()).collect();

    let guard = indexer.read().await;
    if let Err(err) = guard.index_file(&rel_key, &content).await {
        tracing::warn!(?err, ?path, "index_file failed");
        return;
    }
    // Release the indexer lock before touching the tracker.
    drop(guard);

    indexed_files.record(tracked_key, new_ids).await;
}
/// Purges a file's chunks from the index after the watcher reported it as removed.
///
/// The tracking key is derived with `watcher_relative_path`. If `indexed_files`
/// holds no entry for that key, nothing happens. Otherwise the entry is taken out
/// of the tracker and each recorded chunk id is removed from the index; individual
/// removal failures are logged and do not stop the remaining removals.
async fn handle_removed(
    path: &Path,
    canonical_root: &Path,
    raw_root: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    let rel_key = watcher_relative_path(canonical_root, raw_root, path);
    let tracked_key = std::path::PathBuf::from(&rel_key);

    let ids = match indexed_files.take(&tracked_key).await {
        Some(found) => found,
        // The file was never tracked, so there is nothing to clean up.
        None => return,
    };

    let guard = indexer.read().await;
    for id in ids {
        if let Err(err) = guard.remove_chunk(&id).await {
            tracing::warn!(?err, %id, "remove_chunk failed");
        }
    }
}
// Tests for this module: unit tests for `watcher_relative_path` (each one works in
// its own temporary directory) and two end-to-end tests that run the real watch
// loop against a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::RwLock;
// A file directly under the canonical root maps to its bare file name.
#[test]
fn watcher_relative_path_strips_root_prefix() {
let dir = tempfile::tempdir().expect("tempdir");
let root = std::fs::canonicalize(dir.path()).expect("canonicalize root");
let file = root.join("lib.rs");
std::fs::write(&file, "").expect("create file");
let rel = watcher_relative_path(&root, &root, &file);
assert_eq!(rel, "lib.rs", "expected bare filename, got {rel:?}");
assert!(!rel.starts_with('/'), "must not start with '/'");
}
// Nested directories are kept in the key. The expected value is built with
// `PathBuf::join` so it uses the platform's path separator.
#[test]
fn watcher_relative_path_preserves_subdirectory_structure() {
let dir = tempfile::tempdir().expect("tempdir");
let root = std::fs::canonicalize(dir.path()).expect("canonicalize root");
let subdir = root.join("src").join("auth");
std::fs::create_dir_all(&subdir).expect("create subdir");
let file = subdir.join("mod.rs");
std::fs::write(&file, "").expect("create file");
let rel = watcher_relative_path(&root, &root, &file);
assert_eq!(
rel,
PathBuf::from("src")
.join("auth")
.join("mod.rs")
.display()
.to_string(),
"expected src/auth/mod.rs"
);
assert!(!rel.starts_with('/'), "must not start with '/'");
}
// A file living in an unrelated temporary directory cannot be made relative to
// the root. The exact string is not pinned; it only has to be non-empty and must
// not begin with the root.
#[test]
fn watcher_relative_path_falls_back_for_file_outside_root() {
let root_dir = tempfile::tempdir().expect("tempdir root");
let other_dir = tempfile::tempdir().expect("tempdir other");
let root = std::fs::canonicalize(root_dir.path()).expect("canonicalize root");
let outside = other_dir.path().join("x.rs");
std::fs::write(&outside, "").expect("create outside file");
let result = watcher_relative_path(&root, &root, &outside);
assert!(
!result.starts_with(root.to_str().unwrap_or("")),
"result must not start with root: {result:?}"
);
assert!(!result.is_empty(), "result must not be empty");
}
// The raw root is a symlink to the real directory while the event path goes
// through the real directory. Because the canonical root resolves the symlink,
// the key is still the bare file name.
#[cfg(unix)]
#[test]
fn watcher_relative_path_resolves_symlinked_root() {
let dir = tempfile::tempdir().expect("tempdir");
let real = dir.path().join("real");
std::fs::create_dir(&real).expect("create real");
let link = dir.path().join("link");
std::os::unix::fs::symlink(&real, &link).expect("symlink");
let file = real.join("foo.rs");
std::fs::write(&file, "").expect("create file");
let canonical_root = std::fs::canonicalize(&link).expect("canonicalize link");
let rel = watcher_relative_path(&canonical_root, &link, &file);
assert_eq!(rel, "foo.rs", "expected bare filename, got {rel:?}");
assert!(!rel.starts_with('/'), "must not start with '/'");
}
// The key is computed once while the file exists (the event path can be
// canonicalized) and once after it has been deleted (canonicalization fails and
// the prefix fallbacks are used). Both keys must be identical so that a removal
// finds the entry recorded by an earlier modification.
#[test]
fn removed_event_produces_same_relative_key_as_modified() {
let dir = tempfile::tempdir().expect("tempdir");
let raw_root = dir.path().to_path_buf();
let canonical_root = std::fs::canonicalize(&raw_root).expect("canonicalize");
let abs_path = canonical_root.join("src").join("lib.rs");
std::fs::create_dir_all(abs_path.parent().expect("parent")).expect("mkdir");
std::fs::write(&abs_path, "").expect("write");
let modified_key = watcher_relative_path(&canonical_root, &raw_root, &abs_path);
std::fs::remove_file(&abs_path).expect("remove");
let removed_key = watcher_relative_path(&canonical_root, &raw_root, &abs_path);
assert_eq!(
modified_key, removed_key,
"Removed arm key {removed_key:?} must equal Modified arm key {modified_key:?}"
);
assert!(
!removed_key.starts_with('/'),
"must be relative: {removed_key:?}"
);
}
// The event path is spelled through the symlinked raw root and the file is never
// created, so it cannot be canonicalized and does not start with the canonical
// root. Only the raw-root prefix matches, and it must still yield a relative key.
#[cfg(unix)]
#[test]
fn removed_deleted_file_dual_root_fallback() {
let dir = tempfile::tempdir().expect("tempdir");
let real = dir.path().join("proj");
std::fs::create_dir(&real).expect("create proj");
let link = dir.path().join("proj-link");
std::os::unix::fs::symlink(&real, &link).expect("symlink");
let canonical_root = std::fs::canonicalize(&link).expect("canonicalize");
let raw_root = link.clone();
let event_path = raw_root.join("src").join("main.rs");
let result = watcher_relative_path(&canonical_root, &raw_root, &event_path);
assert_eq!(
result,
PathBuf::from("src").join("main.rs").display().to_string(),
"dual-root fallback must yield relative key, got {result:?}"
);
assert!(!result.starts_with('/'), "must be relative: {result:?}");
}
// End-to-end: start the watch loop on a temporary directory, write a source
// file, then poll the indexer for up to two seconds until at least one chunk
// shows up. Finally the tracker must know about at least one file.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn modified_file_triggers_indexing() {
let dir = tempfile::tempdir().expect("tempdir");
let indexer = Arc::new(RwLock::new(CodeIndexer::new("test", dir.path())));
let tracker = IndexedFiles::new();
let _task = spawn_watch_loop(dir.path(), Arc::clone(&indexer), tracker.clone())
.expect("watch loop starts");
// NOTE(review): presumably gives the watcher time to start observing the
// directory before the write below — confirm against `FileWatcher`.
tokio::time::sleep(Duration::from_millis(150)).await;
let file = dir.path().join("lib.rs");
tokio::fs::write(&file, "fn alpha() {}\nfn beta() {}\n")
.await
.expect("write file");
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
let count = indexer.read().await.chunk_count();
if count > 0 {
break;
}
if tokio::time::Instant::now() > deadline {
panic!("chunk_count never grew above 0");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
tracker.len().await >= 1,
"expected at least one tracked file"
);
}
// End-to-end: a file written under `cdk.out/` must not be tracked, while a
// regular source file next to it must be.
// NOTE(review): relies on `path_in_skipped_dir` / `should_skip_path` rejecting
// paths under `cdk.out` — confirm in the walker module.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cdk_out_file_is_not_indexed() {
let dir = tempfile::tempdir().expect("tempdir");
let indexer = Arc::new(RwLock::new(CodeIndexer::new("test", dir.path())));
let tracker = IndexedFiles::new();
let _task = spawn_watch_loop(dir.path(), Arc::clone(&indexer), tracker.clone())
.expect("watch loop starts");
tokio::time::sleep(Duration::from_millis(150)).await;
let cdk_dir = dir.path().join("cdk.out/asset.abc/python");
tokio::fs::create_dir_all(&cdk_dir).await.expect("mkdir");
tokio::fs::write(cdk_dir.join("vendored.py"), "import boto3\n")
.await
.expect("write vendored");
tokio::fs::write(dir.path().join("handler.py"), "def handler(): pass\n")
.await
.expect("write handler");
// Wait (up to two seconds) until the regular source file has been indexed.
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if indexer.read().await.chunk_count() > 0 {
break;
}
if tokio::time::Instant::now() > deadline {
panic!("real source was never indexed");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
// Extra grace period so that any event for the vendored file would have been
// processed before the tracker is inspected.
tokio::time::sleep(Duration::from_millis(300)).await;
let tracked = tracker.len().await;
assert_eq!(
tracked, 1,
"exactly one file (handler.py) should be tracked, got {tracked}"
);
}
}