mockforge_core/
sync_watcher.rs

1//! File system watcher for bidirectional directory sync
2//!
3//! This module provides real-time file system monitoring for bidirectional
4//! sync between workspaces and external directories.
5
6use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16/// File system watcher for workspace sync
17pub struct SyncWatcher {
18    /// Active watchers by workspace ID
19    watchers: HashMap<String, RecommendedWatcher>,
20    /// Running state
21    running: Arc<Mutex<bool>>,
22    /// Persistence layer
23    persistence: Arc<WorkspacePersistence>,
24}
25
26/// File system synchronization events for workspace monitoring
27#[derive(Debug, Clone)]
28pub enum SyncEvent {
29    /// A new file was created in the watched directory
30    FileCreated {
31        /// Workspace ID this file belongs to
32        workspace_id: String,
33        /// Path to the created file
34        path: PathBuf,
35        /// Contents of the created file
36        content: String,
37    },
38    /// An existing file was modified in the watched directory
39    FileModified {
40        /// Workspace ID this file belongs to
41        workspace_id: String,
42        /// Path to the modified file
43        path: PathBuf,
44        /// Updated contents of the file
45        content: String,
46    },
47    /// A file was deleted from the watched directory
48    FileDeleted {
49        /// Workspace ID this file belonged to
50        workspace_id: String,
51        /// Path to the deleted file
52        path: PathBuf,
53    },
54    /// Multiple directory changes detected (batched summary)
55    DirectoryChanged {
56        /// Workspace ID where changes occurred
57        workspace_id: String,
58        /// List of file changes detected
59        changes: Vec<FileChange>,
60    },
61}
62
63/// Represents a single file change in the watched directory
64#[derive(Debug, Clone)]
65pub struct FileChange {
66    /// Path to the file that changed
67    pub path: PathBuf,
68    /// Type of change that occurred
69    pub kind: ChangeKind,
70    /// Optional file contents (for created/modified events)
71    pub content: Option<String>,
72}
73
74/// Type of file system change detected
75#[derive(Debug, Clone)]
76pub enum ChangeKind {
77    /// File was created
78    Created,
79    /// File was modified
80    Modified,
81    /// File was deleted
82    Deleted,
83}
84
85impl SyncWatcher {
86    /// Create a new sync watcher
87    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
88        let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
89
90        Self {
91            watchers: HashMap::new(),
92            running: Arc::new(Mutex::new(false)),
93            persistence,
94        }
95    }
96
97    /// Start monitoring a workspace directory
98    pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
99        let directory_path = PathBuf::from(directory);
100
101        // Ensure directory exists
102        if !directory_path.exists() {
103            std::fs::create_dir_all(&directory_path)
104                .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
105        }
106
107        let (tx, mut rx) = mpsc::channel(100);
108        let workspace_id_string = workspace_id.to_string();
109        let workspace_id_for_watcher = workspace_id_string.clone();
110        let workspace_id_for_processing = workspace_id_string.clone();
111        let directory_path_clone = directory_path.clone();
112        let directory_path_for_processing = directory_path.clone();
113        let directory_str = directory.to_string();
114
115        let config = Config::default()
116            .with_poll_interval(Duration::from_secs(1))
117            .with_compare_contents(true);
118
119        let mut watcher = RecommendedWatcher::new(
120            move |res: notify::Result<Event>| {
121                if let Ok(event) = res {
122                    debug!("File system event: {:?}", event);
123                    let tx_clone = tx.clone();
124                    let workspace_id_clone = workspace_id_string.clone();
125                    let dir_clone = directory_path_clone.clone();
126
127                    tokio::spawn(async move {
128                        if let Err(e) = Self::handle_fs_event(
129                            &tx_clone,
130                            &workspace_id_clone,
131                            &dir_clone,
132                            &event,
133                        )
134                        .await
135                        {
136                            error!("Failed to handle file system event: {}", e);
137                        }
138                    });
139                }
140            },
141            config,
142        )
143        .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
144
145        // Watch the directory recursively
146        watcher
147            .watch(&directory_path, notify::RecursiveMode::Recursive)
148            .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
149
150        // Store the watcher
151        self.watchers.insert(workspace_id_for_watcher, watcher);
152
153        // Start processing events
154        let persistence_clone = self.persistence.clone();
155        let is_running = self.running.clone();
156
157        tokio::spawn(async move {
158            info!(
159                "Started monitoring workspace {} in directory {}",
160                workspace_id_for_processing, directory_str
161            );
162            println!(
163                "📂 Monitoring workspace '{}' in directory: {}",
164                workspace_id_for_processing, directory_str
165            );
166
167            while *is_running.lock().await {
168                match timeout(Duration::from_millis(100), rx.recv()).await {
169                    Ok(Some(event)) => {
170                        if let Err(e) = Self::process_sync_event(
171                            &persistence_clone,
172                            &workspace_id_for_processing,
173                            &directory_path_for_processing,
174                            event,
175                        )
176                        .await
177                        {
178                            error!("Failed to process sync event: {}", e);
179                            eprintln!("❌ Sync error: {}", e);
180                        }
181                    }
182                    Ok(None) => break,  // Channel closed
183                    Err(_) => continue, // Timeout, continue monitoring
184                }
185            }
186
187            info!(
188                "Stopped monitoring workspace {} in directory {}",
189                workspace_id_for_processing, directory_str
190            );
191            println!(
192                "âšī¸  Stopped monitoring workspace '{}' in directory: {}",
193                workspace_id_for_processing, directory_str
194            );
195        });
196
197        Ok(())
198    }
199
200    /// Stop monitoring a workspace
201    pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
202        if let Some(watcher) = self.watchers.remove(workspace_id) {
203            // Dropping the watcher will stop it
204            drop(watcher);
205        }
206
207        Ok(())
208    }
209
210    /// Stop all monitoring
211    pub async fn stop_all(&mut self) -> Result<()> {
212        *self.running.lock().await = false;
213        self.watchers.clear();
214        Ok(())
215    }
216
217    /// Handle a file system event
218    async fn handle_fs_event(
219        tx: &mpsc::Sender<SyncEvent>,
220        workspace_id: &str,
221        base_dir: &Path,
222        event: &Event,
223    ) -> Result<()> {
224        let mut changes = Vec::new();
225
226        for path in &event.paths {
227            // Make path relative to the watched directory
228            let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
229
230            // Skip metadata files and temporary files
231            if relative_path.starts_with(".")
232                || relative_path
233                    .file_name()
234                    .map(|n| n.to_string_lossy().starts_with("."))
235                    .unwrap_or(false)
236            {
237                continue;
238            }
239
240            // Only process YAML files
241            if let Some(extension) = path.extension() {
242                if extension != "yaml" && extension != "yml" {
243                    continue;
244                }
245            }
246
247            match event.kind {
248                notify::EventKind::Create(_) => {
249                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
250                        changes.push(FileChange {
251                            path: relative_path.to_path_buf(),
252                            kind: ChangeKind::Created,
253                            content: Some(content),
254                        });
255                    }
256                }
257                notify::EventKind::Modify(_) => {
258                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
259                        changes.push(FileChange {
260                            path: relative_path.to_path_buf(),
261                            kind: ChangeKind::Modified,
262                            content: Some(content),
263                        });
264                    }
265                }
266                notify::EventKind::Remove(_) => {
267                    changes.push(FileChange {
268                        path: relative_path.to_path_buf(),
269                        kind: ChangeKind::Deleted,
270                        content: None,
271                    });
272                }
273                _ => {}
274            }
275        }
276
277        if !changes.is_empty() {
278            let _ = tx
279                .send(SyncEvent::DirectoryChanged {
280                    workspace_id: workspace_id.to_string(),
281                    changes,
282                })
283                .await;
284        }
285
286        Ok(())
287    }
288
289    /// Process a sync event
290    async fn process_sync_event(
291        persistence: &WorkspacePersistence,
292        _workspace_id: &str,
293        _directory: &Path,
294        event: SyncEvent,
295    ) -> Result<()> {
296        if let SyncEvent::DirectoryChanged {
297            workspace_id,
298            changes,
299        } = event
300        {
301            info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
302
303            if !changes.is_empty() {
304                println!(
305                    "🔄 Detected {} file change{} in workspace '{}'",
306                    changes.len(),
307                    if changes.len() == 1 { "" } else { "s" },
308                    workspace_id
309                );
310            }
311
312            for change in changes {
313                match change.kind {
314                    ChangeKind::Created => {
315                        println!("  ➕ Created: {}", change.path.display());
316                        if let Some(content) = change.content {
317                            if let Err(e) = Self::import_yaml_content(
318                                persistence,
319                                &workspace_id,
320                                &change.path,
321                                &content,
322                            )
323                            .await
324                            {
325                                warn!("Failed to import file {}: {}", change.path.display(), e);
326                                eprintln!("     âš ī¸  Failed to import: {}", e);
327                            } else {
328                                println!("     ✅ Successfully imported");
329                            }
330                        }
331                    }
332                    ChangeKind::Modified => {
333                        println!("  📝 Modified: {}", change.path.display());
334                        if let Some(content) = change.content {
335                            if let Err(e) = Self::import_yaml_content(
336                                persistence,
337                                &workspace_id,
338                                &change.path,
339                                &content,
340                            )
341                            .await
342                            {
343                                warn!("Failed to import file {}: {}", change.path.display(), e);
344                                eprintln!("     âš ī¸  Failed to import: {}", e);
345                            } else {
346                                println!("     ✅ Successfully updated");
347                            }
348                        }
349                    }
350                    ChangeKind::Deleted => {
351                        println!("  đŸ—‘ī¸  Deleted: {}", change.path.display());
352                        println!("     â„šī¸  Auto-deletion from workspace is disabled");
353                        debug!("File deleted: {}", change.path.display());
354                        // For now, we don't auto-delete from workspace on file deletion
355                        // This could be configurable in the future
356                    }
357                }
358            }
359        }
360
361        Ok(())
362    }
363
364    /// Import YAML content into workspace
365    async fn import_yaml_content(
366        persistence: &WorkspacePersistence,
367        workspace_id: &str,
368        path: &Path,
369        content: &str,
370    ) -> Result<()> {
371        // Load the workspace
372        let workspace = persistence.load_workspace(workspace_id).await?;
373
374        // Check sync direction before proceeding
375        if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
376        {
377            debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
378            return Ok(());
379        }
380
381        // Try to parse as a workspace export
382        if let Ok(_export) =
383            serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
384        {
385            // This is a full workspace export - we should be cautious about importing
386            // For now, just log the intent
387            info!(
388                "Detected workspace export for {}, skipping full import to avoid conflicts",
389                workspace_id
390            );
391            debug!("Skipping workspace export to avoid conflicts");
392            return Ok(());
393        }
394
395        // Try to parse as a request
396        if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
397            // Import individual request
398            debug!("Importing request {} from {}", request.name, path.display());
399
400            let mut workspace = persistence.load_workspace(workspace_id).await?;
401            // Add to root level
402            workspace.add_request(request)?;
403            persistence.save_workspace(&workspace).await?;
404
405            info!(
406                "Successfully imported request from {} into workspace {}",
407                path.display(),
408                workspace_id
409            );
410        } else {
411            debug!("Content in {} is not a recognized format, skipping", path.display());
412            return Err(Error::generic(
413                "File is not a recognized format (expected MockRequest YAML)".to_string(),
414            ));
415        }
416
417        Ok(())
418    }
419
420    /// Get monitoring status
421    pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
422        self.watchers.contains_key(workspace_id)
423    }
424
425    /// Get list of monitored workspaces
426    pub fn get_monitored_workspaces(&self) -> Vec<String> {
427        self.watchers.keys().cloned().collect()
428    }
429}
430
431impl Drop for SyncWatcher {
432    fn drop(&mut self) {
433        // Note: We can't await in drop, so watchers will be stopped when they're dropped
434        // The runtime will handle cleanup
435    }
436}
437
438/// Background sync service
439pub struct SyncService {
440    watcher: Arc<Mutex<SyncWatcher>>,
441    running: Arc<Mutex<bool>>,
442}
443
444impl SyncService {
445    /// Create a new sync service
446    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
447        let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
448
449        Self {
450            watcher,
451            running: Arc::new(Mutex::new(false)),
452        }
453    }
454
455    /// Start the sync service
456    pub async fn start(&self) -> Result<()> {
457        let mut running = self.running.lock().await;
458        *running = true;
459        info!("Sync service started");
460        Ok(())
461    }
462
463    /// Stop the sync service
464    pub async fn stop(&self) -> Result<()> {
465        let mut running = self.running.lock().await;
466        *running = false;
467
468        let mut watcher = self.watcher.lock().await;
469        watcher.stop_all().await?;
470        info!("Sync service stopped");
471        Ok(())
472    }
473
474    /// Start monitoring a workspace
475    pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
476        let mut watcher = self.watcher.lock().await;
477        watcher.start_monitoring(workspace_id, directory).await?;
478        Ok(())
479    }
480
481    /// Stop monitoring a workspace
482    pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
483        let mut watcher = self.watcher.lock().await;
484        watcher.stop_monitoring(workspace_id).await?;
485        Ok(())
486    }
487
488    /// Get monitoring status
489    pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
490        let watcher = self.watcher.lock().await;
491        watcher.is_monitoring(workspace_id).await
492    }
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use tempfile::TempDir;
499
500    #[tokio::test]
501    async fn test_sync_service_creation() {
502        let temp_dir = TempDir::new().unwrap();
503        let service = SyncService::new(temp_dir.path());
504
505        assert!(!*service.running.lock().await);
506    }
507
508    #[tokio::test]
509    async fn test_sync_service_lifecycle() {
510        let temp_dir = TempDir::new().unwrap();
511        let service = SyncService::new(temp_dir.path());
512
513        // Start service
514        service.start().await.unwrap();
515        assert!(*service.running.lock().await);
516
517        // Stop service
518        service.stop().await.unwrap();
519        assert!(!*service.running.lock().await);
520    }
521}