use std::time::Duration;
use notify::{recommended_watcher, Event, EventKind, RecursiveMode, Watcher};
use tracing::{debug, error, info};
use crate::infra::error::SyncError;
use super::sync_logic::sync_directories;
use super::report::SyncReport;
use super::types::SyncParameters;
pub async fn watch_task(
params: &SyncParameters,
delay_ms: u64,
) -> anyhow::Result<SyncReport, SyncError> {
let mut total_report = SyncReport::default();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut watcher =
recommended_watcher(move |res: Result<Event, notify::Error>| {
match res {
Ok(event) => {
match event.kind {
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
let _ = tx.send(event);
}
_ => {
debug!(event = ?event, "Ignored file system event");
}
}
}
Err(error) => {
error!("📁 File watch error: {}", error)
}
}
})
.map_err(|e| anyhow::anyhow!("Failed to create file watcher: {}", e))?;
watcher
.watch(¶ms.source, RecursiveMode::Recursive)
.map_err(|e| {
anyhow::anyhow!(
"Failed to watch directory '{}': {}",
params.source.display(),
e
)
})?;
info!(
"Started watching: {} → {}",
params.source.display(),
params.target.display()
);
loop {
if rx.recv().await.is_none() {
info!("Watcher channel closed, exiting...");
break; }
debug!(
"Change detected, starting debounce period of {}ms...",
delay_ms
);
loop {
match tokio::time::timeout(Duration::from_millis(delay_ms), rx.recv()).await {
Ok(Some(_)) => {
debug!("Another change detected, restarting debounce timer...");
continue; }
Ok(None) => {
info!("Watcher channel closed during debounce.");
return Ok(total_report); }
Err(_) => {
debug!("Debounce period ended with no further changes.");
break; }
}
}
debug!("📁 Detected stable changes → syncing...");
match sync_directories(¶ms).await {
Ok(report) => {
debug!("✅ Sync completed successfully");
total_report.copied.extend(report.copied);
total_report.errors.extend(report.errors);
}
Err(e) => {
error!(
error = ?e,
source = %params.source.display(),
target = %params.target.display(),
"Sync failed during watch"
);
total_report
.errors
.push((params.source.clone(), e.to_string()));
}
}
}
Ok(total_report)
}