mockforge_core/
sync_watcher.rs

//! File system watcher for bidirectional directory sync
//!
//! This module provides real-time file system monitoring for bidirectional
//! sync between workspaces and external directories.
5
6use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16/// File system watcher for workspace sync
17pub struct SyncWatcher {
18    /// Active watchers by workspace ID
19    watchers: HashMap<String, RecommendedWatcher>,
20    /// Running state
21    running: Arc<Mutex<bool>>,
22    /// Persistence layer
23    persistence: Arc<WorkspacePersistence>,
24}
25
26#[derive(Debug, Clone)]
27pub enum SyncEvent {
28    /// File created in directory
29    FileCreated {
30        workspace_id: String,
31        path: PathBuf,
32        content: String,
33    },
34    /// File modified in directory
35    FileModified {
36        workspace_id: String,
37        path: PathBuf,
38        content: String,
39    },
40    /// File deleted from directory
41    FileDeleted { workspace_id: String, path: PathBuf },
42    /// Directory changes detected (summary)
43    DirectoryChanged {
44        workspace_id: String,
45        changes: Vec<FileChange>,
46    },
47}
48
49#[derive(Debug, Clone)]
50pub struct FileChange {
51    pub path: PathBuf,
52    pub kind: ChangeKind,
53    pub content: Option<String>,
54}
55
/// The kind of change observed for a file in a monitored directory.
///
/// The enum is fieldless, so it is `Copy` and comparable; this lets callers
/// and tests match on or compare kinds without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// The file was newly created.
    Created,
    /// An existing file was modified.
    Modified,
    /// The file was removed.
    Deleted,
}
62
63impl SyncWatcher {
64    /// Create a new sync watcher
65    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
66        let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
67
68        Self {
69            watchers: HashMap::new(),
70            running: Arc::new(Mutex::new(false)),
71            persistence,
72        }
73    }
74
75    /// Start monitoring a workspace directory
76    pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
77        let directory_path = PathBuf::from(directory);
78
79        // Ensure directory exists
80        if !directory_path.exists() {
81            std::fs::create_dir_all(&directory_path)
82                .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
83        }
84
85        let (tx, mut rx) = mpsc::channel(100);
86        let workspace_id_string = workspace_id.to_string();
87        let workspace_id_for_watcher = workspace_id_string.clone();
88        let workspace_id_for_processing = workspace_id_string.clone();
89        let directory_path_clone = directory_path.clone();
90        let directory_path_for_processing = directory_path.clone();
91        let directory_str = directory.to_string();
92
93        let config = Config::default()
94            .with_poll_interval(Duration::from_secs(1))
95            .with_compare_contents(true);
96
97        let mut watcher = RecommendedWatcher::new(
98            move |res: notify::Result<Event>| {
99                if let Ok(event) = res {
100                    debug!("File system event: {:?}", event);
101                    let tx_clone = tx.clone();
102                    let workspace_id_clone = workspace_id_string.clone();
103                    let dir_clone = directory_path_clone.clone();
104
105                    tokio::spawn(async move {
106                        if let Err(e) = Self::handle_fs_event(
107                            &tx_clone,
108                            &workspace_id_clone,
109                            &dir_clone,
110                            &event,
111                        )
112                        .await
113                        {
114                            error!("Failed to handle file system event: {}", e);
115                        }
116                    });
117                }
118            },
119            config,
120        )
121        .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
122
123        // Watch the directory recursively
124        watcher
125            .watch(&directory_path, notify::RecursiveMode::Recursive)
126            .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
127
128        // Store the watcher
129        self.watchers.insert(workspace_id_for_watcher, watcher);
130
131        // Start processing events
132        let persistence_clone = self.persistence.clone();
133        let is_running = self.running.clone();
134
135        tokio::spawn(async move {
136            info!(
137                "Started monitoring workspace {} in directory {}",
138                workspace_id_for_processing, directory_str
139            );
140            println!(
141                "📂 Monitoring workspace '{}' in directory: {}",
142                workspace_id_for_processing, directory_str
143            );
144
145            while *is_running.lock().await {
146                match timeout(Duration::from_millis(100), rx.recv()).await {
147                    Ok(Some(event)) => {
148                        if let Err(e) = Self::process_sync_event(
149                            &persistence_clone,
150                            &workspace_id_for_processing,
151                            &directory_path_for_processing,
152                            event,
153                        )
154                        .await
155                        {
156                            error!("Failed to process sync event: {}", e);
157                            eprintln!("❌ Sync error: {}", e);
158                        }
159                    }
160                    Ok(None) => break,  // Channel closed
161                    Err(_) => continue, // Timeout, continue monitoring
162                }
163            }
164
165            info!(
166                "Stopped monitoring workspace {} in directory {}",
167                workspace_id_for_processing, directory_str
168            );
169            println!(
170                "âšī¸  Stopped monitoring workspace '{}' in directory: {}",
171                workspace_id_for_processing, directory_str
172            );
173        });
174
175        Ok(())
176    }
177
178    /// Stop monitoring a workspace
179    pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
180        if let Some(watcher) = self.watchers.remove(workspace_id) {
181            // Dropping the watcher will stop it
182            drop(watcher);
183        }
184
185        Ok(())
186    }
187
188    /// Stop all monitoring
189    pub async fn stop_all(&mut self) -> Result<()> {
190        *self.running.lock().await = false;
191        self.watchers.clear();
192        Ok(())
193    }
194
195    /// Handle a file system event
196    async fn handle_fs_event(
197        tx: &mpsc::Sender<SyncEvent>,
198        workspace_id: &str,
199        base_dir: &Path,
200        event: &Event,
201    ) -> Result<()> {
202        let mut changes = Vec::new();
203
204        for path in &event.paths {
205            // Make path relative to the watched directory
206            let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
207
208            // Skip metadata files and temporary files
209            if relative_path.starts_with(".")
210                || relative_path
211                    .file_name()
212                    .map(|n| n.to_string_lossy().starts_with("."))
213                    .unwrap_or(false)
214            {
215                continue;
216            }
217
218            // Only process YAML files
219            if let Some(extension) = path.extension() {
220                if extension != "yaml" && extension != "yml" {
221                    continue;
222                }
223            }
224
225            match event.kind {
226                notify::EventKind::Create(_) => {
227                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
228                        changes.push(FileChange {
229                            path: relative_path.to_path_buf(),
230                            kind: ChangeKind::Created,
231                            content: Some(content),
232                        });
233                    }
234                }
235                notify::EventKind::Modify(_) => {
236                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
237                        changes.push(FileChange {
238                            path: relative_path.to_path_buf(),
239                            kind: ChangeKind::Modified,
240                            content: Some(content),
241                        });
242                    }
243                }
244                notify::EventKind::Remove(_) => {
245                    changes.push(FileChange {
246                        path: relative_path.to_path_buf(),
247                        kind: ChangeKind::Deleted,
248                        content: None,
249                    });
250                }
251                _ => {}
252            }
253        }
254
255        if !changes.is_empty() {
256            let _ = tx
257                .send(SyncEvent::DirectoryChanged {
258                    workspace_id: workspace_id.to_string(),
259                    changes,
260                })
261                .await;
262        }
263
264        Ok(())
265    }
266
267    /// Process a sync event
268    async fn process_sync_event(
269        persistence: &WorkspacePersistence,
270        _workspace_id: &str,
271        _directory: &Path,
272        event: SyncEvent,
273    ) -> Result<()> {
274        if let SyncEvent::DirectoryChanged {
275            workspace_id,
276            changes,
277        } = event
278        {
279            info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
280
281            if !changes.is_empty() {
282                println!(
283                    "🔄 Detected {} file change{} in workspace '{}'",
284                    changes.len(),
285                    if changes.len() == 1 { "" } else { "s" },
286                    workspace_id
287                );
288            }
289
290            for change in changes {
291                match change.kind {
292                    ChangeKind::Created => {
293                        println!("  ➕ Created: {}", change.path.display());
294                        if let Some(content) = change.content {
295                            if let Err(e) = Self::import_yaml_content(
296                                persistence,
297                                &workspace_id,
298                                &change.path,
299                                &content,
300                            )
301                            .await
302                            {
303                                warn!("Failed to import file {}: {}", change.path.display(), e);
304                                eprintln!("     âš ī¸  Failed to import: {}", e);
305                            } else {
306                                println!("     ✅ Successfully imported");
307                            }
308                        }
309                    }
310                    ChangeKind::Modified => {
311                        println!("  📝 Modified: {}", change.path.display());
312                        if let Some(content) = change.content {
313                            if let Err(e) = Self::import_yaml_content(
314                                persistence,
315                                &workspace_id,
316                                &change.path,
317                                &content,
318                            )
319                            .await
320                            {
321                                warn!("Failed to import file {}: {}", change.path.display(), e);
322                                eprintln!("     âš ī¸  Failed to import: {}", e);
323                            } else {
324                                println!("     ✅ Successfully updated");
325                            }
326                        }
327                    }
328                    ChangeKind::Deleted => {
329                        println!("  đŸ—‘ī¸  Deleted: {}", change.path.display());
330                        println!("     â„šī¸  Auto-deletion from workspace is disabled");
331                        debug!("File deleted: {}", change.path.display());
332                        // For now, we don't auto-delete from workspace on file deletion
333                        // This could be configurable in the future
334                    }
335                }
336            }
337        }
338
339        Ok(())
340    }
341
342    /// Import YAML content into workspace
343    async fn import_yaml_content(
344        persistence: &WorkspacePersistence,
345        workspace_id: &str,
346        path: &Path,
347        content: &str,
348    ) -> Result<()> {
349        // Load the workspace
350        let workspace = persistence.load_workspace(workspace_id).await?;
351
352        // Check sync direction before proceeding
353        if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
354        {
355            debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
356            return Ok(());
357        }
358
359        // Try to parse as a workspace export
360        if let Ok(_export) =
361            serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
362        {
363            // This is a full workspace export - we should be cautious about importing
364            // For now, just log the intent
365            info!(
366                "Detected workspace export for {}, skipping full import to avoid conflicts",
367                workspace_id
368            );
369            debug!("Skipping workspace export to avoid conflicts");
370            return Ok(());
371        }
372
373        // Try to parse as a request
374        if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
375            // Import individual request
376            debug!("Importing request {} from {}", request.name, path.display());
377
378            let mut workspace = persistence.load_workspace(workspace_id).await?;
379            // Add to root level
380            workspace.add_request(request)?;
381            persistence.save_workspace(&workspace).await?;
382
383            info!(
384                "Successfully imported request from {} into workspace {}",
385                path.display(),
386                workspace_id
387            );
388        } else {
389            debug!("Content in {} is not a recognized format, skipping", path.display());
390            return Err(Error::generic(
391                "File is not a recognized format (expected MockRequest YAML)".to_string(),
392            ));
393        }
394
395        Ok(())
396    }
397
398    /// Get monitoring status
399    pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
400        self.watchers.contains_key(workspace_id)
401    }
402
403    /// Get list of monitored workspaces
404    pub fn get_monitored_workspaces(&self) -> Vec<String> {
405        self.watchers.keys().cloned().collect()
406    }
407}
408
409impl Drop for SyncWatcher {
410    fn drop(&mut self) {
411        // Note: We can't await in drop, so watchers will be stopped when they're dropped
412        // The runtime will handle cleanup
413    }
414}
415
416/// Background sync service
417pub struct SyncService {
418    watcher: Arc<Mutex<SyncWatcher>>,
419    running: Arc<Mutex<bool>>,
420}
421
422impl SyncService {
423    /// Create a new sync service
424    pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
425        let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
426
427        Self {
428            watcher,
429            running: Arc::new(Mutex::new(false)),
430        }
431    }
432
433    /// Start the sync service
434    pub async fn start(&self) -> Result<()> {
435        let mut running = self.running.lock().await;
436        *running = true;
437        info!("Sync service started");
438        Ok(())
439    }
440
441    /// Stop the sync service
442    pub async fn stop(&self) -> Result<()> {
443        let mut running = self.running.lock().await;
444        *running = false;
445
446        let mut watcher = self.watcher.lock().await;
447        watcher.stop_all().await?;
448        info!("Sync service stopped");
449        Ok(())
450    }
451
452    /// Start monitoring a workspace
453    pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
454        let mut watcher = self.watcher.lock().await;
455        watcher.start_monitoring(workspace_id, directory).await?;
456        Ok(())
457    }
458
459    /// Stop monitoring a workspace
460    pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
461        let mut watcher = self.watcher.lock().await;
462        watcher.stop_monitoring(workspace_id).await?;
463        Ok(())
464    }
465
466    /// Get monitoring status
467    pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
468        let watcher = self.watcher.lock().await;
469        watcher.is_monitoring(workspace_id).await
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly constructed service must not report itself as running.
    #[tokio::test]
    async fn test_sync_service_creation() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        let running = *service.running.lock().await;
        assert!(!running);
    }

    /// `start` raises the running flag and `stop` clears it again.
    #[tokio::test]
    async fn test_sync_service_lifecycle() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        // Start service
        service.start().await.unwrap();
        let running_after_start = *service.running.lock().await;
        assert!(running_after_start);

        // Stop service
        service.stop().await.unwrap();
        let running_after_stop = *service.running.lock().await;
        assert!(!running_after_stop);
    }
}
499}