trusty-search 0.3.43

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
//! Glue between [`crate::service::watcher::FileWatcher`] and `CodeIndexer`.
//!
//! Why: The watcher emits raw filesystem events; the indexer wants
//! `index_file` / `remove_chunk` calls. This module bridges them and
//! maintains an [`IndexedFiles`] side-map so that file deletions can locate
//! the chunk IDs that need to come out of the HNSW + corpus.
//!
//! What: [`spawn_watch_loop`] starts the [`FileWatcher`] and a long-running
//! tokio task that consumes events. Returns a `WatcherTask` handle that owns
//! both the `FileWatcher` (so dropping it stops the OS watcher) and the
//! `JoinHandle` of the consumer task.
//!
//! Test: integration test below boots the loop on a temp dir, writes a file,
//! and asserts the indexer's `chunk_count()` increases.

use std::path::Path;
use std::sync::Arc;

use crate::core::chunker::chunk_ast;
use crate::core::CodeIndexer;
use anyhow::Result;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

use crate::service::indexed_files::IndexedFiles;
use crate::service::walker::{path_in_skipped_dir, should_skip_path};
use crate::service::watcher::{FileWatcher, WatchEvent};

/// Handle for a running watch loop. Drop it to stop watching and join the
/// consumer task on the next `await` boundary.
pub struct WatcherTask {
    _watcher: FileWatcher,
    _join: JoinHandle<()>,
}

/// Start watching `root_path` and forward changes into `indexer`.
///
/// `indexed_files` is shared with anyone else who needs to know which chunks
/// belong to which path (e.g. an explicit `remove_file` HTTP handler).
pub fn spawn_watch_loop(
    root_path: &Path,
    indexer: Arc<RwLock<CodeIndexer>>,
    indexed_files: IndexedFiles,
) -> Result<WatcherTask> {
    let (tx, mut rx) = mpsc::unbounded_channel::<WatchEvent>();
    let watcher = FileWatcher::start(root_path.to_path_buf(), tx)?;

    let join = tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            match event {
                WatchEvent::Modified(path) => {
                    handle_modified(&path, &indexer, &indexed_files).await;
                }
                WatchEvent::Removed(path) => {
                    handle_removed(&path, &indexer, &indexed_files).await;
                }
            }
        }
    });

    Ok(WatcherTask {
        _watcher: watcher,
        _join: join,
    })
}

/// Re-chunk the file and merge it into the indexer. Stale chunks from a
/// previous version of the same file are removed first so we don't accumulate
/// dead entries on edit.
async fn handle_modified(
    path: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    // Skip directories — the watcher fires on parent mtime updates too.
    if path.is_dir() {
        return;
    }

    // Apply the same exclusions as the recursive walker: a file modified
    // inside an excluded subtree (e.g. `cdk.out/`, `node_modules/`) or a
    // minified/binary/large file must not enter the index incrementally.
    if path_in_skipped_dir(path) || should_skip_path(path) {
        tracing::debug!(?path, "skip excluded file");
        return;
    }

    let content = match tokio::fs::read_to_string(path).await {
        Ok(s) => s,
        Err(err) => {
            tracing::debug!(?err, ?path, "skip unreadable file");
            return;
        }
    };

    // Drop any prior chunks for this file before re-indexing.
    if let Some(stale_ids) = indexed_files.take(path).await {
        let idx = indexer.read().await;
        for id in stale_ids {
            if let Err(err) = idx.remove_chunk(&id).await {
                tracing::warn!(?err, %id, "remove_chunk failed");
            }
        }
    }

    // Compute fresh chunk IDs and feed them to the indexer.
    let path_str = path.to_string_lossy().into_owned();
    let (chunks, _entities) = chunk_ast(&path_str, &content);
    let new_ids: Vec<String> = chunks.iter().map(|c| c.id.clone()).collect();

    let idx = indexer.read().await;
    if let Err(err) = idx.index_file(&path_str, &content).await {
        tracing::warn!(?err, ?path, "index_file failed");
        return;
    }
    drop(idx);

    indexed_files.record(path.to_path_buf(), new_ids).await;
}

/// Drop every chunk we previously recorded for `path` from the indexer.
async fn handle_removed(
    path: &Path,
    indexer: &Arc<RwLock<CodeIndexer>>,
    indexed_files: &IndexedFiles,
) {
    let Some(ids) = indexed_files.take(path).await else {
        return;
    };
    let idx = indexer.read().await;
    for id in ids {
        if let Err(err) = idx.remove_chunk(&id).await {
            tracing::warn!(?err, %id, "remove_chunk failed");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::sync::RwLock;

    /// End-to-end: writing a `.rs` file inside a watched directory causes the
    /// indexer's chunk count to grow within ~2s.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn modified_file_triggers_indexing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let indexer = Arc::new(RwLock::new(CodeIndexer::new("test", dir.path())));
        let tracker = IndexedFiles::new();

        let _task = spawn_watch_loop(dir.path(), Arc::clone(&indexer), tracker.clone())
            .expect("watch loop starts");

        // Allow the OS watcher to install.
        tokio::time::sleep(Duration::from_millis(150)).await;

        let file = dir.path().join("lib.rs");
        tokio::fs::write(&file, "fn alpha() {}\nfn beta() {}\n")
            .await
            .expect("write file");

        // Poll up to 2s for the indexer to pick the change up.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            let count = indexer.read().await.chunk_count();
            if count > 0 {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("chunk_count never grew above 0");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }

        assert!(
            tracker.len().await >= 1,
            "expected at least one tracked file"
        );
    }

    /// Issue #129: a file created inside `cdk.out/` must NOT be indexed by the
    /// watcher — the build-artefact subtree exclusion applies incrementally.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn cdk_out_file_is_not_indexed() {
        let dir = tempfile::tempdir().expect("tempdir");
        let indexer = Arc::new(RwLock::new(CodeIndexer::new("test", dir.path())));
        let tracker = IndexedFiles::new();

        let _task = spawn_watch_loop(dir.path(), Arc::clone(&indexer), tracker.clone())
            .expect("watch loop starts");

        tokio::time::sleep(Duration::from_millis(150)).await;

        // Write a real source file and a build-artefact file.
        let cdk_dir = dir.path().join("cdk.out/asset.abc/python");
        tokio::fs::create_dir_all(&cdk_dir).await.expect("mkdir");
        tokio::fs::write(cdk_dir.join("vendored.py"), "import boto3\n")
            .await
            .expect("write vendored");
        tokio::fs::write(dir.path().join("handler.py"), "def handler(): pass\n")
            .await
            .expect("write handler");

        // Poll for the real file to be picked up.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            if indexer.read().await.chunk_count() > 0 {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("real source was never indexed");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        // Give the watcher a moment to (not) process the cdk.out file.
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Only handler.py should be tracked; vendored.py must be excluded.
        let tracked = tracker.len().await;
        assert_eq!(
            tracked, 1,
            "exactly one file (handler.py) should be tracked, got {tracked}"
        );
    }
}