Skip to main content

swarm_engine_core/pipeline/
source.rs

1//! Event source trait and implementations.
2
3use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
4use std::path::{Path, PathBuf};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6use tokio::sync::mpsc;
7
/// A change notification produced by an [`EventSource`].
///
/// Carries the scenario the change was attributed to, the filesystem paths
/// that caused it, and the wall-clock time at which the event was built.
#[derive(Debug, Clone)]
pub struct WatchEvent {
    /// Name of the scenario the change belongs to.
    pub scenario: String,
    /// Paths reported for this change (empty when built via [`WatchEvent::new`]).
    pub paths: Vec<PathBuf>,
    /// Construction time in whole seconds since the Unix epoch
    /// (`0` if the system clock reads earlier than the epoch).
    pub timestamp: u64,
}

impl WatchEvent {
    /// Build an event for `scenario` with no associated paths.
    pub fn new(scenario: String) -> Self {
        Self::with_paths(scenario, Vec::new())
    }

    /// Build an event for `scenario` that records the triggering `paths`.
    pub fn with_paths(scenario: String, paths: Vec<PathBuf>) -> Self {
        let timestamp = Self::unix_seconds_now();
        Self {
            scenario,
            paths,
            timestamp,
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch,
    /// falling back to `0` when the clock is set before the epoch.
    fn unix_seconds_now() -> u64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        }
    }
}
44
/// A pull-based producer of events.
///
/// Consumers repeatedly await [`EventSource::next`]; a `None` result signals
/// that the source is exhausted.
pub trait EventSource: Send {
    /// Type of item yielded by [`EventSource::next`].
    type Event: Send;

    /// Wait for and return the next event, or `None` once the source is exhausted.
    ///
    /// The returned future is required to be `Send`, so it may be awaited from
    /// tasks that themselves must be `Send`.
    fn next(&mut self) -> impl core::future::Future<Output = Option<Self::Event>> + Send;
}
53
54/// File system watcher source using notify crate.
55///
56/// Watches a directory for file changes and emits [`WatchEvent`]s.
57///
58/// # Expected Directory Structure
59///
60/// ```text
61/// {watch_dir}/
62/// └── scenarios/
63///     └── {scenario_name}/
64///         └── sessions/
65///             └── {timestamp}/
66///                 └── stats.json  <-- triggers event
67/// ```
68///
69/// When a file is created or modified under `sessions/`, this source
70/// extracts the scenario name from the path and emits a `WatchEvent`.
71pub struct LocalFileWatcherSource {
72    rx: mpsc::Receiver<WatchEvent>,
73    _watcher: RecommendedWatcher,
74}
75
76impl LocalFileWatcherSource {
77    /// Create a new file watcher source.
78    ///
79    /// # Arguments
80    /// * `watch_dir` - Directory to watch for changes (e.g., `~/.swarm-engine/learning`)
81    /// * `poll_interval` - How often to poll for changes (for polling-based backends)
82    ///
83    /// # Platform Notes
84    /// On macOS/Linux with native FS events, `poll_interval` is only used as a fallback.
85    pub fn new(watch_dir: &Path, poll_interval: Duration) -> Result<Self, notify::Error> {
86        let (tx, rx) = mpsc::channel(64);
87
88        let tx_clone = tx.clone();
89        let mut watcher = RecommendedWatcher::new(
90            move |res: Result<notify::Event, notify::Error>| {
91                if let Ok(event) = res {
92                    // Only process write/create events
93                    if matches!(
94                        event.kind,
95                        notify::EventKind::Create(_) | notify::EventKind::Modify(_)
96                    ) {
97                        if let Some(scenario) = extract_scenario_from_paths(&event.paths) {
98                            let watch_event = WatchEvent::with_paths(scenario, event.paths);
99                            let _ = tx_clone.blocking_send(watch_event);
100                        }
101                    }
102                }
103            },
104            Config::default().with_poll_interval(poll_interval),
105        )?;
106
107        watcher.watch(watch_dir, RecursiveMode::Recursive)?;
108
109        Ok(Self {
110            rx,
111            _watcher: watcher,
112        })
113    }
114}
115
116impl EventSource for LocalFileWatcherSource {
117    type Event = WatchEvent;
118
119    async fn next(&mut self) -> Option<Self::Event> {
120        self.rx.recv().await
121    }
122}
123
/// Extract the scenario name from a set of file paths.
///
/// Scans each path's components for the consecutive run
/// `scenarios/{scenario_name}/sessions` and returns `scenario_name`. The
/// result comes from the first path, and the earliest position within that
/// path, where the run occurs. Names may contain spaces or any other
/// characters valid in file names. Non-UTF-8 bytes in the name are replaced
/// with `U+FFFD`.
///
/// # Expected Patterns
/// - `.../scenarios/troubleshooting/sessions/...` → `"troubleshooting"`
/// - `.../scenarios/Service Troubleshooting/sessions/...` → `"Service Troubleshooting"`
/// - `.../scenarios/troubleshooting/stats.json` → `None` (no `sessions` component)
///
/// # Returns
/// - `Some(scenario_name)` for the first match found
/// - `None` if `paths` is empty or no path matches the expected structure
fn extract_scenario_from_paths(paths: &[PathBuf]) -> Option<String> {
    use std::path::Component;

    paths.iter().find_map(|path| {
        let parts: Vec<Component<'_>> = path.components().collect();
        parts.windows(3).find_map(|window| match window {
            [Component::Normal(marker), Component::Normal(name), Component::Normal(tail)]
                if *marker == "scenarios" && *tail == "sessions" =>
            {
                Some(name.to_string_lossy().into_owned())
            }
            _ => None,
        })
    })
}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a single-path input.
    fn one(path: &str) -> Vec<PathBuf> {
        vec![PathBuf::from(path)]
    }

    #[test]
    fn test_extract_scenario_from_paths() {
        // Standard case
        let paths = one(
            "/home/user/.swarm-engine/learning/scenarios/troubleshooting/sessions/123/stats.json",
        );
        assert_eq!(
            extract_scenario_from_paths(&paths),
            Some("troubleshooting".to_string())
        );

        // Scenario name with spaces
        let paths_with_spaces = one(
            "/home/user/.swarm-engine/learning/scenarios/Service Troubleshooting/sessions/456/stats.json",
        );
        assert_eq!(
            extract_scenario_from_paths(&paths_with_spaces),
            Some("Service Troubleshooting".to_string())
        );

        // Empty paths
        let empty: Vec<PathBuf> = vec![];
        assert_eq!(extract_scenario_from_paths(&empty), None);

        // No scenario structure
        assert_eq!(
            extract_scenario_from_paths(&one("/home/user/random/file.txt")),
            None
        );

        // Has "scenarios" but not "sessions" - should not match
        let no_sessions =
            one("/home/user/.swarm-engine/learning/scenarios/troubleshooting/stats.json");
        assert_eq!(extract_scenario_from_paths(&no_sessions), None);
    }

    #[test]
    fn test_extract_scenario_uses_first_matching_path() {
        // Non-matching paths are skipped; the first matching path wins.
        let paths = vec![
            PathBuf::from("/home/user/random/file.txt"),
            PathBuf::from("/data/scenarios/deploy/sessions/1/stats.json"),
            PathBuf::from("/data/scenarios/other/sessions/2/stats.json"),
        ];
        assert_eq!(
            extract_scenario_from_paths(&paths),
            Some("deploy".to_string())
        );
    }

    #[test]
    fn test_extract_scenario_edge_layouts() {
        // Relative paths behave like absolute ones.
        assert_eq!(
            extract_scenario_from_paths(&one("scenarios/local/sessions/1/stats.json")),
            Some("local".to_string())
        );

        // The `sessions` directory itself (no child component) still matches.
        assert_eq!(
            extract_scenario_from_paths(&one("/x/scenarios/alpha/sessions")),
            Some("alpha".to_string())
        );

        // A `scenarios` component that does not fit the layout is skipped in
        // favour of a later one that does.
        assert_eq!(
            extract_scenario_from_paths(&one("/x/scenarios/a/b/scenarios/c/sessions/1")),
            Some("c".to_string())
        );

        // Path ends right after the scenario name - too short to match.
        assert_eq!(extract_scenario_from_paths(&one("/x/scenarios/alpha")), None);
    }

    #[test]
    fn test_watch_event_new() {
        let event = WatchEvent::new("test_scenario".into());
        assert_eq!(event.scenario, "test_scenario");
        assert!(event.paths.is_empty());
        assert!(event.timestamp > 0);
    }

    #[test]
    fn test_watch_event_with_paths() {
        let paths = one("/x/scenarios/a/sessions/1/stats.json");
        let event = WatchEvent::with_paths("a".into(), paths.clone());
        assert_eq!(event.scenario, "a");
        assert_eq!(event.paths, paths);
        assert!(event.timestamp > 0);
    }
}