synaps_cli/events/
ingest.rs1use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use super::queue::EventQueue;
8use super::types::Event;
9use notify::Watcher;
10
11async fn process_file(path: &Path, queue: &EventQueue) {
12 #[cfg(unix)]
13 {
14 if let Ok(meta) = tokio::fs::symlink_metadata(path).await {
15 if meta.file_type().is_symlink() {
16 tracing::warn!("refusing symlink in inbox: {}", path.display());
17 let _ = tokio::fs::remove_file(path).await;
18 return;
19 }
20 }
21 }
22 if path.extension().is_some_and(|e| e == "json") {
23 if let Ok(meta) = tokio::fs::metadata(path).await {
24 if meta.len() > 256 * 1024 {
25 tracing::warn!("inbox file too large ({}B), skipping: {}", meta.len(), path.display());
26 let _ = tokio::fs::rename(path, path.with_extension("json.oversized")).await;
27 return;
28 }
29 }
30 match tokio::fs::read_to_string(path).await {
31 Ok(content) => match serde_json::from_str::<Event>(&content) {
32 Ok(event) => {
33 tracing::info!("Inbox event: {} from {}", event.id, event.source.source_type);
34 match queue.push(event) {
35 Ok(()) => { let _ = tokio::fs::remove_file(path).await; }
36 Err(e) => { tracing::warn!("Queue full, retry later: {}", e); }
37 }
38 }
39 Err(e) => {
40 tracing::warn!("Invalid event {}: {}", path.display(), e);
41 let _ = tokio::fs::rename(path, path.with_extension("json.error")).await;
42 }
43 },
44 Err(e) => tracing::warn!("Read failed {}: {}", path.display(), e),
45 }
46 }
47}
48
49async fn scan_inbox(dir: &Path, queue: &EventQueue) {
50 if let Ok(mut entries) = tokio::fs::read_dir(dir).await {
51 while let Ok(Some(entry)) = entries.next_entry().await {
52 process_file(&entry.path(), queue).await;
53 }
54 }
55}
56
57pub async fn watch_inbox(inbox_dir: PathBuf, queue: Arc<EventQueue>, shutdown: Arc<std::sync::atomic::AtomicBool>) {
58 let _ = tokio::fs::create_dir_all(&inbox_dir).await;
59 #[cfg(unix)]
60 {
61 use std::os::unix::fs::PermissionsExt;
62 if let Ok(meta) = tokio::fs::metadata(&inbox_dir).await {
63 let mut perms = meta.permissions();
64 perms.set_mode(0o700);
65 let _ = tokio::fs::set_permissions(&inbox_dir, perms).await;
66 }
67 }
68 scan_inbox(&inbox_dir, &queue).await;
69
70 let (async_tx, mut async_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<PathBuf>>();
72
73 let shutdown_flag = shutdown.clone();
75
76 let inbox_clone = inbox_dir.clone();
78 let _watcher_handle = tokio::task::spawn_blocking(move || {
79 let (tx, rx) = std::sync::mpsc::channel();
80 let mut _watcher: notify::RecommendedWatcher = match notify::RecommendedWatcher::new(
81 tx,
82 notify::Config::default(),
83 ) {
84 Ok(w) => w,
85 Err(e) => {
86 tracing::warn!("inotify unavailable: {}", e);
87 return;
88 }
89 };
90
91 if let Err(e) = _watcher.watch(&inbox_clone, notify::RecursiveMode::NonRecursive) {
92 tracing::warn!("watch failed: {}", e);
93 return;
94 }
95 tracing::info!("Inbox watcher (inotify) on {}", inbox_clone.display());
96
97 loop {
99 if shutdown_flag.load(std::sync::atomic::Ordering::Relaxed) {
100 break;
101 }
102 match rx.recv_timeout(std::time::Duration::from_millis(500)) {
103 Ok(Ok(event)) => {
104 if !event.paths.is_empty() {
105 let _ = async_tx.send(event.paths);
106 }
107 }
108 Ok(Err(e)) => tracing::warn!("notify error: {}", e),
109 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue,
110 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
111 break;
112 }
113 }
114 }
115 });
116
117 let queue_ref = &queue;
119 let dir_ref = &inbox_dir;
120 loop {
121 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
122 break;
123 }
124 tokio::select! {
125 result = async_rx.recv() => {
126 match result {
127 Some(paths) => {
128 for path in &paths {
129 process_file(path, queue_ref).await;
130 }
131 scan_inbox(dir_ref, queue_ref).await;
133 }
134 None => break, }
136 }
137 _ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {
139 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
140 break;
141 }
142 scan_inbox(dir_ref, queue_ref).await;
143 }
144 }
145 }
146}
147
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{Event, Severity};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    /// Pause after spawning the watcher, presumably to let it start before files are written.
    const SETTLE: Duration = Duration::from_millis(300);
    /// Interval between checks of the awaited condition.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);
    /// Maximum number of checks before giving up waiting.
    const POLL_ATTEMPTS: usize = 30;

    /// Spawns `watch_inbox` on `inbox` with a fresh 10-slot queue, then sleeps for `SETTLE`.
    async fn start_watcher(
        inbox: &Path,
    ) -> (Arc<EventQueue>, Arc<AtomicBool>, tokio::task::JoinHandle<()>) {
        let queue = Arc::new(EventQueue::new(10));
        let shutdown = Arc::new(AtomicBool::new(false));
        let task = tokio::spawn(watch_inbox(
            inbox.to_path_buf(),
            Arc::clone(&queue),
            Arc::clone(&shutdown),
        ));
        tokio::time::sleep(SETTLE).await;
        (queue, shutdown, task)
    }

    /// Checks `done` up to `POLL_ATTEMPTS` times, sleeping `POLL_INTERVAL` between checks.
    async fn poll_until(mut done: impl FnMut() -> bool) {
        for _ in 0..POLL_ATTEMPTS {
            if done() {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
    }

    /// Raises the shutdown flag and waits at most 2 seconds for the watcher task to end.
    async fn stop_watcher(shutdown: &AtomicBool, task: tokio::task::JoinHandle<()>) {
        shutdown.store(true, Ordering::Relaxed);
        let _ = tokio::time::timeout(Duration::from_secs(2), task).await;
    }

    /// A valid event file written into the inbox ends up on the queue.
    #[tokio::test]
    async fn picks_up_dropped_event() {
        let dir = tempfile::tempdir().unwrap();
        let inbox = dir.path().to_path_buf();
        let (queue, shutdown, task) = start_watcher(&inbox).await;

        let event = Event::simple("test", "hello inbox", Some(Severity::High));
        let payload = serde_json::to_string(&event).unwrap();
        tokio::fs::write(inbox.join("1.json"), payload).await.unwrap();

        poll_until(|| queue.len() > 0).await;
        stop_watcher(&shutdown, task).await;

        let popped = queue.pop().expect("event should have been ingested");
        assert_eq!(popped.content.text, "hello inbox");
    }

    /// A malformed file is renamed to `*.json.error` and nothing is queued.
    #[tokio::test]
    async fn invalid_json_moved_to_error() {
        let dir = tempfile::tempdir().unwrap();
        let inbox = dir.path().to_path_buf();
        let (queue, shutdown, task) = start_watcher(&inbox).await;

        tokio::fs::write(inbox.join("bad.json"), "not json").await.unwrap();
        let err_path = inbox.join("bad.json.error");

        poll_until(|| err_path.exists()).await;
        stop_watcher(&shutdown, task).await;

        assert!(err_path.exists());
        assert_eq!(queue.len(), 0);
    }
}