1use crate::workspace_persistence::WorkspacePersistence;
7use crate::{Error, Result};
8use notify::{Config, Event, RecommendedWatcher, Watcher};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, info, warn};
15
16pub struct SyncWatcher {
18 watchers: HashMap<String, RecommendedWatcher>,
20 running: Arc<Mutex<bool>>,
22 persistence: Arc<WorkspacePersistence>,
24}
25
26#[derive(Debug, Clone)]
28pub enum SyncEvent {
29 FileCreated {
31 workspace_id: String,
33 path: PathBuf,
35 content: String,
37 },
38 FileModified {
40 workspace_id: String,
42 path: PathBuf,
44 content: String,
46 },
47 FileDeleted {
49 workspace_id: String,
51 path: PathBuf,
53 },
54 DirectoryChanged {
56 workspace_id: String,
58 changes: Vec<FileChange>,
60 },
61}
62
63#[derive(Debug, Clone)]
65pub struct FileChange {
66 pub path: PathBuf,
68 pub kind: ChangeKind,
70 pub content: Option<String>,
72}
73
74#[derive(Debug, Clone)]
76pub enum ChangeKind {
77 Created,
79 Modified,
81 Deleted,
83}
84
85impl SyncWatcher {
86 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
88 let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
89
90 Self {
91 watchers: HashMap::new(),
92 running: Arc::new(Mutex::new(false)),
93 persistence,
94 }
95 }
96
97 pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
99 let directory_path = PathBuf::from(directory);
100
101 if !directory_path.exists() {
103 std::fs::create_dir_all(&directory_path)
104 .map_err(|e| Error::generic(format!("Failed to create sync directory: {}", e)))?;
105 }
106
107 let (tx, mut rx) = mpsc::channel(100);
108 let workspace_id_string = workspace_id.to_string();
109 let workspace_id_for_watcher = workspace_id_string.clone();
110 let workspace_id_for_processing = workspace_id_string.clone();
111 let directory_path_clone = directory_path.clone();
112 let directory_path_for_processing = directory_path.clone();
113 let directory_str = directory.to_string();
114
115 let config = Config::default()
116 .with_poll_interval(Duration::from_secs(1))
117 .with_compare_contents(true);
118
119 let mut watcher = RecommendedWatcher::new(
120 move |res: notify::Result<Event>| {
121 if let Ok(event) = res {
122 debug!("File system event: {:?}", event);
123 let tx_clone = tx.clone();
124 let workspace_id_clone = workspace_id_string.clone();
125 let dir_clone = directory_path_clone.clone();
126
127 tokio::spawn(async move {
128 if let Err(e) = Self::handle_fs_event(
129 &tx_clone,
130 &workspace_id_clone,
131 &dir_clone,
132 &event,
133 )
134 .await
135 {
136 error!("Failed to handle file system event: {}", e);
137 }
138 });
139 }
140 },
141 config,
142 )
143 .map_err(|e| Error::generic(format!("Failed to create file watcher: {}", e)))?;
144
145 watcher
147 .watch(&directory_path, notify::RecursiveMode::Recursive)
148 .map_err(|e| Error::generic(format!("Failed to watch directory: {}", e)))?;
149
150 self.watchers.insert(workspace_id_for_watcher, watcher);
152
153 let persistence_clone = self.persistence.clone();
155 let is_running = self.running.clone();
156
157 tokio::spawn(async move {
158 info!(
159 "Started monitoring workspace {} in directory {}",
160 workspace_id_for_processing, directory_str
161 );
162 println!(
163 "đ Monitoring workspace '{}' in directory: {}",
164 workspace_id_for_processing, directory_str
165 );
166
167 while *is_running.lock().await {
168 match timeout(Duration::from_millis(100), rx.recv()).await {
169 Ok(Some(event)) => {
170 if let Err(e) = Self::process_sync_event(
171 &persistence_clone,
172 &workspace_id_for_processing,
173 &directory_path_for_processing,
174 event,
175 )
176 .await
177 {
178 error!("Failed to process sync event: {}", e);
179 eprintln!("â Sync error: {}", e);
180 }
181 }
182 Ok(None) => break, Err(_) => continue, }
185 }
186
187 info!(
188 "Stopped monitoring workspace {} in directory {}",
189 workspace_id_for_processing, directory_str
190 );
191 println!(
192 "âšī¸ Stopped monitoring workspace '{}' in directory: {}",
193 workspace_id_for_processing, directory_str
194 );
195 });
196
197 Ok(())
198 }
199
200 pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
202 if let Some(watcher) = self.watchers.remove(workspace_id) {
203 drop(watcher);
205 }
206
207 Ok(())
208 }
209
210 pub async fn stop_all(&mut self) -> Result<()> {
212 *self.running.lock().await = false;
213 self.watchers.clear();
214 Ok(())
215 }
216
217 async fn handle_fs_event(
219 tx: &mpsc::Sender<SyncEvent>,
220 workspace_id: &str,
221 base_dir: &Path,
222 event: &Event,
223 ) -> Result<()> {
224 let mut changes = Vec::new();
225
226 for path in &event.paths {
227 let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
229
230 if relative_path.starts_with(".")
232 || relative_path
233 .file_name()
234 .map(|n| n.to_string_lossy().starts_with("."))
235 .unwrap_or(false)
236 {
237 continue;
238 }
239
240 if let Some(extension) = path.extension() {
242 if extension != "yaml" && extension != "yml" {
243 continue;
244 }
245 }
246
247 match event.kind {
248 notify::EventKind::Create(_) => {
249 if let Ok(content) = tokio::fs::read_to_string(&path).await {
250 changes.push(FileChange {
251 path: relative_path.to_path_buf(),
252 kind: ChangeKind::Created,
253 content: Some(content),
254 });
255 }
256 }
257 notify::EventKind::Modify(_) => {
258 if let Ok(content) = tokio::fs::read_to_string(&path).await {
259 changes.push(FileChange {
260 path: relative_path.to_path_buf(),
261 kind: ChangeKind::Modified,
262 content: Some(content),
263 });
264 }
265 }
266 notify::EventKind::Remove(_) => {
267 changes.push(FileChange {
268 path: relative_path.to_path_buf(),
269 kind: ChangeKind::Deleted,
270 content: None,
271 });
272 }
273 _ => {}
274 }
275 }
276
277 if !changes.is_empty() {
278 let _ = tx
279 .send(SyncEvent::DirectoryChanged {
280 workspace_id: workspace_id.to_string(),
281 changes,
282 })
283 .await;
284 }
285
286 Ok(())
287 }
288
289 async fn process_sync_event(
291 persistence: &WorkspacePersistence,
292 _workspace_id: &str,
293 _directory: &Path,
294 event: SyncEvent,
295 ) -> Result<()> {
296 if let SyncEvent::DirectoryChanged {
297 workspace_id,
298 changes,
299 } = event
300 {
301 info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
302
303 if !changes.is_empty() {
304 println!(
305 "đ Detected {} file change{} in workspace '{}'",
306 changes.len(),
307 if changes.len() == 1 { "" } else { "s" },
308 workspace_id
309 );
310 }
311
312 for change in changes {
313 match change.kind {
314 ChangeKind::Created => {
315 println!(" â Created: {}", change.path.display());
316 if let Some(content) = change.content {
317 if let Err(e) = Self::import_yaml_content(
318 persistence,
319 &workspace_id,
320 &change.path,
321 &content,
322 )
323 .await
324 {
325 warn!("Failed to import file {}: {}", change.path.display(), e);
326 eprintln!(" â ī¸ Failed to import: {}", e);
327 } else {
328 println!(" â
Successfully imported");
329 }
330 }
331 }
332 ChangeKind::Modified => {
333 println!(" đ Modified: {}", change.path.display());
334 if let Some(content) = change.content {
335 if let Err(e) = Self::import_yaml_content(
336 persistence,
337 &workspace_id,
338 &change.path,
339 &content,
340 )
341 .await
342 {
343 warn!("Failed to import file {}: {}", change.path.display(), e);
344 eprintln!(" â ī¸ Failed to import: {}", e);
345 } else {
346 println!(" â
Successfully updated");
347 }
348 }
349 }
350 ChangeKind::Deleted => {
351 println!(" đī¸ Deleted: {}", change.path.display());
352 println!(" âšī¸ Auto-deletion from workspace is disabled");
353 debug!("File deleted: {}", change.path.display());
354 }
357 }
358 }
359 }
360
361 Ok(())
362 }
363
364 async fn import_yaml_content(
366 persistence: &WorkspacePersistence,
367 workspace_id: &str,
368 path: &Path,
369 content: &str,
370 ) -> Result<()> {
371 let workspace = persistence.load_workspace(workspace_id).await?;
373
374 if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
376 {
377 debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
378 return Ok(());
379 }
380
381 if let Ok(_export) =
383 serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
384 {
385 info!(
388 "Detected workspace export for {}, skipping full import to avoid conflicts",
389 workspace_id
390 );
391 debug!("Skipping workspace export to avoid conflicts");
392 return Ok(());
393 }
394
395 if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
397 debug!("Importing request {} from {}", request.name, path.display());
399
400 let mut workspace = persistence.load_workspace(workspace_id).await?;
401 workspace.add_request(request)?;
403 persistence.save_workspace(&workspace).await?;
404
405 info!(
406 "Successfully imported request from {} into workspace {}",
407 path.display(),
408 workspace_id
409 );
410 } else {
411 debug!("Content in {} is not a recognized format, skipping", path.display());
412 return Err(Error::generic(
413 "File is not a recognized format (expected MockRequest YAML)".to_string(),
414 ));
415 }
416
417 Ok(())
418 }
419
420 pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
422 self.watchers.contains_key(workspace_id)
423 }
424
425 pub fn get_monitored_workspaces(&self) -> Vec<String> {
427 self.watchers.keys().cloned().collect()
428 }
429}
430
431impl Drop for SyncWatcher {
432 fn drop(&mut self) {
433 }
436}
437
438pub struct SyncService {
440 watcher: Arc<Mutex<SyncWatcher>>,
441 running: Arc<Mutex<bool>>,
442}
443
444impl SyncService {
445 pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
447 let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
448
449 Self {
450 watcher,
451 running: Arc::new(Mutex::new(false)),
452 }
453 }
454
455 pub async fn start(&self) -> Result<()> {
457 let mut running = self.running.lock().await;
458 *running = true;
459 info!("Sync service started");
460 Ok(())
461 }
462
463 pub async fn stop(&self) -> Result<()> {
465 let mut running = self.running.lock().await;
466 *running = false;
467
468 let mut watcher = self.watcher.lock().await;
469 watcher.stop_all().await?;
470 info!("Sync service stopped");
471 Ok(())
472 }
473
474 pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
476 let mut watcher = self.watcher.lock().await;
477 watcher.start_monitoring(workspace_id, directory).await?;
478 Ok(())
479 }
480
481 pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
483 let mut watcher = self.watcher.lock().await;
484 watcher.stop_monitoring(workspace_id).await?;
485 Ok(())
486 }
487
488 pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
490 let watcher = self.watcher.lock().await;
491 watcher.is_monitoring(workspace_id).await
492 }
493}
494
495#[cfg(test)]
496mod tests {
497 use super::*;
498 use tempfile::TempDir;
499
500 #[tokio::test]
501 async fn test_sync_service_creation() {
502 let temp_dir = TempDir::new().unwrap();
503 let service = SyncService::new(temp_dir.path());
504
505 assert!(!*service.running.lock().await);
506 }
507
508 #[tokio::test]
509 async fn test_sync_service_lifecycle() {
510 let temp_dir = TempDir::new().unwrap();
511 let service = SyncService::new(temp_dir.path());
512
513 service.start().await.unwrap();
515 assert!(*service.running.lock().await);
516
517 service.stop().await.unwrap();
519 assert!(!*service.running.lock().await);
520 }
521}