Skip to main content

synaps_cli/events/
ingest.rs

1//! Inbox watcher — uses inotify (via the `notify` crate) to instantly react
2//! to dropped event JSON files in the inbox directory. Falls back to polling.
3
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use super::queue::EventQueue;
8use super::types::Event;
9use notify::Watcher;
10
11async fn process_file(path: &Path, queue: &EventQueue) {
12    #[cfg(unix)]
13    {
14        if let Ok(meta) = tokio::fs::symlink_metadata(path).await {
15            if meta.file_type().is_symlink() {
16                tracing::warn!("refusing symlink in inbox: {}", path.display());
17                let _ = tokio::fs::remove_file(path).await;
18                return;
19            }
20        }
21    }
22    if path.extension().is_some_and(|e| e == "json") {
23        if let Ok(meta) = tokio::fs::metadata(path).await {
24            if meta.len() > 256 * 1024 {
25                tracing::warn!("inbox file too large ({}B), skipping: {}", meta.len(), path.display());
26                let _ = tokio::fs::rename(path, path.with_extension("json.oversized")).await;
27                return;
28            }
29        }
30        match tokio::fs::read_to_string(path).await {
31            Ok(content) => match serde_json::from_str::<Event>(&content) {
32                Ok(event) => {
33                    tracing::info!("Inbox event: {} from {}", event.id, event.source.source_type);
34                    match queue.push(event) {
35                        Ok(()) => { let _ = tokio::fs::remove_file(path).await; }
36                        Err(e) => { tracing::warn!("Queue full, retry later: {}", e); }
37                    }
38                }
39                Err(e) => {
40                    tracing::warn!("Invalid event {}: {}", path.display(), e);
41                    let _ = tokio::fs::rename(path, path.with_extension("json.error")).await;
42                }
43            },
44            Err(e) => tracing::warn!("Read failed {}: {}", path.display(), e),
45        }
46    }
47}
48
49async fn scan_inbox(dir: &Path, queue: &EventQueue) {
50    if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
51        while let Ok(Some(entry)) = entries.next_entry().await {
52            process_file(&entry.path(), queue).await;
53        }
54    }
55}
56
57pub async fn watch_inbox(inbox_dir: PathBuf, queue: Arc<EventQueue>, shutdown: Arc<std::sync::atomic::AtomicBool>) {
58    let _ = tokio::fs::create_dir_all(&inbox_dir).await;
59    #[cfg(unix)]
60    {
61        use std::os::unix::fs::PermissionsExt;
62        if let Ok(meta) = tokio::fs::metadata(&inbox_dir).await {
63            let mut perms = meta.permissions();
64            perms.set_mode(0o700);
65            let _ = tokio::fs::set_permissions(&inbox_dir, perms).await;
66        }
67    }
68    scan_inbox(&inbox_dir, &queue).await;
69
70    // Use a tokio channel so the async runtime isn't blocked
71    let (async_tx, mut async_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<PathBuf>>();
72
73    // Shutdown signal passed in from caller
74    let shutdown_flag = shutdown.clone();
75
76    // Spawn the blocking notify watcher on a dedicated thread
77    let inbox_clone = inbox_dir.clone();
78    let _watcher_handle = tokio::task::spawn_blocking(move || {
79        let (tx, rx) = std::sync::mpsc::channel();
80        let mut _watcher: notify::RecommendedWatcher = match notify::RecommendedWatcher::new(
81            tx,
82            notify::Config::default(),
83        ) {
84            Ok(w) => w,
85            Err(e) => {
86                tracing::warn!("inotify unavailable: {}", e);
87                return;
88            }
89        };
90
91        if let Err(e) = _watcher.watch(&inbox_clone, notify::RecursiveMode::NonRecursive) {
92            tracing::warn!("watch failed: {}", e);
93            return;
94        }
95        tracing::info!("Inbox watcher (inotify) on {}", inbox_clone.display());
96
97        // Blocking loop — checks shutdown flag every 500ms
98        loop {
99            if shutdown_flag.load(std::sync::atomic::Ordering::Relaxed) {
100                break;
101            }
102            match rx.recv_timeout(std::time::Duration::from_millis(500)) {
103                Ok(Ok(event)) => {
104                    if !event.paths.is_empty() {
105                        let _ = async_tx.send(event.paths);
106                    }
107                }
108                Ok(Err(e)) => tracing::warn!("notify error: {}", e),
109                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
110                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
111                    break;
112                }
113            }
114        }
115    });
116
117    // Async loop — receives paths from the blocking watcher thread
118    let queue_ref = &queue;
119    let dir_ref = &inbox_dir;
120    loop {
121        if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
122            break;
123        }
124        tokio::select! {
125            result = async_rx.recv() => {
126                match result {
127                    Some(paths) => {
128                        for path in &paths {
129                            process_file(path, queue_ref).await;
130                        }
131                        // Sweep for any files inotify missed in the batch
132                        scan_inbox(dir_ref, queue_ref).await;
133                    }
134                    None => break, // Watcher thread exited, channel closed
135                }
136            }
137            // Safety scan every 2s in case inotify misses something
138            _ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {
139                if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
140                    break;
141                }
142                scan_inbox(dir_ref, queue_ref).await;
143            }
144        }
145    }
146}
147
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{Event, Severity};

    /// A watcher task running against a temporary inbox, together with the
    /// handles a test needs to feed it and shut it down.
    struct Fixture {
        /// Keeps the temporary directory alive for the duration of the test.
        _dir: tempfile::TempDir,
        inbox: PathBuf,
        queue: Arc<EventQueue>,
        shutdown: Arc<std::sync::atomic::AtomicBool>,
        task: tokio::task::JoinHandle<()>,
    }

    impl Fixture {
        /// Starts `watch_inbox` on a fresh temp dir and gives it 300 ms to set up.
        async fn start() -> Self {
            let dir = tempfile::tempdir().unwrap();
            let inbox = dir.path().to_path_buf();
            let queue = Arc::new(EventQueue::new(10));
            let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));

            let task = tokio::spawn(watch_inbox(
                inbox.clone(),
                Arc::clone(&queue),
                Arc::clone(&shutdown),
            ));
            tokio::time::sleep(std::time::Duration::from_millis(300)).await;

            Self { _dir: dir, inbox, queue, shutdown, task }
        }

        /// Raises the shutdown flag and waits up to 2 s for the watcher to finish.
        async fn stop(&mut self) {
            self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
            let _ = tokio::time::timeout(std::time::Duration::from_secs(2), &mut self.task).await;
        }
    }

    /// Polls `done` up to 30 times, 100 ms apart, returning as soon as it holds.
    async fn wait_for(done: impl Fn() -> bool) {
        for _ in 0..30 {
            if done() {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }

    #[tokio::test]
    async fn picks_up_dropped_event() {
        let mut fx = Fixture::start().await;

        let event = Event::simple("test", "hello inbox", Some(Severity::High));
        let path = fx.inbox.join("1.json");
        tokio::fs::write(&path, serde_json::to_string(&event).unwrap()).await.unwrap();

        wait_for(|| fx.queue.len() > 0).await;
        fx.stop().await;

        let popped = fx.queue.pop().expect("event should have been ingested");
        assert_eq!(popped.content.text, "hello inbox");
    }

    #[tokio::test]
    async fn invalid_json_moved_to_error() {
        let mut fx = Fixture::start().await;

        let path = fx.inbox.join("bad.json");
        tokio::fs::write(&path, "not json").await.unwrap();
        let err_path = fx.inbox.join("bad.json.error");

        wait_for(|| err_path.exists()).await;
        fx.stop().await;

        assert!(err_path.exists());
        assert_eq!(fx.queue.len(), 0);
    }
}