1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use argyph_fs::FileWatcher;
5use argyph_store::Store;
6use camino::Utf8PathBuf;
7use tokio::sync::RwLock;
8
9use crate::tiers::{self, TierState};
10
11const MAX_EVENTS_PER_MINUTE: usize = 60;
12const WINDOW_SECS: u64 = 60;
13const POLL_INTERVAL_SECS: u64 = 5;
14
15pub fn should_force_poll() -> bool {
16 std::env::var("ARGYPH_WATCHER").as_deref() == Ok("poll")
17}
18
19pub fn create_watcher(root: &Utf8PathBuf, debounce: Duration) -> FileWatcher {
20 if should_force_poll() {
21 tracing::info!("ARGYPH_WATCHER=poll — using polling watcher");
22 FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
23 } else {
24 match FileWatcher::notify_watcher(root, debounce) {
25 Ok(w) => {
26 tracing::info!("using native filesystem watcher");
27 w
28 }
29 Err(e) => {
30 tracing::warn!(error = %e, "native watcher failed, falling back to polling");
31 FileWatcher::poll_watcher(root.clone(), Duration::from_secs(POLL_INTERVAL_SECS))
32 }
33 }
34 }
35}
36
37pub struct WatcherOrchestrator {
38 root: Utf8PathBuf,
39 watcher: RwLock<FileWatcher>,
40 store: Arc<dyn Store>,
41 _tier_state: Arc<RwLock<TierState>>,
42 event_times: RwLock<Vec<Instant>>,
43}
44
45impl WatcherOrchestrator {
46 pub fn new(
47 root: Utf8PathBuf,
48 watcher: FileWatcher,
49 store: Arc<dyn Store>,
50 #[allow(dead_code)] tier_state: Arc<RwLock<TierState>>,
51 ) -> Self {
52 Self {
53 root,
54 watcher: RwLock::new(watcher),
55 store,
56 _tier_state: tier_state,
57 event_times: RwLock::new(Vec::new()),
58 }
59 }
60
61 pub async fn run(&self) {
62 loop {
63 let batch = {
64 let mut w = self.watcher.write().await;
65 w.next_batch().await
66 };
67
68 if batch.is_empty() {
69 continue;
70 }
71
72 tracing::info!(count = batch.len(), "watcher batch received");
73
74 if self.rate_limited().await {
75 tracing::warn!(
76 limit = MAX_EVENTS_PER_MINUTE,
77 window_s = WINDOW_SECS,
78 "reindex rate cap exceeded, falling back to polling"
79 );
80 self.switch_to_polling().await;
81 continue;
82 }
83
84 match tiers::incremental_reindex(&self.root, &*self.store, &batch).await {
85 Ok(()) => {
86 tracing::info!(affected = batch.len(), "incremental reindex complete");
87 }
88 Err(e) => {
89 tracing::error!(error = %e, "incremental reindex failed");
90 }
91 }
92 }
93 }
94
95 async fn rate_limited(&self) -> bool {
96 let now = Instant::now();
97 let mut times = self.event_times.write().await;
98 times.push(now);
99 times.retain(|t| now.duration_since(*t).as_secs() < WINDOW_SECS);
100 times.len() > MAX_EVENTS_PER_MINUTE
101 }
102
103 async fn switch_to_polling(&self) {
104 let root_buf = self.root.to_path_buf();
105 let poller = FileWatcher::poll_watcher(root_buf, Duration::from_secs(POLL_INTERVAL_SECS));
106 let mut w = self.watcher.write().await;
107 *w = poller;
108 }
109}