Skip to main content

mockforge_core/
sync_watcher.rs

1//! File system watcher for bidirectional directory sync
2//!
3//! This module provides real-time file system monitoring for bidirectional
4//! sync between workspaces and external directories.
5
6use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16/// File system watcher for workspace sync
17pub struct SyncWatcher {
18    /// Active watchers by workspace ID
19    watchers: HashMap<String, RecommendedWatcher>,
20    /// Running state
21    running: Arc<Mutex<bool>>,
22    /// Persistence layer
23    persistence: Arc<WorkspacePersistence>,
24}
25
26/// File system synchronization events for workspace monitoring
27#[derive(Debug, Clone)]
28pub enum SyncEvent {
29    /// A new file was created in the watched directory
30    FileCreated {
31        /// Workspace ID this file belongs to
32        workspace_id: String,
33        /// Path to the created file
34        path: PathBuf,
35        /// Contents of the created file
36        content: String,
37    },
38    /// An existing file was modified in the watched directory
39    FileModified {
40        /// Workspace ID this file belongs to
41        workspace_id: String,
42        /// Path to the modified file
43        path: PathBuf,
44        /// Updated contents of the file
45        content: String,
46    },
47    /// A file was deleted from the watched directory
48    FileDeleted {
49        /// Workspace ID this file belonged to
50        workspace_id: String,
51        /// Path to the deleted file
52        path: PathBuf,
53    },
54    /// Multiple directory changes detected (batched summary)
55    DirectoryChanged {
56        /// Workspace ID where changes occurred
57        workspace_id: String,
58        /// List of file changes detected
59        changes: Vec<FileChange>,
60    },
61}
62
63/// Represents a single file change in the watched directory
64#[derive(Debug, Clone)]
65pub struct FileChange {
66    /// Path to the file that changed
67    pub path: PathBuf,
68    /// Type of change that occurred
69    pub kind: ChangeKind,
70    /// Optional file contents (for created/modified events)
71    pub content: Option<String>,
72}
73
74/// Type of file system change detected
75#[derive(Debug, Clone)]
76pub enum ChangeKind {
77    /// File was created
78    Created,
79    /// File was modified
80    Modified,
81    /// File was deleted
82    Deleted,
83}
84
85impl SyncWatcher {
86    /// Create a new sync watcher
87    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
88        let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
89
90        Self {
91            watchers: HashMap::new(),
92            running: Arc::new(Mutex::new(false)),
93            persistence,
94        }
95    }
96
97    /// Start monitoring a workspace directory
98    pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
99        let directory_path = PathBuf::from(directory);
100
101        // Ensure directory exists
102        if !directory_path.exists() {
103            std::fs::create_dir_all(&directory_path)
104                .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
105        }
106
107        let (tx, mut rx) = mpsc::channel(100);
108        let workspace_id_string = workspace_id.to_string();
109        let workspace_id_for_watcher = workspace_id_string.clone();
110        let workspace_id_for_processing = workspace_id_string.clone();
111        let directory_path_clone = directory_path.clone();
112        let directory_path_for_processing = directory_path.clone();
113        let directory_str = directory.to_string();
114
115        let config = Config::default()
116            .with_poll_interval(Duration::from_secs(1))
117            .with_compare_contents(true);
118
119        let mut watcher = RecommendedWatcher::new(
120            move |res: notify::Result<Event>| {
121                if let Ok(event) = res {
122                    debug!("File system event: {:?}", event);
123                    let tx_clone = tx.clone();
124                    let workspace_id_clone = workspace_id_string.clone();
125                    let dir_clone = directory_path_clone.clone();
126
127                    tokio::spawn(async move {
128                        if let Err(e) = Self::handle_fs_event(
129                            &tx_clone,
130                            &workspace_id_clone,
131                            &dir_clone,
132                            &event,
133                        )
134                        .await
135                        {
136                            error!("Failed to handle file system event: {}", e);
137                        }
138                    });
139                }
140            },
141            config,
142        )
143        .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
144
145        // Watch the directory recursively
146        watcher
147            .watch(&directory_path, notify::RecursiveMode::Recursive)
148            .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
149
150        // Store the watcher
151        self.watchers.insert(workspace_id_for_watcher, watcher);
152
153        // Start processing events
154        let persistence_clone = self.persistence.clone();
155        let is_running = self.running.clone();
156
157        tokio::spawn(async move {
158            info!(
159                "Started monitoring workspace {} in directory {}",
160                workspace_id_for_processing, directory_str
161            );
162            info!(
163                workspace_id = %workspace_id_for_processing,
164                directory = %directory_str,
165                "Monitoring workspace directory"
166            );
167
168            while *is_running.lock().await {
169                match timeout(Duration::from_millis(100), rx.recv()).await {
170                    Ok(Some(event)) => {
171                        if let Err(e) = Self::process_sync_event(
172                            &persistence_clone,
173                            &workspace_id_for_processing,
174                            &directory_path_for_processing,
175                            event,
176                        )
177                        .await
178                        {
179                            error!("Failed to process sync event: {}", e);
180                        }
181                    }
182                    Ok(None) => break,  // Channel closed
183                    Err(_) => continue, // Timeout, continue monitoring
184                }
185            }
186
187            info!(
188                "Stopped monitoring workspace {} in directory {}",
189                workspace_id_for_processing, directory_str
190            );
191            info!(
192                workspace_id = %workspace_id_for_processing,
193                directory = %directory_str,
194                "Stopped monitoring workspace directory"
195            );
196        });
197
198        Ok(())
199    }
200
201    /// Stop monitoring a workspace
202    pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
203        if let Some(watcher) = self.watchers.remove(workspace_id) {
204            // Dropping the watcher will stop it
205            drop(watcher);
206        }
207
208        Ok(())
209    }
210
211    /// Stop all monitoring
212    pub async fn stop_all(&mut self) -> Result<()> {
213        *self.running.lock().await = false;
214        self.watchers.clear();
215        Ok(())
216    }
217
218    /// Handle a file system event
219    async fn handle_fs_event(
220        tx: &mpsc::Sender<SyncEvent>,
221        workspace_id: &str,
222        base_dir: &Path,
223        event: &Event,
224    ) -> Result<()> {
225        let mut changes = Vec::new();
226
227        for path in &event.paths {
228            // Make path relative to the watched directory
229            let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
230
231            // Skip metadata files and temporary files
232            if relative_path.starts_with(".")
233                || relative_path
234                    .file_name()
235                    .map(|n| n.to_string_lossy().starts_with("."))
236                    .unwrap_or(false)
237            {
238                continue;
239            }
240
241            // Only process YAML files
242            if let Some(extension) = path.extension() {
243                if extension != "yaml" && extension != "yml" {
244                    continue;
245                }
246            }
247
248            match event.kind {
249                notify::EventKind::Create(_) => {
250                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
251                        changes.push(FileChange {
252                            path: relative_path.to_path_buf(),
253                            kind: ChangeKind::Created,
254                            content: Some(content),
255                        });
256                    }
257                }
258                notify::EventKind::Modify(_) => {
259                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
260                        changes.push(FileChange {
261                            path: relative_path.to_path_buf(),
262                            kind: ChangeKind::Modified,
263                            content: Some(content),
264                        });
265                    }
266                }
267                notify::EventKind::Remove(_) => {
268                    changes.push(FileChange {
269                        path: relative_path.to_path_buf(),
270                        kind: ChangeKind::Deleted,
271                        content: None,
272                    });
273                }
274                _ => {}
275            }
276        }
277
278        if !changes.is_empty() {
279            let _ = tx
280                .send(SyncEvent::DirectoryChanged {
281                    workspace_id: workspace_id.to_string(),
282                    changes,
283                })
284                .await;
285        }
286
287        Ok(())
288    }
289
290    /// Process a sync event
291    async fn process_sync_event(
292        persistence: &WorkspacePersistence,
293        _workspace_id: &str,
294        _directory: &Path,
295        event: SyncEvent,
296    ) -> Result<()> {
297        if let SyncEvent::DirectoryChanged {
298            workspace_id,
299            changes,
300        } = event
301        {
302            info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
303
304            if !changes.is_empty() {
305                info!(
306                    workspace_id = %workspace_id,
307                    count = changes.len(),
308                    "Detected file changes in workspace"
309                );
310            }
311
312            for change in changes {
313                match change.kind {
314                    ChangeKind::Created => {
315                        info!(path = %change.path.display(), "File created");
316                        if let Some(content) = change.content {
317                            if let Err(e) = Self::import_yaml_content(
318                                persistence,
319                                &workspace_id,
320                                &change.path,
321                                &content,
322                            )
323                            .await
324                            {
325                                warn!("Failed to import file {}: {}", change.path.display(), e);
326                            } else {
327                                info!(path = %change.path.display(), "Successfully imported");
328                            }
329                        }
330                    }
331                    ChangeKind::Modified => {
332                        info!(path = %change.path.display(), "File modified");
333                        if let Some(content) = change.content {
334                            if let Err(e) = Self::import_yaml_content(
335                                persistence,
336                                &workspace_id,
337                                &change.path,
338                                &content,
339                            )
340                            .await
341                            {
342                                warn!("Failed to import file {}: {}", change.path.display(), e);
343                            } else {
344                                info!(path = %change.path.display(), "Successfully updated");
345                            }
346                        }
347                    }
348                    ChangeKind::Deleted => {
349                        info!(
350                            path = %change.path.display(),
351                            workspace_id = %workspace_id,
352                            "File deleted from watched directory — workspace may be out of sync. \
353                             Re-export workspace or restart sync to reconcile."
354                        );
355                    }
356                }
357            }
358        }
359
360        Ok(())
361    }
362
363    /// Import YAML content into workspace
364    async fn import_yaml_content(
365        persistence: &WorkspacePersistence,
366        workspace_id: &str,
367        path: &Path,
368        content: &str,
369    ) -> Result<()> {
370        // Load the workspace
371        let workspace = persistence.load_workspace(workspace_id).await?;
372
373        // Check sync direction before proceeding
374        if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
375        {
376            debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
377            return Ok(());
378        }
379
380        // Try to parse as a workspace export
381        if let Ok(_export) =
382            serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
383        {
384            // This is a full workspace export - we should be cautious about importing
385            // For now, just log the intent
386            info!(
387                "Detected workspace export for {}, skipping full import to avoid conflicts",
388                workspace_id
389            );
390            debug!("Skipping workspace export to avoid conflicts");
391            return Ok(());
392        }
393
394        // Try to parse as a request
395        if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
396            // Import individual request
397            debug!("Importing request {} from {}", request.name, path.display());
398
399            let mut workspace = persistence.load_workspace(workspace_id).await?;
400            // Add to root level
401            workspace.add_request(request)?;
402            persistence.save_workspace(&workspace).await?;
403
404            info!(
405                "Successfully imported request from {} into workspace {}",
406                path.display(),
407                workspace_id
408            );
409        } else {
410            debug!("Content in {} is not a recognized format, skipping", path.display());
411            return Err(Error::generic(
412                "File is not a recognized format (expected MockRequest YAML)".to_string(),
413            ));
414        }
415
416        Ok(())
417    }
418
419    /// Get monitoring status
420    pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
421        self.watchers.contains_key(workspace_id)
422    }
423
424    /// Get list of monitored workspaces
425    pub fn get_monitored_workspaces(&self) -> Vec<String> {
426        self.watchers.keys().cloned().collect()
427    }
428}
429
430impl Drop for SyncWatcher {
431    fn drop(&mut self) {
432        // Note: We can't await in drop, so watchers will be stopped when they're dropped
433        // The runtime will handle cleanup
434    }
435}
436
437/// Background sync service
438pub struct SyncService {
439    watcher: Arc<Mutex<SyncWatcher>>,
440    running: Arc<Mutex<bool>>,
441}
442
443impl SyncService {
444    /// Create a new sync service
445    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
446        let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
447
448        Self {
449            watcher,
450            running: Arc::new(Mutex::new(false)),
451        }
452    }
453
454    /// Start the sync service
455    pub async fn start(&self) -> Result<()> {
456        let mut running = self.running.lock().await;
457        *running = true;
458        info!("Sync service started");
459        Ok(())
460    }
461
462    /// Stop the sync service
463    pub async fn stop(&self) -> Result<()> {
464        let mut running = self.running.lock().await;
465        *running = false;
466
467        let mut watcher = self.watcher.lock().await;
468        watcher.stop_all().await?;
469        info!("Sync service stopped");
470        Ok(())
471    }
472
473    /// Start monitoring a workspace
474    pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
475        let mut watcher = self.watcher.lock().await;
476        watcher.start_monitoring(workspace_id, directory).await?;
477        Ok(())
478    }
479
480    /// Stop monitoring a workspace
481    pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
482        let mut watcher = self.watcher.lock().await;
483        watcher.stop_monitoring(workspace_id).await?;
484        Ok(())
485    }
486
487    /// Get monitoring status
488    pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
489        let watcher = self.watcher.lock().await;
490        watcher.is_monitoring(workspace_id).await
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use tempfile::TempDir;
498
499    #[tokio::test]
500    async fn test_sync_service_creation() {
501        let temp_dir = TempDir::new().unwrap();
502        let service = SyncService::new(temp_dir.path());
503
504        assert!(!*service.running.lock().await);
505    }
506
507    #[tokio::test]
508    async fn test_sync_service_lifecycle() {
509        let temp_dir = TempDir::new().unwrap();
510        let service = SyncService::new(temp_dir.path());
511
512        // Start service
513        service.start().await.unwrap();
514        assert!(*service.running.lock().await);
515
516        // Stop service
517        service.stop().await.unwrap();
518        assert!(!*service.running.lock().await);
519    }
520}