msy 0.4.6

Modern musl rsync alternative - Fast, parallel file synchronization
Documentation
use crate::sync::SyncEngine;
use crate::transport::Transport;
use anyhow::Result;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::mpsc::{RecvTimeoutError, channel};
use std::time::{Duration, Instant};
use tokio::signal;

#[cfg(test)]
use crate::cli::SymlinkMode;
#[cfg(test)]
use crate::integrity::ChecksumType;

pub struct WatchMode<T: Transport> {
	engine: SyncEngine<T>,
	source: PathBuf,
	destination: PathBuf,
	debounce: Duration,
}

impl<T: Transport + 'static> WatchMode<T> {
	pub fn new(engine: SyncEngine<T>, source: PathBuf, destination: PathBuf, debounce: Duration) -> Self {
		Self { engine, source, destination, debounce }
	}

	pub async fn watch(&self) -> Result<()> {
		// Initial sync
		tracing::info!("Running initial sync...");
		self.engine.sync(&self.source, &self.destination).await?;

		// Set up file watcher
		let (tx, rx) = channel();
		let mut watcher: RecommendedWatcher = notify::recommended_watcher(tx)?;
		watcher.watch(&self.source, RecursiveMode::Recursive)?;

		println!("\n🔍 Watching {} for changes (Ctrl+C to stop)...\n", self.source.display());

		// Event loop with debouncing
		let mut pending_changes = Vec::new();
		let mut last_sync = Instant::now();

		// Set up Ctrl+C handler
		let ctrl_c = signal::ctrl_c();
		tokio::pin!(ctrl_c);

		loop {
			// Check for Ctrl+C
			tokio::select! {
				_ = &mut ctrl_c => {
					println!("\nâšī¸  Stopping watch mode...");
					break;
				}
				_ = tokio::time::sleep(Duration::from_millis(10)) => {
					// Continue to check file events
				}
			}

			// Process file system events
			match rx.recv_timeout(Duration::from_millis(100)) {
				Ok(Ok(event)) => {
					// Filter out events we don't care about
					if self.should_sync_event(&event) {
						pending_changes.push(event);
					}
				}
				Ok(Err(e)) => {
					tracing::error!("Watch error: {}", e);
					// Force sync on error to ensure consistency
					pending_changes.push(Event::new(notify::EventKind::Other));
				}
				Err(RecvTimeoutError::Timeout) => {
					// Check if we should sync (debounce timeout reached)
					if !pending_changes.is_empty() && last_sync.elapsed() >= self.debounce {
						tracing::info!("Detected {} changes, syncing...", pending_changes.len());
						println!("📝 Changes detected, syncing...");

						match self.engine.sync(&self.source, &self.destination).await {
							Ok(_) => {
								println!("✓ Sync complete\n");
							}
							Err(e) => {
								eprintln!("✗ Sync failed: {}\n", e);
							}
						}

						pending_changes.clear();
						last_sync = Instant::now();
					}
				}
				Err(RecvTimeoutError::Disconnected) => {
					tracing::error!("File watcher disconnected unexpectedly");
					eprintln!("❌ File watcher stopped. Exiting.");
					break;
				}
			}
		}

		Ok(())
	}

	fn should_sync_event(&self, event: &Event) -> bool {
		use notify::EventKind;

		match event.kind {
			// File created, modified, or removed
			EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) | EventKind::Other => true,
			// Ignore metadata-only changes (access time, etc.)
			_ => false,
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use crate::transport::local::LocalTransport;
	use std::fs;
	use tempfile::TempDir;

	#[test]
	fn test_watch_mode_creation() {
		let temp = TempDir::new().unwrap();
		let source = temp.path().join("src");
		let destination = temp.path().join("dst");
		fs::create_dir_all(&source).unwrap();
		fs::create_dir_all(&destination).unwrap();

		let transport = LocalTransport::new();
		let engine = SyncEngine::new(
			transport,
			false,                              // dry_run
			false,                              // diff_mode
			false,                              // delete
			50,                                 // delete_threshold
			false,                              // trash
			false,                              // force_delete
			true,                               // quiet
			10,                                 // parallel
			100,                                // max_errors
			None,                               // min_size
			None,                               // max_size
			crate::filter::FilterEngine::new(), // filter_engine
			None,                               // bwlimit
			false,                              // resume
			10,                                 // checkpoint_files
			100,                                // checkpoint_bytes
			false,                              // json
			ChecksumType::None,                 // verification_mode
			false,                              // verify_on_write
			SymlinkMode::Preserve,              // symlink_mode
			false,                              // preserve_xattrs
			false,                              // preserve_hardlinks
			false,                              // preserve_acls
			false,                              // preserve_flags
			false,                              // per_file_progress
			false,                              // ignore_times
			false,                              // size_only
			false,                              // checksum
			false,                              // update_only
			false,                              // ignore_existing
			false,                              // use_cache
			false,                              // clear_cache
			false,                              // checksum_db
			false,                              // clear_checksum_db
			false,                              // prune_checksum_db
			false,                              // dest_is_remote
			false,                              // perf
		);

		let watch_mode = WatchMode::new(engine, source.clone(), destination.clone(), Duration::from_millis(500));

		assert_eq!(watch_mode.source, source);
		assert_eq!(watch_mode.destination, destination);
		assert_eq!(watch_mode.debounce, Duration::from_millis(500));
	}

	#[test]
	fn test_should_sync_event() {
		use notify::{Event, EventKind};

		let temp = TempDir::new().unwrap();
		let source = temp.path().join("src");
		let destination = temp.path().join("dst");
		fs::create_dir_all(&source).unwrap();
		fs::create_dir_all(&destination).unwrap();

		let transport = LocalTransport::new();
		let engine = SyncEngine::new(
			transport,
			false, // dry_run
			false, // diff_mode
			false, // delete
			50,    // delete_threshold
			false, // trash
			false, // force_delete
			true,
			10,
			100, // max_errors
			None,
			None,
			crate::filter::FilterEngine::new(),
			None,
			false,
			10,
			100,
			false,
			ChecksumType::None,
			false,
			SymlinkMode::Preserve,
			false,
			false,
			false,
			false, // preserve_flags
			false, // per_file_progress
			false, // ignore_times
			false, // size_only
			false, // checksum
			false, // update_only
			false, // ignore_existing
			false, // use_cache
			false, // clear_cache
			false, // checksum_db
			false, // clear_checksum_db
			false, // prune_checksum_db
			false, // dest_is_remote
			false, // perf
		);

		let watch_mode = WatchMode::new(engine, source, destination, Duration::from_millis(500));

		// Should sync on create, modify, remove
		let create_event = Event::new(EventKind::Create(notify::event::CreateKind::File));
		assert!(watch_mode.should_sync_event(&create_event));

		let modify_event = Event::new(EventKind::Modify(notify::event::ModifyKind::Data(notify::event::DataChange::Any)));
		assert!(watch_mode.should_sync_event(&modify_event));

		let remove_event = Event::new(EventKind::Remove(notify::event::RemoveKind::File));
		assert!(watch_mode.should_sync_event(&remove_event));

		// Should not sync on access events
		let access_event = Event::new(EventKind::Access(notify::event::AccessKind::Read));
		assert!(!watch_mode.should_sync_event(&access_event));
	}
}