1use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16pub struct SyncWatcher {
18 watchers: HashMap<String, RecommendedWatcher>,
20 running: Arc<Mutex<bool>>,
22 persistence: Arc<WorkspacePersistence>,
24}
25
26#[derive(Debug, Clone)]
27pub enum SyncEvent {
28 FileCreated {
30 workspace_id: String,
31 path: PathBuf,
32 content: String,
33 },
34 FileModified {
36 workspace_id: String,
37 path: PathBuf,
38 content: String,
39 },
40 FileDeleted { workspace_id: String, path: PathBuf },
42 DirectoryChanged {
44 workspace_id: String,
45 changes: Vec<FileChange>,
46 },
47}
48
49#[derive(Debug, Clone)]
50pub struct FileChange {
51 pub path: PathBuf,
52 pub kind: ChangeKind,
53 pub content: Option<String>,
54}
55
56#[derive(Debug, Clone)]
57pub enum ChangeKind {
58 Created,
59 Modified,
60 Deleted,
61}
62
63impl SyncWatcher {
64 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
66 let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
67
68 Self {
69 watchers: HashMap::new(),
70 running: Arc::new(Mutex::new(false)),
71 persistence,
72 }
73 }
74
75 pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
77 let directory_path = PathBuf::from(directory);
78
79 if !directory_path.exists() {
81 std::fs::create_dir_all(&directory_path)
82 .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
83 }
84
85 let (tx, mut rx) = mpsc::channel(100);
86 let workspace_id_string = workspace_id.to_string();
87 let workspace_id_for_watcher = workspace_id_string.clone();
88 let workspace_id_for_processing = workspace_id_string.clone();
89 let directory_path_clone = directory_path.clone();
90 let directory_path_for_processing = directory_path.clone();
91 let directory_str = directory.to_string();
92
93 let config = Config::default()
94 .with_poll_interval(Duration::from_secs(1))
95 .with_compare_contents(true);
96
97 let mut watcher = RecommendedWatcher::new(
98 move |res: notify::Result<Event>| {
99 if let Ok(event) = res {
100 debug!("File system event: {:?}", event);
101 let tx_clone = tx.clone();
102 let workspace_id_clone = workspace_id_string.clone();
103 let dir_clone = directory_path_clone.clone();
104
105 tokio::spawn(async move {
106 if let Err(e) = Self::handle_fs_event(
107 &tx_clone,
108 &workspace_id_clone,
109 &dir_clone,
110 &event,
111 )
112 .await
113 {
114 error!("Failed to handle file system event: {}", e);
115 }
116 });
117 }
118 },
119 config,
120 )
121 .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
122
123 watcher
125 .watch(&directory_path, notify::RecursiveMode::Recursive)
126 .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
127
128 self.watchers.insert(workspace_id_for_watcher, watcher);
130
131 let persistence_clone = self.persistence.clone();
133 let is_running = self.running.clone();
134
135 tokio::spawn(async move {
136 info!(
137 "Started monitoring workspace {} in directory {}",
138 workspace_id_for_processing, directory_str
139 );
140 println!(
141 "đ Monitoring workspace '{}' in directory: {}",
142 workspace_id_for_processing, directory_str
143 );
144
145 while *is_running.lock().await {
146 match timeout(Duration::from_millis(100), rx.recv()).await {
147 Ok(Some(event)) => {
148 if let Err(e) = Self::process_sync_event(
149 &persistence_clone,
150 &workspace_id_for_processing,
151 &directory_path_for_processing,
152 event,
153 )
154 .await
155 {
156 error!("Failed to process sync event: {}", e);
157 eprintln!("â Sync error: {}", e);
158 }
159 }
160 Ok(None) => break, Err(_) => continue, }
163 }
164
165 info!(
166 "Stopped monitoring workspace {} in directory {}",
167 workspace_id_for_processing, directory_str
168 );
169 println!(
170 "âšī¸ Stopped monitoring workspace '{}' in directory: {}",
171 workspace_id_for_processing, directory_str
172 );
173 });
174
175 Ok(())
176 }
177
178 pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
180 if let Some(watcher) = self.watchers.remove(workspace_id) {
181 drop(watcher);
183 }
184
185 Ok(())
186 }
187
188 pub async fn stop_all(&mut self) -> Result<()> {
190 *self.running.lock().await = false;
191 self.watchers.clear();
192 Ok(())
193 }
194
195 async fn handle_fs_event(
197 tx: &mpsc::Sender<SyncEvent>,
198 workspace_id: &str,
199 base_dir: &Path,
200 event: &Event,
201 ) -> Result<()> {
202 let mut changes = Vec::new();
203
204 for path in &event.paths {
205 let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
207
208 if relative_path.starts_with(".")
210 || relative_path
211 .file_name()
212 .map(|n| n.to_string_lossy().starts_with("."))
213 .unwrap_or(false)
214 {
215 continue;
216 }
217
218 if let Some(extension) = path.extension() {
220 if extension != "yaml" && extension != "yml" {
221 continue;
222 }
223 }
224
225 match event.kind {
226 notify::EventKind::Create(_) => {
227 if let Ok(content) = tokio::fs::read_to_string(&path).await {
228 changes.push(FileChange {
229 path: relative_path.to_path_buf(),
230 kind: ChangeKind::Created,
231 content: Some(content),
232 });
233 }
234 }
235 notify::EventKind::Modify(_) => {
236 if let Ok(content) = tokio::fs::read_to_string(&path).await {
237 changes.push(FileChange {
238 path: relative_path.to_path_buf(),
239 kind: ChangeKind::Modified,
240 content: Some(content),
241 });
242 }
243 }
244 notify::EventKind::Remove(_) => {
245 changes.push(FileChange {
246 path: relative_path.to_path_buf(),
247 kind: ChangeKind::Deleted,
248 content: None,
249 });
250 }
251 _ => {}
252 }
253 }
254
255 if !changes.is_empty() {
256 let _ = tx
257 .send(SyncEvent::DirectoryChanged {
258 workspace_id: workspace_id.to_string(),
259 changes,
260 })
261 .await;
262 }
263
264 Ok(())
265 }
266
267 async fn process_sync_event(
269 persistence: &WorkspacePersistence,
270 _workspace_id: &str,
271 _directory: &Path,
272 event: SyncEvent,
273 ) -> Result<()> {
274 if let SyncEvent::DirectoryChanged {
275 workspace_id,
276 changes,
277 } = event
278 {
279 info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
280
281 if !changes.is_empty() {
282 println!(
283 "đ Detected {} file change{} in workspace '{}'",
284 changes.len(),
285 if changes.len() == 1 { "" } else { "s" },
286 workspace_id
287 );
288 }
289
290 for change in changes {
291 match change.kind {
292 ChangeKind::Created => {
293 println!(" â Created: {}", change.path.display());
294 if let Some(content) = change.content {
295 if let Err(e) = Self::import_yaml_content(
296 persistence,
297 &workspace_id,
298 &change.path,
299 &content,
300 )
301 .await
302 {
303 warn!("Failed to import file {}: {}", change.path.display(), e);
304 eprintln!(" â ī¸ Failed to import: {}", e);
305 } else {
306 println!(" â
Successfully imported");
307 }
308 }
309 }
310 ChangeKind::Modified => {
311 println!(" đ Modified: {}", change.path.display());
312 if let Some(content) = change.content {
313 if let Err(e) = Self::import_yaml_content(
314 persistence,
315 &workspace_id,
316 &change.path,
317 &content,
318 )
319 .await
320 {
321 warn!("Failed to import file {}: {}", change.path.display(), e);
322 eprintln!(" â ī¸ Failed to import: {}", e);
323 } else {
324 println!(" â
Successfully updated");
325 }
326 }
327 }
328 ChangeKind::Deleted => {
329 println!(" đī¸ Deleted: {}", change.path.display());
330 println!(" âšī¸ Auto-deletion from workspace is disabled");
331 debug!("File deleted: {}", change.path.display());
332 }
335 }
336 }
337 }
338
339 Ok(())
340 }
341
342 async fn import_yaml_content(
344 persistence: &WorkspacePersistence,
345 workspace_id: &str,
346 path: &Path,
347 content: &str,
348 ) -> Result<()> {
349 let workspace = persistence.load_workspace(workspace_id).await?;
351
352 if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
354 {
355 debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
356 return Ok(());
357 }
358
359 if let Ok(_export) =
361 serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
362 {
363 info!(
366 "Detected workspace export for {}, skipping full import to avoid conflicts",
367 workspace_id
368 );
369 debug!("Skipping workspace export to avoid conflicts");
370 return Ok(());
371 }
372
373 if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
375 debug!("Importing request {} from {}", request.name, path.display());
377
378 let mut workspace = persistence.load_workspace(workspace_id).await?;
379 workspace.add_request(request)?;
381 persistence.save_workspace(&workspace).await?;
382
383 info!(
384 "Successfully imported request from {} into workspace {}",
385 path.display(),
386 workspace_id
387 );
388 } else {
389 debug!("Content in {} is not a recognized format, skipping", path.display());
390 return Err(Error::generic(
391 "File is not a recognized format (expected MockRequest YAML)".to_string(),
392 ));
393 }
394
395 Ok(())
396 }
397
398 pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
400 self.watchers.contains_key(workspace_id)
401 }
402
403 pub fn get_monitored_workspaces(&self) -> Vec<String> {
405 self.watchers.keys().cloned().collect()
406 }
407}
408
409impl Drop for SyncWatcher {
410 fn drop(&mut self) {
411 }
414}
415
416pub struct SyncService {
418 watcher: Arc<Mutex<SyncWatcher>>,
419 running: Arc<Mutex<bool>>,
420}
421
422impl SyncService {
423 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
425 let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
426
427 Self {
428 watcher,
429 running: Arc::new(Mutex::new(false)),
430 }
431 }
432
433 pub async fn start(&self) -> Result<()> {
435 let mut running = self.running.lock().await;
436 *running = true;
437 info!("Sync service started");
438 Ok(())
439 }
440
441 pub async fn stop(&self) -> Result<()> {
443 let mut running = self.running.lock().await;
444 *running = false;
445
446 let mut watcher = self.watcher.lock().await;
447 watcher.stop_all().await?;
448 info!("Sync service stopped");
449 Ok(())
450 }
451
452 pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
454 let mut watcher = self.watcher.lock().await;
455 watcher.start_monitoring(workspace_id, directory).await?;
456 Ok(())
457 }
458
459 pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
461 let mut watcher = self.watcher.lock().await;
462 watcher.stop_monitoring(workspace_id).await?;
463 Ok(())
464 }
465
466 pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
468 let watcher = self.watcher.lock().await;
469 watcher.is_monitoring(workspace_id).await
470 }
471}
472
473#[cfg(test)]
474mod tests {
475 use super::*;
476 use tempfile::TempDir;
477
478 #[tokio::test]
479 async fn test_sync_service_creation() {
480 let temp_dir = TempDir::new().unwrap();
481 let service = SyncService::new(temp_dir.path());
482
483 assert!(!*service.running.lock().await);
484 }
485
486 #[tokio::test]
487 async fn test_sync_service_lifecycle() {
488 let temp_dir = TempDir::new().unwrap();
489 let service = SyncService::new(temp_dir.path());
490
491 service.start().await.unwrap();
493 assert!(*service.running.lock().await);
494
495 service.stop().await.unwrap();
497 assert!(!*service.running.lock().await);
498 }
499}