agentdir 0.1.5

Virtual filesystem for agent-optimized file exploration using CoW reflinks
Documentation
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time;

use crate::backend::{Backend, SourceEvent, WatchHandle};
use crate::error::Result;
use crate::types::SourcePath;

pub struct FileWatcher {
    backend: Arc<dyn Backend>,
    roots: Vec<SourcePath>,
    poll_interval: Duration,
}

impl FileWatcher {
    pub fn new(backend: Arc<dyn Backend>, roots: Vec<SourcePath>) -> Self {
        Self {
            backend,
            roots,
            poll_interval: Duration::from_secs(60),
        }
    }

    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    pub async fn start(&self) -> Result<(mpsc::Receiver<SourceEvent>, WatchHandle)> {
        let (tx, rx) = mpsc::channel(256);

        let handle = self.backend.watch(&self.roots, tx.clone()).await?;

        let poll_tx = tx.clone();
        let poll_interval = self.poll_interval;
        let cancel = handle.cancel_token();

        tokio::spawn(async move {
            let mut interval = time::interval(poll_interval);
            interval.tick().await;

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        let _ = poll_tx.send(SourceEvent::RescanNeeded).await;
                    }
                    _ = cancel.cancelled() => {
                        break;
                    }
                }
            }
        });

        Ok((rx, handle))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::local::LocalBackend;
    use std::time::Duration;
    use tempfile::TempDir;
    use tokio::time::timeout;

    #[tokio::test]
    async fn test_detect_file_creation() {
        let dir = TempDir::new().unwrap();
        let backend: Arc<dyn Backend> = Arc::new(LocalBackend);
        let roots = vec![SourcePath::new(dir.path().to_path_buf())];

        let watcher = FileWatcher::new(backend, roots);
        let (mut rx, _handle) = watcher.start().await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        std::fs::write(dir.path().join("newfile.txt"), b"hello").unwrap();

        let event = timeout(Duration::from_secs(5), rx.recv()).await;
        assert!(event.is_ok(), "Timed out waiting for file creation event");
        assert!(event.unwrap().is_some());
    }

    #[tokio::test]
    async fn test_start_waits_until_roots_are_watched() {
        let dir = TempDir::new().unwrap();
        let backend: Arc<dyn Backend> = Arc::new(LocalBackend);
        let roots = vec![SourcePath::new(dir.path().to_path_buf())];

        let watcher = FileWatcher::new(backend, roots);
        let (mut rx, _handle) = watcher.start().await.unwrap();

        std::fs::write(dir.path().join("immediate.txt"), b"hello").unwrap();

        let event = timeout(Duration::from_secs(5), rx.recv()).await;
        assert!(
            event.is_ok(),
            "watcher missed a file created immediately after start"
        );
        assert!(event.unwrap().is_some());
    }

    #[tokio::test]
    async fn test_watcher_cleanup() {
        let dir = TempDir::new().unwrap();
        let backend: Arc<dyn Backend> = Arc::new(LocalBackend);
        let roots = vec![SourcePath::new(dir.path().to_path_buf())];

        let watcher = FileWatcher::new(backend, roots);
        let (_rx, handle) = watcher.start().await.unwrap();

        drop(handle);
    }

    #[tokio::test]
    async fn test_periodic_polling_emits_rescan() {
        let dir = TempDir::new().unwrap();
        let backend: Arc<dyn Backend> = Arc::new(LocalBackend);
        let roots = vec![SourcePath::new(dir.path().to_path_buf())];

        let watcher =
            FileWatcher::new(backend, roots).with_poll_interval(Duration::from_millis(200));
        let (mut rx, _handle) = watcher.start().await.unwrap();

        let found_rescan = timeout(Duration::from_secs(3), async {
            loop {
                if let Some(event) = rx.recv().await {
                    if matches!(event, SourceEvent::RescanNeeded) {
                        return true;
                    }
                } else {
                    return false;
                }
            }
        })
        .await;

        assert!(
            found_rescan.is_ok() && found_rescan.unwrap(),
            "Expected RescanNeeded event from periodic polling"
        );
    }
}