swarm-engine-core 0.1.6

Core types and orchestration for SwarmEngine
Documentation
//! Event source trait and implementations.

use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;

/// Event emitted by a source.
#[derive(Debug, Clone)]
pub struct WatchEvent {
    /// Scenario name extracted from the path.
    pub scenario: String,
    /// Original paths that triggered the event.
    pub paths: Vec<PathBuf>,
    /// Timestamp when the event occurred.
    pub timestamp: u64,
}

impl WatchEvent {
    /// Create a new watch event.
    pub fn new(scenario: String) -> Self {
        Self {
            scenario,
            paths: vec![],
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
        }
    }

    /// Create with paths.
    pub fn with_paths(scenario: String, paths: Vec<PathBuf>) -> Self {
        Self {
            scenario,
            paths,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
        }
    }
}

/// Trait for event sources (pull-based).
pub trait EventSource: Send {
    /// Event type produced by this source.
    type Event: Send;

    /// Get the next event (async, returns None when exhausted).
    fn next(&mut self) -> impl std::future::Future<Output = Option<Self::Event>> + Send;
}

/// File system watcher source using notify crate.
///
/// Watches a directory for file changes and emits [`WatchEvent`]s.
///
/// # Expected Directory Structure
///
/// ```text
/// {watch_dir}/
/// └── scenarios/
///     └── {scenario_name}/
///         └── sessions/
///             └── {timestamp}/
///                 └── stats.json  <-- triggers event
/// ```
///
/// When a file is created or modified under `sessions/`, this source
/// extracts the scenario name from the path and emits a `WatchEvent`.
pub struct LocalFileWatcherSource {
    rx: mpsc::Receiver<WatchEvent>,
    _watcher: RecommendedWatcher,
}

impl LocalFileWatcherSource {
    /// Create a new file watcher source.
    ///
    /// # Arguments
    /// * `watch_dir` - Directory to watch for changes (e.g., `~/.swarm-engine/learning`)
    /// * `poll_interval` - How often to poll for changes (for polling-based backends)
    ///
    /// # Platform Notes
    /// On macOS/Linux with native FS events, `poll_interval` is only used as a fallback.
    pub fn new(watch_dir: &Path, poll_interval: Duration) -> Result<Self, notify::Error> {
        let (tx, rx) = mpsc::channel(64);

        let tx_clone = tx.clone();
        let mut watcher = RecommendedWatcher::new(
            move |res: Result<notify::Event, notify::Error>| {
                if let Ok(event) = res {
                    // Only process write/create events
                    if matches!(
                        event.kind,
                        notify::EventKind::Create(_) | notify::EventKind::Modify(_)
                    ) {
                        if let Some(scenario) = extract_scenario_from_paths(&event.paths) {
                            let watch_event = WatchEvent::with_paths(scenario, event.paths);
                            let _ = tx_clone.blocking_send(watch_event);
                        }
                    }
                }
            },
            Config::default().with_poll_interval(poll_interval),
        )?;

        watcher.watch(watch_dir, RecursiveMode::Recursive)?;

        Ok(Self {
            rx,
            _watcher: watcher,
        })
    }
}

impl EventSource for LocalFileWatcherSource {
    type Event = WatchEvent;

    async fn next(&mut self) -> Option<Self::Event> {
        self.rx.recv().await
    }
}

/// Extract scenario name from file paths.
///
/// Looks for the pattern `scenarios/{scenario_name}/sessions/` in the path
/// and extracts `scenario_name`. Handles scenario names with spaces or
/// special characters (as long as they're valid filesystem names).
///
/// # Expected Patterns
/// - `.../scenarios/troubleshooting/sessions/...` → `"troubleshooting"`
/// - `.../scenarios/Service Troubleshooting/sessions/...` → `"Service Troubleshooting"`
///
/// # Returns
/// - `Some(scenario_name)` if pattern found
/// - `None` if path doesn't match expected structure
fn extract_scenario_from_paths(paths: &[PathBuf]) -> Option<String> {
    for path in paths {
        let components: Vec<_> = path.components().collect();
        for (i, comp) in components.iter().enumerate() {
            if let std::path::Component::Normal(s) = comp {
                if s.to_string_lossy() == "scenarios" {
                    // Check if there's a scenario name followed by "sessions"
                    if let Some(std::path::Component::Normal(scenario)) = components.get(i + 1) {
                        // Verify the structure: scenarios/{name}/sessions/...
                        if let Some(std::path::Component::Normal(sessions)) = components.get(i + 2)
                        {
                            if sessions.to_string_lossy() == "sessions" {
                                return Some(scenario.to_string_lossy().into_owned());
                            }
                        }
                    }
                }
            }
        }
    }
    None
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_scenario_from_paths() {
        // Standard case
        let paths = vec![PathBuf::from(
            "/home/user/.swarm-engine/learning/scenarios/troubleshooting/sessions/123/stats.json",
        )];
        assert_eq!(
            extract_scenario_from_paths(&paths),
            Some("troubleshooting".to_string())
        );

        // Scenario name with spaces
        let paths_with_spaces = vec![PathBuf::from(
            "/home/user/.swarm-engine/learning/scenarios/Service Troubleshooting/sessions/456/stats.json",
        )];
        assert_eq!(
            extract_scenario_from_paths(&paths_with_spaces),
            Some("Service Troubleshooting".to_string())
        );

        // Empty paths
        let empty: Vec<PathBuf> = vec![];
        assert_eq!(extract_scenario_from_paths(&empty), None);

        // No scenario structure
        let no_scenario = vec![PathBuf::from("/home/user/random/file.txt")];
        assert_eq!(extract_scenario_from_paths(&no_scenario), None);

        // Has "scenarios" but not "sessions" - should not match
        let no_sessions = vec![PathBuf::from(
            "/home/user/.swarm-engine/learning/scenarios/troubleshooting/stats.json",
        )];
        assert_eq!(extract_scenario_from_paths(&no_sessions), None);
    }

    #[test]
    fn test_watch_event_new() {
        let event = WatchEvent::new("test_scenario".into());
        assert_eq!(event.scenario, "test_scenario");
        assert!(event.paths.is_empty());
        assert!(event.timestamp > 0);
    }
}