Skip to main content

argyph_core/
watcher.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use argyph_fs::FileWatcher;
5use argyph_store::Store;
6use camino::Utf8PathBuf;
7use tokio::sync::RwLock;
8
9use crate::tiers::{self, TierState};
10
/// Width, in seconds, of the sliding window over which watcher batches are counted.
const WINDOW_SECS: u64 = 60;
/// Number of batches tolerated inside one window before the rate cap is considered exceeded.
const MAX_EVENTS_PER_MINUTE: usize = 60;
/// Duration, in seconds, handed to `FileWatcher::poll_watcher` whenever a polling watcher
/// is created.
const POLL_INTERVAL_SECS: u64 = 5;
14
/// Reports whether the `ARGYPH_WATCHER` environment variable requests the polling watcher.
///
/// Returns `true` only when the variable is set to exactly `poll`. An unset variable, a
/// value that is not valid Unicode, or any other string yields `false`.
pub fn should_force_poll() -> bool {
    match std::env::var("ARGYPH_WATCHER") {
        Ok(mode) => mode == "poll",
        Err(_) => false,
    }
}
18
19pub fn create_watcher(root: &Utf8PathBuf, debounce: Duration) -> FileWatcher {
20    if should_force_poll() {
21        tracing::info!("ARGYPH_WATCHER=poll — using polling watcher");
22        FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
23    } else {
24        match FileWatcher::notify_watcher(root, debounce) {
25            Ok(w) => {
26                tracing::info!("using native filesystem watcher");
27                w
28            }
29            Err(e) => {
30                tracing::warn!(error = %e, "native watcher failed, falling back to polling");
31                FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
32            }
33        }
34    }
35}
36
37pub struct WatcherOrchestrator {
38    root: Utf8PathBuf,
39    watcher: RwLock<FileWatcher>,
40    store: Arc<dyn Store>,
41    _tier_state: Arc<RwLock<TierState>>,
42    event_times: RwLock<Vec<Instant>>,
43}
44
45impl WatcherOrchestrator {
46    pub fn new(
47        root: Utf8PathBuf,
48        watcher: FileWatcher,
49        store: Arc<dyn Store>,
50        #[allow(dead_code)] tier_state: Arc<RwLock<TierState>>,
51    ) -> Self {
52        Self {
53            root,
54            watcher: RwLock::new(watcher),
55            store,
56            _tier_state: tier_state,
57            event_times: RwLock::new(Vec::new()),
58        }
59    }
60
61    pub async fn run(&self) {
62        loop {
63            let batch = {
64                let mut w = self.watcher.write().await;
65                w.next_batch().await
66            };
67
68            if batch.is_empty() {
69                continue;
70            }
71
72            tracing::info!(count = batch.len(), "watcher batch received");
73
74            if self.rate_limited().await {
75                tracing::warn!(
76                    limit = MAX_EVENTS_PER_MINUTE,
77                    window_s = WINDOW_SECS,
78                    "reindex rate cap exceeded, falling back to polling"
79                );
80                self.switch_to_polling().await;
81                continue;
82            }
83
84            match tiers::incremental_reindex(&self.root, &*self.store, &batch).await {
85                Ok(()) => {
86                    tracing::info!(affected = batch.len(), "incremental reindex complete");
87                }
88                Err(e) => {
89                    tracing::error!(error = %e, "incremental reindex failed");
90                }
91            }
92        }
93    }
94
95    async fn rate_limited(&self) -> bool {
96        let now = Instant::now();
97        let mut times = self.event_times.write().await;
98        times.push(now);
99        times.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
100        times.len() > MAX_EVENTS_PER_MINUTE
101    }
102
103    async fn switch_to_polling(&self) {
104        let root_buf = self.root.to_path_buf();
105        let poller = FileWatcher::poll_watcher(root_buf, Duration::from_secs(POLL_INTERVAL_SECS));
106        let mut w = self.watcher.write().await;
107        *w = poller;
108    }
109}