bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! Configuration File Watcher
//!
//! Monitors configuration files for changes and triggers reloads.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::RwLock;
use tokio::time::interval;

/// Configuration change callback type
pub type ConfigChangeCallback = Arc<dyn Fn(PathBuf) + Send + Sync>;

/// Configuration watcher state
#[derive(Debug, Clone)]
struct WatchedFile {
    path: PathBuf,
    last_modified: Option<std::time::SystemTime>,
}

/// Configuration file watcher
pub struct ConfigWatcher {
    watched_files: Arc<RwLock<Vec<WatchedFile>>>,
    callback: ConfigChangeCallback,
    poll_interval: Duration,
    running: Arc<RwLock<bool>>,
}

impl ConfigWatcher {
    /// Create a new configuration watcher
    pub fn new(callback: ConfigChangeCallback) -> Self {
        ConfigWatcher {
            watched_files: Arc::new(RwLock::new(Vec::new())),
            callback,
            poll_interval: Duration::from_secs(1),
            running: Arc::new(RwLock::new(false)),
        }
    }

    /// Set the poll interval
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Add a file to watch
    pub async fn watch(&self, path: PathBuf) {
        let modified = std::fs::metadata(&path)
            .ok()
            .and_then(|m| m.modified().ok());

        let file = WatchedFile {
            path,
            last_modified: modified,
        };

        let mut files = self.watched_files.write().await;
        files.push(file);
    }

    /// Remove a file from watching
    pub async fn unwatch(&self, path: &PathBuf) {
        let mut files = self.watched_files.write().await;
        files.retain(|f| &f.path != path);
    }

    /// Start watching for changes
    pub async fn start(&self) {
        let mut running = self.running.write().await;
        if *running {
            return;
        }
        *running = true;
        drop(running);

        let watched_files = self.watched_files.clone();
        let callback = self.callback.clone();
        let poll_interval = self.poll_interval;
        let running_flag = self.running.clone();

        tokio::spawn(async move {
            let mut timer = interval(poll_interval);

            loop {
                timer.tick().await;

                // Check if we should stop
                {
                    let running = running_flag.read().await;
                    if !*running {
                        break;
                    }
                }

                // Check each watched file
                let mut files = watched_files.write().await;
                for file in files.iter_mut() {
                    let current_modified = std::fs::metadata(&file.path)
                        .ok()
                        .and_then(|m| m.modified().ok());

                    match (file.last_modified, current_modified) {
                        (Some(last), Some(current)) => {
                            if current > last {
                                file.last_modified = Some(current);
                                callback(file.path.clone());
                            }
                        }
                        (None, Some(current)) => {
                            file.last_modified = Some(current);
                            callback(file.path.clone());
                        }
                        _ => {}
                    }
                }
            }
        });
    }

    /// Stop watching
    pub async fn stop(&self) {
        let mut running = self.running.write().await;
        *running = false;
    }

    /// Check if watcher is running
    pub async fn is_running(&self) -> bool {
        *self.running.read().await
    }

    /// Get list of watched files
    pub async fn watched_files(&self) -> Vec<PathBuf> {
        self.watched_files
            .read()
            .await
            .iter()
            .map(|f| f.path.clone())
            .collect()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[tokio::test]
    async fn test_config_watcher_creation() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let callback = Arc::new(move |_path: PathBuf| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
        });

        let watcher = ConfigWatcher::new(callback);
        assert!(!watcher.is_running().await);
    }

    #[tokio::test]
    async fn test_watch_file() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let callback = Arc::new(move |_path: PathBuf| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
        });

        let watcher = ConfigWatcher::new(callback);
        let temp_file = std::env::temp_dir().join("bzzz-test-config.yaml");

        watcher.watch(temp_file.clone()).await;
        let files = watcher.watched_files().await;
        assert_eq!(files.len(), 1);
    }

    #[tokio::test]
    async fn test_unwatch_file() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let callback = Arc::new(move |_path: PathBuf| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
        });

        let watcher = ConfigWatcher::new(callback);
        let temp_file = std::env::temp_dir().join("bzzz-test-config.yaml");

        watcher.watch(temp_file.clone()).await;
        assert_eq!(watcher.watched_files().await.len(), 1);

        watcher.unwatch(&temp_file).await;
        assert_eq!(watcher.watched_files().await.len(), 0);
    }
}