1use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16pub struct SyncWatcher {
18 watchers: HashMap<String, RecommendedWatcher>,
20 running: Arc<Mutex<bool>>,
22 persistence: Arc<WorkspacePersistence>,
24}
25
26#[derive(Debug, Clone)]
28pub enum SyncEvent {
29 FileCreated {
31 workspace_id: String,
33 path: PathBuf,
35 content: String,
37 },
38 FileModified {
40 workspace_id: String,
42 path: PathBuf,
44 content: String,
46 },
47 FileDeleted {
49 workspace_id: String,
51 path: PathBuf,
53 },
54 DirectoryChanged {
56 workspace_id: String,
58 changes: Vec<FileChange>,
60 },
61}
62
63#[derive(Debug, Clone)]
65pub struct FileChange {
66 pub path: PathBuf,
68 pub kind: ChangeKind,
70 pub content: Option<String>,
72}
73
/// The kind of change observed for a file in a watched directory.
///
/// The enum carries no data, so it is `Copy` and comparable; callers can pass
/// it by value and test it with `==` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// The file did not exist before and was created.
    Created,
    /// The file already existed and its content or metadata changed.
    Modified,
    /// The file was removed.
    Deleted,
}
84
85impl SyncWatcher {
86 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
88 let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
89
90 Self {
91 watchers: HashMap::new(),
92 running: Arc::new(Mutex::new(false)),
93 persistence,
94 }
95 }
96
97 pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
99 let directory_path = PathBuf::from(directory);
100
101 if !directory_path.exists() {
103 std::fs::create_dir_all(&directory_path)
104 .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
105 }
106
107 let (tx, mut rx) = mpsc::channel(100);
108 let workspace_id_string = workspace_id.to_string();
109 let workspace_id_for_watcher = workspace_id_string.clone();
110 let workspace_id_for_processing = workspace_id_string.clone();
111 let directory_path_clone = directory_path.clone();
112 let directory_path_for_processing = directory_path.clone();
113 let directory_str = directory.to_string();
114
115 let config = Config::default()
116 .with_poll_interval(Duration::from_secs(1))
117 .with_compare_contents(true);
118
119 let mut watcher = RecommendedWatcher::new(
120 move |res: notify::Result<Event>| {
121 if let Ok(event) = res {
122 debug!("File system event: {:?}", event);
123 let tx_clone = tx.clone();
124 let workspace_id_clone = workspace_id_string.clone();
125 let dir_clone = directory_path_clone.clone();
126
127 tokio::spawn(async move {
128 if let Err(e) = Self::handle_fs_event(
129 &tx_clone,
130 &workspace_id_clone,
131 &dir_clone,
132 &event,
133 )
134 .await
135 {
136 error!("Failed to handle file system event: {}", e);
137 }
138 });
139 }
140 },
141 config,
142 )
143 .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
144
145 watcher
147 .watch(&directory_path, notify::RecursiveMode::Recursive)
148 .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
149
150 self.watchers.insert(workspace_id_for_watcher, watcher);
152
153 let persistence_clone = self.persistence.clone();
155 let is_running = self.running.clone();
156
157 tokio::spawn(async move {
158 info!(
159 "Started monitoring workspace {} in directory {}",
160 workspace_id_for_processing, directory_str
161 );
162 info!(
163 workspace_id = %workspace_id_for_processing,
164 directory = %directory_str,
165 "Monitoring workspace directory"
166 );
167
168 while *is_running.lock().await {
169 match timeout(Duration::from_millis(100), rx.recv()).await {
170 Ok(Some(event)) => {
171 if let Err(e) = Self::process_sync_event(
172 &persistence_clone,
173 &workspace_id_for_processing,
174 &directory_path_for_processing,
175 event,
176 )
177 .await
178 {
179 error!("Failed to process sync event: {}", e);
180 }
181 }
182 Ok(None) => break, Err(_) => continue, }
185 }
186
187 info!(
188 "Stopped monitoring workspace {} in directory {}",
189 workspace_id_for_processing, directory_str
190 );
191 info!(
192 workspace_id = %workspace_id_for_processing,
193 directory = %directory_str,
194 "Stopped monitoring workspace directory"
195 );
196 });
197
198 Ok(())
199 }
200
201 pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
203 if let Some(watcher) = self.watchers.remove(workspace_id) {
204 drop(watcher);
206 }
207
208 Ok(())
209 }
210
211 pub async fn stop_all(&mut self) -> Result<()> {
213 *self.running.lock().await = false;
214 self.watchers.clear();
215 Ok(())
216 }
217
218 async fn handle_fs_event(
220 tx: &mpsc::Sender<SyncEvent>,
221 workspace_id: &str,
222 base_dir: &Path,
223 event: &Event,
224 ) -> Result<()> {
225 let mut changes = Vec::new();
226
227 for path in &event.paths {
228 let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
230
231 if relative_path.starts_with(".")
233 || relative_path
234 .file_name()
235 .map(|n| n.to_string_lossy().starts_with("."))
236 .unwrap_or(false)
237 {
238 continue;
239 }
240
241 if let Some(extension) = path.extension() {
243 if extension != "yaml" && extension != "yml" {
244 continue;
245 }
246 }
247
248 match event.kind {
249 notify::EventKind::Create(_) => {
250 if let Ok(content) = tokio::fs::read_to_string(&path).await {
251 changes.push(FileChange {
252 path: relative_path.to_path_buf(),
253 kind: ChangeKind::Created,
254 content: Some(content),
255 });
256 }
257 }
258 notify::EventKind::Modify(_) => {
259 if let Ok(content) = tokio::fs::read_to_string(&path).await {
260 changes.push(FileChange {
261 path: relative_path.to_path_buf(),
262 kind: ChangeKind::Modified,
263 content: Some(content),
264 });
265 }
266 }
267 notify::EventKind::Remove(_) => {
268 changes.push(FileChange {
269 path: relative_path.to_path_buf(),
270 kind: ChangeKind::Deleted,
271 content: None,
272 });
273 }
274 _ => {}
275 }
276 }
277
278 if !changes.is_empty() {
279 let _ = tx
280 .send(SyncEvent::DirectoryChanged {
281 workspace_id: workspace_id.to_string(),
282 changes,
283 })
284 .await;
285 }
286
287 Ok(())
288 }
289
290 async fn process_sync_event(
292 persistence: &WorkspacePersistence,
293 _workspace_id: &str,
294 _directory: &Path,
295 event: SyncEvent,
296 ) -> Result<()> {
297 if let SyncEvent::DirectoryChanged {
298 workspace_id,
299 changes,
300 } = event
301 {
302 info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
303
304 if !changes.is_empty() {
305 info!(
306 workspace_id = %workspace_id,
307 count = changes.len(),
308 "Detected file changes in workspace"
309 );
310 }
311
312 for change in changes {
313 match change.kind {
314 ChangeKind::Created => {
315 info!(path = %change.path.display(), "File created");
316 if let Some(content) = change.content {
317 if let Err(e) = Self::import_yaml_content(
318 persistence,
319 &workspace_id,
320 &change.path,
321 &content,
322 )
323 .await
324 {
325 warn!("Failed to import file {}: {}", change.path.display(), e);
326 } else {
327 info!(path = %change.path.display(), "Successfully imported");
328 }
329 }
330 }
331 ChangeKind::Modified => {
332 info!(path = %change.path.display(), "File modified");
333 if let Some(content) = change.content {
334 if let Err(e) = Self::import_yaml_content(
335 persistence,
336 &workspace_id,
337 &change.path,
338 &content,
339 )
340 .await
341 {
342 warn!("Failed to import file {}: {}", change.path.display(), e);
343 } else {
344 info!(path = %change.path.display(), "Successfully updated");
345 }
346 }
347 }
348 ChangeKind::Deleted => {
349 info!(
350 path = %change.path.display(),
351 workspace_id = %workspace_id,
352 "File deleted from watched directory — workspace may be out of sync. \
353 Re-export workspace or restart sync to reconcile."
354 );
355 }
356 }
357 }
358 }
359
360 Ok(())
361 }
362
363 async fn import_yaml_content(
365 persistence: &WorkspacePersistence,
366 workspace_id: &str,
367 path: &Path,
368 content: &str,
369 ) -> Result<()> {
370 let workspace = persistence.load_workspace(workspace_id).await?;
372
373 if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
375 {
376 debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
377 return Ok(());
378 }
379
380 if let Ok(_export) =
382 serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
383 {
384 info!(
387 "Detected workspace export for {}, skipping full import to avoid conflicts",
388 workspace_id
389 );
390 debug!("Skipping workspace export to avoid conflicts");
391 return Ok(());
392 }
393
394 if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
396 debug!("Importing request {} from {}", request.name, path.display());
398
399 let mut workspace = persistence.load_workspace(workspace_id).await?;
400 workspace.add_request(request)?;
402 persistence.save_workspace(&workspace).await?;
403
404 info!(
405 "Successfully imported request from {} into workspace {}",
406 path.display(),
407 workspace_id
408 );
409 } else {
410 debug!("Content in {} is not a recognized format, skipping", path.display());
411 return Err(Error::generic(
412 "File is not a recognized format (expected MockRequest YAML)".to_string(),
413 ));
414 }
415
416 Ok(())
417 }
418
419 pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
421 self.watchers.contains_key(workspace_id)
422 }
423
424 pub fn get_monitored_workspaces(&self) -> Vec<String> {
426 self.watchers.keys().cloned().collect()
427 }
428}
429
430impl Drop for SyncWatcher {
431 fn drop(&mut self) {
432 }
435}
436
437pub struct SyncService {
439 watcher: Arc<Mutex<SyncWatcher>>,
440 running: Arc<Mutex<bool>>,
441}
442
443impl SyncService {
444 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
446 let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
447
448 Self {
449 watcher,
450 running: Arc::new(Mutex::new(false)),
451 }
452 }
453
454 pub async fn start(&self) -> Result<()> {
456 let mut running = self.running.lock().await;
457 *running = true;
458 info!("Sync service started");
459 Ok(())
460 }
461
462 pub async fn stop(&self) -> Result<()> {
464 let mut running = self.running.lock().await;
465 *running = false;
466
467 let mut watcher = self.watcher.lock().await;
468 watcher.stop_all().await?;
469 info!("Sync service stopped");
470 Ok(())
471 }
472
473 pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
475 let mut watcher = self.watcher.lock().await;
476 watcher.start_monitoring(workspace_id, directory).await?;
477 Ok(())
478 }
479
480 pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
482 let mut watcher = self.watcher.lock().await;
483 watcher.stop_monitoring(workspace_id).await?;
484 Ok(())
485 }
486
487 pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
489 let watcher = self.watcher.lock().await;
490 watcher.is_monitoring(workspace_id).await
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A freshly constructed service reports itself as not running.
    #[tokio::test]
    async fn test_sync_service_creation() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        let is_running = *service.running.lock().await;
        assert!(!is_running);
    }

    /// `start` raises the running flag and `stop` clears it again.
    #[tokio::test]
    async fn test_sync_service_lifecycle() {
        let workspace_root = TempDir::new().unwrap();
        let service = SyncService::new(workspace_root.path());

        service.start().await.unwrap();
        let after_start = *service.running.lock().await;
        assert!(after_start);

        service.stop().await.unwrap();
        let after_stop = *service.running.lock().await;
        assert!(!after_stop);
    }
}