// mockforge_core/sync_watcher.rs

//! File system watcher for bidirectional directory sync
//!
//! This module provides real-time file system monitoring for bidirectional
//! sync between workspaces and external directories.

use crate::workspace_persistence::WorkspacePersistence;
use crate::{Error, Result};
use notify::{Config, Event, RecommendedWatcher, Watcher};
use std::collections::HashMap;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, info, warn};

16/// File system watcher for workspace sync
17pub struct SyncWatcher {
18    /// Active watchers by workspace ID
19    watchers: HashMap<String, RecommendedWatcher>,
20    /// Running state
21    running: Arc<Mutex<bool>>,
22    /// Persistence layer
23    persistence: Arc<WorkspacePersistence>,
24}
25
26/// File system synchronization events for workspace monitoring
27#[derive(Debug, Clone)]
28pub enum SyncEvent {
29    /// A new file was created in the watched directory
30    FileCreated {
31        /// Workspace ID this file belongs to
32        workspace_id: String,
33        /// Path to the created file
34        path: PathBuf,
35        /// Contents of the created file
36        content: String,
37    },
38    /// An existing file was modified in the watched directory
39    FileModified {
40        /// Workspace ID this file belongs to
41        workspace_id: String,
42        /// Path to the modified file
43        path: PathBuf,
44        /// Updated contents of the file
45        content: String,
46    },
47    /// A file was deleted from the watched directory
48    FileDeleted {
49        /// Workspace ID this file belonged to
50        workspace_id: String,
51        /// Path to the deleted file
52        path: PathBuf,
53    },
54    /// Multiple directory changes detected (batched summary)
55    DirectoryChanged {
56        /// Workspace ID where changes occurred
57        workspace_id: String,
58        /// List of file changes detected
59        changes: Vec<FileChange>,
60    },
61}
62
63/// Represents a single file change in the watched directory
64#[derive(Debug, Clone)]
65pub struct FileChange {
66    /// Path to the file that changed
67    pub path: PathBuf,
68    /// Type of change that occurred
69    pub kind: ChangeKind,
70    /// Optional file contents (for created/modified events)
71    pub content: Option<String>,
72}
73
/// Type of file system change detected.
///
/// The enum carries no data, so it derives `PartialEq`/`Eq` to let callers
/// and tests compare kinds directly instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChangeKind {
    /// File was created
    Created,
    /// File was modified
    Modified,
    /// File was deleted
    Deleted,
}

85impl SyncWatcher {
86    /// Create a new sync watcher
87    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
88        let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
89
90        Self {
91            watchers: HashMap::new(),
92            running: Arc::new(Mutex::new(false)),
93            persistence,
94        }
95    }
96
97    /// Start monitoring a workspace directory
98    pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
99        let directory_path = PathBuf::from(directory);
100
101        // Ensure directory exists
102        if !directory_path.exists() {
103            std::fs::create_dir_all(&directory_path)
104                .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
105        }
106
107        let (tx, mut rx) = mpsc::channel(100);
108        let workspace_id_string = workspace_id.to_string();
109        let workspace_id_for_watcher = workspace_id_string.clone();
110        let workspace_id_for_processing = workspace_id_string.clone();
111        let directory_path_clone = directory_path.clone();
112        let directory_path_for_processing = directory_path.clone();
113        let directory_str = directory.to_string();
114
115        let config = Config::default()
116            .with_poll_interval(Duration::from_secs(1))
117            .with_compare_contents(true);
118
119        let mut watcher = RecommendedWatcher::new(
120            move |res: notify::Result<Event>| {
121                if let Ok(event) = res {
122                    debug!("File system event: {:?}", event);
123                    let tx_clone = tx.clone();
124                    let workspace_id_clone = workspace_id_string.clone();
125                    let dir_clone = directory_path_clone.clone();
126
127                    tokio::spawn(async move {
128                        if let Err(e) = Self::handle_fs_event(
129                            &tx_clone,
130                            &workspace_id_clone,
131                            &dir_clone,
132                            &event,
133                        )
134                        .await
135                        {
136                            error!("Failed to handle file system event: {}", e);
137                        }
138                    });
139                }
140            },
141            config,
142        )
143        .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
144
145        // Watch the directory recursively
146        watcher
147            .watch(&directory_path, notify::RecursiveMode::Recursive)
148            .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
149
150        // Store the watcher
151        self.watchers.insert(workspace_id_for_watcher, watcher);
152
153        // Start processing events
154        let persistence_clone = self.persistence.clone();
155        let is_running = self.running.clone();
156
157        tokio::spawn(async move {
158            info!(
159                "Started monitoring workspace {} in directory {}",
160                workspace_id_for_processing, directory_str
161            );
162            info!(
163                workspace_id = %workspace_id_for_processing,
164                directory = %directory_str,
165                "Monitoring workspace directory"
166            );
167
168            while *is_running.lock().await {
169                match timeout(Duration::from_millis(100), rx.recv()).await {
170                    Ok(Some(event)) => {
171                        if let Err(e) = Self::process_sync_event(
172                            &persistence_clone,
173                            &workspace_id_for_processing,
174                            &directory_path_for_processing,
175                            event,
176                        )
177                        .await
178                        {
179                            error!("Failed to process sync event: {}", e);
180                        }
181                    }
182                    Ok(None) => break,  // Channel closed
183                    Err(_) => continue, // Timeout, continue monitoring
184                }
185            }
186
187            info!(
188                "Stopped monitoring workspace {} in directory {}",
189                workspace_id_for_processing, directory_str
190            );
191            info!(
192                workspace_id = %workspace_id_for_processing,
193                directory = %directory_str,
194                "Stopped monitoring workspace directory"
195            );
196        });
197
198        Ok(())
199    }
200
201    /// Stop monitoring a workspace
202    pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
203        if let Some(watcher) = self.watchers.remove(workspace_id) {
204            // Dropping the watcher will stop it
205            drop(watcher);
206        }
207
208        Ok(())
209    }
210
211    /// Stop all monitoring
212    pub async fn stop_all(&mut self) -> Result<()> {
213        *self.running.lock().await = false;
214        self.watchers.clear();
215        Ok(())
216    }
217
218    /// Handle a file system event
219    async fn handle_fs_event(
220        tx: &mpsc::Sender<SyncEvent>,
221        workspace_id: &str,
222        base_dir: &Path,
223        event: &Event,
224    ) -> Result<()> {
225        let mut changes = Vec::new();
226
227        for path in &event.paths {
228            // Make path relative to the watched directory
229            let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
230
231            // Skip metadata files and temporary files
232            if relative_path.starts_with(".")
233                || relative_path
234                    .file_name()
235                    .map(|n| n.to_string_lossy().starts_with("."))
236                    .unwrap_or(false)
237            {
238                continue;
239            }
240
241            // Only process YAML files
242            if let Some(extension) = path.extension() {
243                if extension != "yaml" && extension != "yml" {
244                    continue;
245                }
246            }
247
248            match event.kind {
249                notify::EventKind::Create(_) => {
250                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
251                        changes.push(FileChange {
252                            path: relative_path.to_path_buf(),
253                            kind: ChangeKind::Created,
254                            content: Some(content),
255                        });
256                    }
257                }
258                notify::EventKind::Modify(_) => {
259                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
260                        changes.push(FileChange {
261                            path: relative_path.to_path_buf(),
262                            kind: ChangeKind::Modified,
263                            content: Some(content),
264                        });
265                    }
266                }
267                notify::EventKind::Remove(_) => {
268                    changes.push(FileChange {
269                        path: relative_path.to_path_buf(),
270                        kind: ChangeKind::Deleted,
271                        content: None,
272                    });
273                }
274                _ => {}
275            }
276        }
277
278        if !changes.is_empty() {
279            let _ = tx
280                .send(SyncEvent::DirectoryChanged {
281                    workspace_id: workspace_id.to_string(),
282                    changes,
283                })
284                .await;
285        }
286
287        Ok(())
288    }
289
290    /// Process a sync event
291    async fn process_sync_event(
292        persistence: &WorkspacePersistence,
293        _workspace_id: &str,
294        _directory: &Path,
295        event: SyncEvent,
296    ) -> Result<()> {
297        if let SyncEvent::DirectoryChanged {
298            workspace_id,
299            changes,
300        } = event
301        {
302            info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
303
304            if !changes.is_empty() {
305                info!(
306                    workspace_id = %workspace_id,
307                    count = changes.len(),
308                    "Detected file changes in workspace"
309                );
310            }
311
312            for change in changes {
313                match change.kind {
314                    ChangeKind::Created => {
315                        info!(path = %change.path.display(), "File created");
316                        if let Some(content) = change.content {
317                            if let Err(e) = Self::import_yaml_content(
318                                persistence,
319                                &workspace_id,
320                                &change.path,
321                                &content,
322                            )
323                            .await
324                            {
325                                warn!("Failed to import file {}: {}", change.path.display(), e);
326                            } else {
327                                info!(path = %change.path.display(), "Successfully imported");
328                            }
329                        }
330                    }
331                    ChangeKind::Modified => {
332                        info!(path = %change.path.display(), "File modified");
333                        if let Some(content) = change.content {
334                            if let Err(e) = Self::import_yaml_content(
335                                persistence,
336                                &workspace_id,
337                                &change.path,
338                                &content,
339                            )
340                            .await
341                            {
342                                warn!("Failed to import file {}: {}", change.path.display(), e);
343                            } else {
344                                info!(path = %change.path.display(), "Successfully updated");
345                            }
346                        }
347                    }
348                    ChangeKind::Deleted => {
349                        debug!("File deleted: {}", change.path.display());
350                        debug!("Auto-deletion from workspace is disabled");
351                        // For now, we don't auto-delete from workspace on file deletion
352                        // This could be configurable in the future
353                    }
354                }
355            }
356        }
357
358        Ok(())
359    }
360
361    /// Import YAML content into workspace
362    async fn import_yaml_content(
363        persistence: &WorkspacePersistence,
364        workspace_id: &str,
365        path: &Path,
366        content: &str,
367    ) -> Result<()> {
368        // Load the workspace
369        let workspace = persistence.load_workspace(workspace_id).await?;
370
371        // Check sync direction before proceeding
372        if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
373        {
374            debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
375            return Ok(());
376        }
377
378        // Try to parse as a workspace export
379        if let Ok(_export) =
380            serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
381        {
382            // This is a full workspace export - we should be cautious about importing
383            // For now, just log the intent
384            info!(
385                "Detected workspace export for {}, skipping full import to avoid conflicts",
386                workspace_id
387            );
388            debug!("Skipping workspace export to avoid conflicts");
389            return Ok(());
390        }
391
392        // Try to parse as a request
393        if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
394            // Import individual request
395            debug!("Importing request {} from {}", request.name, path.display());
396
397            let mut workspace = persistence.load_workspace(workspace_id).await?;
398            // Add to root level
399            workspace.add_request(request)?;
400            persistence.save_workspace(&workspace).await?;
401
402            info!(
403                "Successfully imported request from {} into workspace {}",
404                path.display(),
405                workspace_id
406            );
407        } else {
408            debug!("Content in {} is not a recognized format, skipping", path.display());
409            return Err(Error::generic(
410                "File is not a recognized format (expected MockRequest YAML)".to_string(),
411            ));
412        }
413
414        Ok(())
415    }
416
417    /// Get monitoring status
418    pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
419        self.watchers.contains_key(workspace_id)
420    }
421
422    /// Get list of monitored workspaces
423    pub fn get_monitored_workspaces(&self) -> Vec<String> {
424        self.watchers.keys().cloned().collect()
425    }
426}
427
428impl Drop for SyncWatcher {
429    fn drop(&mut self) {
430        // Note: We can't await in drop, so watchers will be stopped when they're dropped
431        // The runtime will handle cleanup
432    }
433}
434
435/// Background sync service
436pub struct SyncService {
437    watcher: Arc<Mutex<SyncWatcher>>,
438    running: Arc<Mutex<bool>>,
439}
440
441impl SyncService {
442    /// Create a new sync service
443    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
444        let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
445
446        Self {
447            watcher,
448            running: Arc::new(Mutex::new(false)),
449        }
450    }
451
452    /// Start the sync service
453    pub async fn start(&self) -> Result<()> {
454        let mut running = self.running.lock().await;
455        *running = true;
456        info!("Sync service started");
457        Ok(())
458    }
459
460    /// Stop the sync service
461    pub async fn stop(&self) -> Result<()> {
462        let mut running = self.running.lock().await;
463        *running = false;
464
465        let mut watcher = self.watcher.lock().await;
466        watcher.stop_all().await?;
467        info!("Sync service stopped");
468        Ok(())
469    }
470
471    /// Start monitoring a workspace
472    pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
473        let mut watcher = self.watcher.lock().await;
474        watcher.start_monitoring(workspace_id, directory).await?;
475        Ok(())
476    }
477
478    /// Stop monitoring a workspace
479    pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
480        let mut watcher = self.watcher.lock().await;
481        watcher.stop_monitoring(workspace_id).await?;
482        Ok(())
483    }
484
485    /// Get monitoring status
486    pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
487        let watcher = self.watcher.lock().await;
488        watcher.is_monitoring(workspace_id).await
489    }
490}
491
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly built service must not report itself as running.
    #[tokio::test]
    async fn test_sync_service_creation() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        let running = *service.running.lock().await;
        assert!(!running);
    }

    /// `start` raises the running flag and `stop` clears it again.
    #[tokio::test]
    async fn test_sync_service_lifecycle() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        // After starting, the flag is set
        service.start().await.unwrap();
        let running_after_start = *service.running.lock().await;
        assert!(running_after_start);

        // After stopping, the flag is cleared
        service.stop().await.unwrap();
        let running_after_stop = *service.running.lock().await;
        assert!(!running_after_stop);
    }
}