git_sync_rs/
watch.rs

1use crate::error::Result;
2use crate::sync::{RepositorySynchronizer, SyncConfig};
3use git2::Repository;
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::mpsc;
9use tokio::sync::Mutex;
10use tokio::time;
11use tracing::{debug, error, info};
12
13/// Watch mode configuration
14#[derive(Debug, Clone)]
15pub struct WatchConfig {
16    /// How long to wait after changes before syncing (milliseconds)
17    pub debounce_ms: u64,
18
19    /// Minimum interval between syncs (milliseconds)
20    pub min_interval_ms: u64,
21
22    /// Whether to sync on startup
23    pub sync_on_start: bool,
24
25    /// Dry run mode - detect changes but don't sync
26    pub dry_run: bool,
27}
28
29impl Default for WatchConfig {
30    fn default() -> Self {
31        Self {
32            debounce_ms: 500,
33            min_interval_ms: 1000,
34            sync_on_start: true,
35            dry_run: false,
36        }
37    }
38}
39
40/// Manages file system watching and automatic synchronization
41pub struct WatchManager {
42    repo_path: String,
43    sync_config: SyncConfig,
44    watch_config: WatchConfig,
45    is_syncing: Arc<Mutex<bool>>,
46}
47
48/// Event handler for file system changes
49struct FileEventHandler {
50    repo_path: PathBuf,
51    tx: mpsc::Sender<Event>,
52}
53
54impl FileEventHandler {
55    fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
56        Self { repo_path, tx }
57    }
58
59    fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
60        let event = match res {
61            Ok(event) => event,
62            Err(e) => {
63                error!("Watch error: {}", e);
64                return;
65            }
66        };
67
68        debug!("Raw file event received: {:?}", event);
69
70        if !self.should_process_event(&event) {
71            return;
72        }
73
74        debug!("Event is relevant, sending to channel");
75        if let Err(e) = self.tx.blocking_send(event.clone()) {
76            error!("Failed to send event to channel: {}", e);
77        } else {
78            debug!("Event sent successfully: {:?}", event.kind);
79        }
80    }
81
82    fn should_process_event(&self, event: &Event) -> bool {
83        // Ignore git directory changes
84        if self.is_git_internal(event) {
85            debug!("Ignoring git internal event");
86            return false;
87        }
88
89        // Open repository for gitignore check
90        let repo = match Repository::open(&self.repo_path) {
91            Ok(r) => r,
92            Err(e) => {
93                error!("Failed to open repository for gitignore check: {}", e);
94                return false;
95            }
96        };
97
98        // Check if any path in the event should be ignored
99        let should_ignore = event
100            .paths
101            .iter()
102            .any(|path| self.should_ignore_path(&repo, path));
103
104        if should_ignore {
105            debug!("Ignoring gitignored file event");
106            return false;
107        }
108
109        // Check if this is a relevant change type
110        if !self.is_relevant_change(event) {
111            debug!("Event not considered relevant: {:?}", event.kind);
112            return false;
113        }
114
115        true
116    }
117
118    /// Check if an event is related to git internals
119    fn is_git_internal(&self, event: &Event) -> bool {
120        event
121            .paths
122            .iter()
123            .any(|path| path.components().any(|c| c.as_os_str() == ".git"))
124    }
125
126    /// Check if a path should be ignored according to gitignore rules
127    fn should_ignore_path(&self, repo: &Repository, file_path: &Path) -> bool {
128        // Make path relative to repo root
129        let relative_path = match file_path.strip_prefix(&self.repo_path) {
130            Ok(p) => p,
131            Err(_) => {
132                debug!("Path {:?} is outside repo, ignoring", file_path);
133                return true;
134            }
135        };
136
137        // Check if the path is ignored by git
138        match repo.status_should_ignore(relative_path) {
139            Ok(ignored) => {
140                if ignored {
141                    debug!("Path {:?} is gitignored", relative_path);
142                }
143                ignored
144            }
145            Err(e) => {
146                debug!(
147                    "Error checking gitignore status for {:?}: {}",
148                    relative_path, e
149                );
150                false
151            }
152        }
153    }
154
155    /// Check if an event represents a relevant change
156    fn is_relevant_change(&self, event: &Event) -> bool {
157        let is_relevant = matches!(
158            event.kind,
159            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
160        );
161
162        debug!(
163            "is_relevant_change: kind={:?}, relevant={}",
164            event.kind, is_relevant
165        );
166
167        is_relevant
168    }
169}
170
171impl WatchManager {
172    /// Create a new watch manager
173    pub fn new(
174        repo_path: impl AsRef<Path>,
175        sync_config: SyncConfig,
176        watch_config: WatchConfig,
177    ) -> Self {
178        // Expand tilde in path
179        let path_str = repo_path.as_ref().to_string_lossy();
180        let expanded = shellexpand::tilde(&path_str).to_string();
181
182        Self {
183            repo_path: expanded,
184            sync_config,
185            watch_config,
186            is_syncing: Arc::new(Mutex::new(false)),
187        }
188    }
189
190    /// Start watching for changes
191    pub async fn watch(&self) -> Result<()> {
192        info!("Starting watch mode for: {}", self.repo_path);
193
194        // Sync on startup if configured
195        if self.watch_config.sync_on_start {
196            info!("Performing initial sync");
197            self.perform_sync().await?;
198        }
199
200        // Create channel for file events
201        let (tx, rx) = mpsc::channel::<Event>(100);
202
203        // Setup file watcher
204        let _watcher = self.setup_watcher(tx)?;
205
206        info!(
207            "Watching for changes (debounce: {}s)",
208            self.watch_config.debounce_ms as f64 / 1000.0
209        );
210
211        // Process events
212        self.process_events(rx).await
213    }
214
215    /// Setup the file system watcher
216    fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
217        let repo_path_clone = PathBuf::from(&self.repo_path);
218        let handler = FileEventHandler::new(repo_path_clone, tx);
219
220        let mut watcher =
221            RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
222
223        // Watch the repository path
224        watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
225
226        Ok(watcher)
227    }
228
229    /// Process file system events
230    async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
231        let mut sync_state = SyncState::new(
232            self.watch_config.debounce_ms,
233            self.watch_config.min_interval_ms,
234        );
235
236        loop {
237            let timeout = time::sleep(Duration::from_millis(self.watch_config.debounce_ms));
238            tokio::pin!(timeout);
239
240            tokio::select! {
241                Some(event) = rx.recv() => {
242                    self.handle_file_event(event, &mut sync_state);
243                }
244                _ = &mut timeout => {
245                    self.handle_timeout(&mut sync_state).await;
246                }
247            }
248        }
249    }
250
251    /// Handle a file system event
252    fn handle_file_event(&self, event: Event, sync_state: &mut SyncState) {
253        debug!("Received event from channel: {:?}", event);
254        debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
255
256        // Use FileEventHandler's method to check relevance
257        // We can't easily share this without restructuring, so for now keep it simple
258        if self.is_relevant_change(&event) {
259            info!("Relevant change detected, marking pending sync");
260            sync_state.mark_pending();
261        } else {
262            debug!("Event not considered relevant: {:?}", event.kind);
263        }
264    }
265
266    /// Check if an event represents a relevant change
267    fn is_relevant_change(&self, event: &Event) -> bool {
268        let is_relevant = matches!(
269            event.kind,
270            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
271        );
272
273        debug!(
274            "is_relevant_change: kind={:?}, relevant={}",
275            event.kind, is_relevant
276        );
277
278        is_relevant
279    }
280
281    /// Handle timeout expiration
282    async fn handle_timeout(&self, sync_state: &mut SyncState) {
283        if !sync_state.should_sync() {
284            return;
285        }
286
287        // Check if already syncing
288        let is_syncing = self.is_syncing.lock().await;
289        if *is_syncing {
290            debug!("Sync already in progress, skipping");
291            return;
292        }
293        drop(is_syncing); // Release lock before syncing
294
295        info!("Changes detected, triggering sync");
296        if let Err(e) = self.perform_sync().await {
297            error!("Sync failed: {}", e);
298        }
299
300        sync_state.record_sync();
301    }
302
303    /// Perform a synchronization
304    async fn perform_sync(&self) -> Result<()> {
305        // Set syncing flag
306        {
307            let mut is_syncing = self.is_syncing.lock().await;
308            if *is_syncing {
309                debug!("Sync already in progress");
310                return Ok(());
311            }
312            *is_syncing = true;
313        }
314
315        if self.watch_config.dry_run {
316            info!("DRY RUN: Would perform sync now");
317            // Clear syncing flag
318            {
319                let mut is_syncing = self.is_syncing.lock().await;
320                *is_syncing = false;
321            }
322            return Ok(());
323        }
324
325        // Perform sync in blocking task
326        let repo_path = self.repo_path.clone();
327        let sync_config = self.sync_config.clone();
328
329        tokio::task::spawn_blocking(move || {
330            // Create synchronizer
331            let synchronizer =
332                RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
333
334            // Perform sync
335            synchronizer.sync(false)
336        })
337        .await??;
338
339        // Clear syncing flag
340        {
341            let mut is_syncing = self.is_syncing.lock().await;
342            *is_syncing = false;
343        }
344
345        Ok(())
346    }
347}
348
349/// State for managing sync timing
350struct SyncState {
351    last_sync: time::Instant,
352    pending_sync: bool,
353    min_interval: Duration,
354}
355
356impl SyncState {
357    fn new(_debounce_ms: u64, min_interval_ms: u64) -> Self {
358        Self {
359            last_sync: time::Instant::now(),
360            pending_sync: false,
361            min_interval: Duration::from_millis(min_interval_ms),
362        }
363    }
364
365    fn mark_pending(&mut self) {
366        self.pending_sync = true;
367    }
368
369    fn should_sync(&self) -> bool {
370        if !self.pending_sync {
371            return false;
372        }
373
374        let elapsed = self.last_sync.elapsed();
375        if elapsed < self.min_interval {
376            debug!("Too soon since last sync, waiting");
377            return false;
378        }
379
380        true
381    }
382
383    fn record_sync(&mut self) {
384        self.last_sync = time::Instant::now();
385        self.pending_sync = false;
386    }
387}
388
389/// Run watch mode with periodic sync
390pub async fn watch_with_periodic_sync(
391    repo_path: impl AsRef<Path>,
392    sync_config: SyncConfig,
393    watch_config: WatchConfig,
394    sync_interval_ms: Option<u64>,
395) -> Result<()> {
396    let manager = WatchManager::new(repo_path, sync_config, watch_config);
397
398    if let Some(interval_ms) = sync_interval_ms {
399        // Run with periodic sync
400        info!(
401            "Periodic sync enabled (interval: {}s)",
402            interval_ms as f64 / 1000.0
403        );
404
405        let manager_clone = Arc::new(manager);
406        let manager_watch = manager_clone.clone();
407
408        // Start watch task
409        let watch_handle = tokio::spawn(async move { manager_watch.watch().await });
410
411        // Start periodic sync task
412        let periodic_handle = tokio::spawn(async move {
413            let mut interval = time::interval(Duration::from_millis(interval_ms));
414            interval.tick().await; // Skip first immediate tick
415
416            loop {
417                interval.tick().await;
418                info!("Periodic sync triggered");
419                if let Err(e) = manager_clone.perform_sync().await {
420                    error!("Periodic sync failed: {}", e);
421                }
422            }
423        });
424
425        // Wait for either task to finish (they shouldn't normally)
426        tokio::select! {
427            result = watch_handle => result?,
428            result = periodic_handle => result?,
429        }
430    } else {
431        // Just run watch mode
432        manager.watch().await
433    }
434}