argyph-core 1.0.2

Local-first MCP server giving AI coding agents fast, structured, and semantic context over any codebase.
Documentation
use std::sync::Arc;
use std::time::{Duration, Instant};

use argyph_fs::FileWatcher;
use argyph_store::Store;
use camino::Utf8PathBuf;
use tokio::sync::RwLock;

use crate::tiers::{self, TierState};

const MAX_EVENTS_PER_MINUTE: usize = 60;
const WINDOW_SECS: u64 = 60;
const POLL_INTERVAL_SECS: u64 = 5;

pub fn should_force_poll() -> bool {
    std::env::var("ARGYPH_WATCHER").as_deref() == Ok("poll")
}

pub fn create_watcher(root: &Utf8PathBuf, debounce: Duration) -> FileWatcher {
    if should_force_poll() {
        tracing::info!("ARGYPH_WATCHER=poll — using polling watcher");
        FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
    } else {
        match FileWatcher::notify_watcher(root, debounce) {
            Ok(w) => {
                tracing::info!("using native filesystem watcher");
                w
            }
            Err(e) => {
                tracing::warn!(error = %e, "native watcher failed, falling back to polling");
                FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
            }
        }
    }
}

pub struct WatcherOrchestrator {
    root: Utf8PathBuf,
    watcher: RwLock<FileWatcher>,
    store: Arc<dyn Store>,
    _tier_state: Arc<RwLock<TierState>>,
    event_times: RwLock<Vec<Instant>>,
}

impl WatcherOrchestrator {
    pub fn new(
        root: Utf8PathBuf,
        watcher: FileWatcher,
        store: Arc<dyn Store>,
        #[allow(dead_code)] tier_state: Arc<RwLock<TierState>>,
    ) -> Self {
        Self {
            root,
            watcher: RwLock::new(watcher),
            store,
            _tier_state: tier_state,
            event_times: RwLock::new(Vec::new()),
        }
    }

    pub async fn run(&self) {
        loop {
            let batch = {
                let mut w = self.watcher.write().await;
                w.next_batch().await
            };

            if batch.is_empty() {
                continue;
            }

            tracing::info!(count = batch.len(), "watcher batch received");

            if self.rate_limited().await {
                tracing::warn!(
                    limit = MAX_EVENTS_PER_MINUTE,
                    window_s = WINDOW_SECS,
                    "reindex rate cap exceeded, falling back to polling"
                );
                self.switch_to_polling().await;
                continue;
            }

            match tiers::incremental_reindex(&self.root, &*self.store, &batch).await {
                Ok(()) => {
                    tracing::info!(affected = batch.len(), "incremental reindex complete");
                }
                Err(e) => {
                    tracing::error!(error = %e, "incremental reindex failed");
                }
            }
        }
    }

    async fn rate_limited(&self) -> bool {
        let now = Instant::now();
        let mut times = self.event_times.write().await;
        times.push(now);
        times.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
        times.len() > MAX_EVENTS_PER_MINUTE
    }

    async fn switch_to_polling(&self) {
        let root_buf = self.root.to_path_buf();
        let poller = FileWatcher::poll_watcher(root_buf, Duration::from_secs(POLL_INTERVAL_SECS));
        let mut w = self.watcher.write().await;
        *w = poller;
    }
}