git_sync_rs/
watch.rs

1use crate::error::{Result, SyncError};
2use crate::sync::{RepositorySynchronizer, SyncConfig};
3use git2::Repository;
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tokio::time;
11use tracing::{debug, error, info, warn};
12
13/// Watch mode configuration
14#[derive(Debug, Clone)]
15pub struct WatchConfig {
16    /// How long to wait after changes before syncing (milliseconds)
17    pub debounce_ms: u64,
18
19    /// Minimum interval between syncs (milliseconds)
20    pub min_interval_ms: u64,
21
22    /// Whether to sync on startup
23    pub sync_on_start: bool,
24
25    /// Dry run mode - detect changes but don't sync
26    pub dry_run: bool,
27}
28
29impl Default for WatchConfig {
30    fn default() -> Self {
31        Self {
32            debounce_ms: 500,
33            min_interval_ms: 1000,
34            sync_on_start: true,
35            dry_run: false,
36        }
37    }
38}
39
40/// Manages file system watching and automatic synchronization
41pub struct WatchManager {
42    repo_path: String,
43    sync_config: SyncConfig,
44    watch_config: WatchConfig,
45    is_syncing: Arc<AtomicBool>,
46}
47
48/// Event handler for file system changes
49struct FileEventHandler {
50    repo_path: PathBuf,
51    tx: mpsc::Sender<Event>,
52}
53
54impl FileEventHandler {
55    fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
56        Self { repo_path, tx }
57    }
58
59    fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
60        let event = match res {
61            Ok(event) => event,
62            Err(e) => {
63                error!("Watch error: {}", e);
64                return;
65            }
66        };
67
68        debug!("Raw file event received: {:?}", event);
69
70        if !self.should_process_event(&event) {
71            return;
72        }
73
74        debug!("Event is relevant, sending to channel");
75        if let Err(e) = self.tx.blocking_send(event.clone()) {
76            error!("Failed to send event to channel: {}", e);
77        } else {
78            debug!("Event sent successfully: {:?}", event.kind);
79        }
80    }
81
82    fn should_process_event(&self, event: &Event) -> bool {
83        // Ignore git directory changes
84        if self.is_git_internal(event) {
85            debug!("Ignoring git internal event");
86            return false;
87        }
88
89        // Open repository for gitignore check
90        let repo = match Repository::open(&self.repo_path) {
91            Ok(r) => r,
92            Err(e) => {
93                error!("Failed to open repository for gitignore check: {}", e);
94                return false;
95            }
96        };
97
98        // Check if any path in the event should be ignored
99        let should_ignore = event
100            .paths
101            .iter()
102            .any(|path| self.should_ignore_path(&repo, path));
103
104        if should_ignore {
105            debug!("Ignoring gitignored file event");
106            return false;
107        }
108
109        // Check if this is a relevant change type
110        if !self.is_relevant_change(event) {
111            debug!("Event not considered relevant: {:?}", event.kind);
112            return false;
113        }
114
115        true
116    }
117
118    /// Check if an event is related to git internals
119    fn is_git_internal(&self, event: &Event) -> bool {
120        event
121            .paths
122            .iter()
123            .any(|path| path.components().any(|c| c.as_os_str() == ".git"))
124    }
125
126    /// Check if a path should be ignored according to gitignore rules
127    fn should_ignore_path(&self, repo: &Repository, file_path: &Path) -> bool {
128        // Make path relative to repo root
129        let relative_path = match file_path.strip_prefix(&self.repo_path) {
130            Ok(p) => p,
131            Err(_) => {
132                debug!("Path {:?} is outside repo, ignoring", file_path);
133                return true;
134            }
135        };
136
137        // Check if the path is ignored by git
138        match repo.status_should_ignore(relative_path) {
139            Ok(ignored) => {
140                if ignored {
141                    debug!("Path {:?} is gitignored", relative_path);
142                }
143                ignored
144            }
145            Err(e) => {
146                debug!(
147                    "Error checking gitignore status for {:?}: {}",
148                    relative_path, e
149                );
150                false
151            }
152        }
153    }
154
155    /// Check if an event represents a relevant change
156    fn is_relevant_change(&self, event: &Event) -> bool {
157        let is_relevant = matches!(
158            event.kind,
159            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
160        );
161
162        debug!(
163            "is_relevant_change: kind={:?}, relevant={}",
164            event.kind, is_relevant
165        );
166
167        is_relevant
168    }
169}
170
171impl WatchManager {
172    /// Create a new watch manager
173    pub fn new(
174        repo_path: impl AsRef<Path>,
175        sync_config: SyncConfig,
176        watch_config: WatchConfig,
177    ) -> Self {
178        // Expand tilde in path
179        let path_str = repo_path.as_ref().to_string_lossy();
180        let expanded = shellexpand::tilde(&path_str).to_string();
181
182        Self {
183            repo_path: expanded,
184            sync_config,
185            watch_config,
186            is_syncing: Arc::new(AtomicBool::new(false)),
187        }
188    }
189
190    /// Start watching for changes
191    pub async fn watch(&self) -> Result<()> {
192        info!("Starting watch mode for: {}", self.repo_path);
193
194        // Sync on startup if configured
195        if self.watch_config.sync_on_start {
196            info!("Performing initial sync");
197            self.perform_sync().await?;
198        }
199
200        // Create channel for file events
201        let (tx, rx) = mpsc::channel::<Event>(100);
202
203        // Setup file watcher
204        let _watcher = self.setup_watcher(tx)?;
205
206        info!(
207            "Watching for changes (debounce: {}s)",
208            self.watch_config.debounce_ms as f64 / 1000.0
209        );
210
211        // Process events
212        self.process_events(rx).await
213    }
214
215    /// Setup the file system watcher
216    fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
217        let repo_path_clone = PathBuf::from(&self.repo_path);
218        let handler = FileEventHandler::new(repo_path_clone, tx);
219
220        let mut watcher =
221            RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
222
223        // Watch the repository path
224        watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
225
226        Ok(watcher)
227    }
228
229    /// Process file system events
230    async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
231        let mut sync_state = SyncState::new(
232            self.watch_config.debounce_ms,
233            self.watch_config.min_interval_ms,
234        );
235
236        // Periodic interval prevents starvation under continuous events
237        let tick_ms = self
238            .watch_config
239            .debounce_ms
240            .min(self.watch_config.min_interval_ms)
241            .max(50);
242        let mut interval = time::interval(Duration::from_millis(tick_ms));
243        interval.tick().await; // align first tick
244
245        loop {
246            tokio::select! {
247                biased;
248                // Give the interval a chance first to avoid starvation
249                _ = interval.tick() => {
250                    self.handle_timeout(&mut sync_state).await;
251                }
252                Some(event) = rx.recv() => {
253                    self.handle_file_event(event, &mut sync_state);
254                }
255            }
256        }
257    }
258
259    /// Handle a file system event
260    fn handle_file_event(&self, event: Event, sync_state: &mut SyncState) {
261        debug!("Received event from channel: {:?}", event);
262        debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
263
264        // Use FileEventHandler's method to check relevance
265        // We can't easily share this without restructuring, so for now keep it simple
266        if self.is_relevant_change(&event) {
267            info!("Relevant change detected, marking pending sync");
268            sync_state.mark_pending();
269        } else {
270            debug!("Event not considered relevant: {:?}", event.kind);
271        }
272    }
273
274    /// Check if an event represents a relevant change
275    fn is_relevant_change(&self, event: &Event) -> bool {
276        let is_relevant = matches!(
277            event.kind,
278            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
279        );
280
281        debug!(
282            "is_relevant_change: kind={:?}, relevant={}",
283            event.kind, is_relevant
284        );
285
286        is_relevant
287    }
288
289    /// Handle timeout expiration
290    async fn handle_timeout(&self, sync_state: &mut SyncState) {
291        if !sync_state.should_sync() {
292            return;
293        }
294
295        // Check if already syncing
296        if self.is_syncing.load(Ordering::Acquire) {
297            debug!("Sync already in progress, skipping");
298            return;
299        }
300
301        info!("Changes detected, triggering sync");
302        // Attach a span with useful fields for diagnostics
303        let span = tracing::info_span!(
304            "perform_sync_attempt",
305            repo = %self.repo_path,
306            branch = %self.sync_config.branch_name,
307            remote = %self.sync_config.remote_name,
308            dry_run = self.watch_config.dry_run
309        );
310        let _guard = span.enter();
311        match self.perform_sync().await {
312            Ok(()) => {
313                // Only record successful syncs
314                debug!("perform_sync succeeded");
315                sync_state.record_sync();
316            }
317            Err(e) => {
318                // Do not clear pending flag on failure; allow retry
319                match &e {
320                    SyncError::DetachedHead => error!(
321                        "Sync failed: detached HEAD. Repository must be on a branch; will retry."
322                    ),
323                    SyncError::UnsafeRepositoryState { state } => error!(
324                        state = %state,
325                        "Sync failed: repository in unsafe state; will retry"
326                    ),
327                    SyncError::ManualInterventionRequired { reason } => warn!(
328                        reason = %reason,
329                        "Sync requires manual intervention; pending will remain set"
330                    ),
331                    SyncError::NoRemoteConfigured { branch } => error!(
332                        branch = %branch,
333                        "Sync failed: no remote configured for branch"
334                    ),
335                    SyncError::NetworkError(msg) => error!(
336                        error = %msg,
337                        "Network error during sync; will retry"
338                    ),
339                    SyncError::TaskError(msg) => error!(
340                        error = %msg,
341                        "Background task error during sync; will retry"
342                    ),
343                    SyncError::GitError(err) => error!(
344                        code = ?err.code(),
345                        klass = ?err.class(),
346                        message = %err.message(),
347                        "Git error during sync; will retry"
348                    ),
349                    other => error!(error = %other, "Sync failed; will retry"),
350                }
351            }
352        }
353    }
354
355    /// Perform a synchronization
356    async fn perform_sync(&self) -> Result<()> {
357        // Set syncing flag (lock-free)
358        if self.is_syncing.swap(true, Ordering::AcqRel) {
359            debug!("Sync already in progress");
360            return Ok(());
361        }
362
363        // Run the sync and ensure we clear the syncing flag regardless of outcome
364        let result: Result<()> = if self.watch_config.dry_run {
365            info!("DRY RUN: Would perform sync now");
366            Ok(())
367        } else {
368            // Perform sync in blocking task
369            let repo_path = self.repo_path.clone();
370            let sync_config = self.sync_config.clone();
371
372            debug!("Spawning blocking sync task");
373            match tokio::task::spawn_blocking(move || {
374                // Create synchronizer
375                let synchronizer =
376                    RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
377
378                // Perform sync
379                synchronizer.sync(false)
380            })
381            .await
382            {
383                Ok(inner) => inner,
384                Err(e) => {
385                    error!("Join error waiting for sync task: {}", e);
386                    Err(e.into())
387                }
388            }
389        };
390
391        // Clear syncing flag (finally-like)
392        self.is_syncing.store(false, Ordering::Release);
393
394        if let Err(ref err) = result {
395            error!(error = %err, "perform_sync finished with error");
396        } else {
397            debug!("perform_sync finished successfully");
398        }
399        result
400    }
401}
402
403/// State for managing sync timing
404struct SyncState {
405    last_sync: time::Instant,
406    pending_sync: bool,
407    min_interval: Duration,
408    debounce: Duration,
409    last_event: Option<time::Instant>,
410}
411
412impl SyncState {
413    fn new(debounce_ms: u64, min_interval_ms: u64) -> Self {
414        Self {
415            last_sync: time::Instant::now(),
416            pending_sync: false,
417            min_interval: Duration::from_millis(min_interval_ms),
418            debounce: Duration::from_millis(debounce_ms),
419            last_event: None,
420        }
421    }
422
423    fn mark_pending(&mut self) {
424        self.pending_sync = true;
425        self.last_event = Some(time::Instant::now());
426    }
427
428    fn should_sync(&self) -> bool {
429        if !self.pending_sync {
430            return false;
431        }
432
433        // Enforce minimum interval between syncs (throttle ensures progress)
434        let since_last_sync = self.last_sync.elapsed();
435        if since_last_sync < self.min_interval {
436            debug!("Too soon since last sync, waiting");
437            return false;
438        }
439
440        // Prefer to wait for quiet period, but do not starve: if min_interval
441        // has elapsed, allow sync even if events keep arriving.
442        if let Some(t) = self.last_event {
443            let since_last_event = t.elapsed();
444            if since_last_event < self.debounce {
445                debug!("Debounce active, but proceeding due to min-interval");
446            }
447        }
448
449        true
450    }
451
452    fn record_sync(&mut self) {
453        self.last_sync = time::Instant::now();
454        self.pending_sync = false;
455        self.last_event = None;
456    }
457}
458
459/// Run watch mode with periodic sync
460pub async fn watch_with_periodic_sync(
461    repo_path: impl AsRef<Path>,
462    sync_config: SyncConfig,
463    watch_config: WatchConfig,
464    sync_interval_ms: Option<u64>,
465) -> Result<()> {
466    let manager = WatchManager::new(repo_path, sync_config, watch_config);
467
468    if let Some(interval_ms) = sync_interval_ms {
469        // Run with periodic sync
470        info!(
471            "Periodic sync enabled (interval: {}s)",
472            interval_ms as f64 / 1000.0
473        );
474
475        let manager_clone = Arc::new(manager);
476        let manager_watch = manager_clone.clone();
477
478        // Start watch task
479        let watch_handle = tokio::spawn(async move { manager_watch.watch().await });
480
481        // Start periodic sync task
482        let periodic_handle = tokio::spawn(async move {
483            let mut interval = time::interval(Duration::from_millis(interval_ms));
484            interval.tick().await; // Skip first immediate tick
485
486            loop {
487                interval.tick().await;
488                info!("Periodic sync triggered");
489                if let Err(e) = manager_clone.perform_sync().await {
490                    error!("Periodic sync failed: {}", e);
491                }
492            }
493        });
494
495        // Wait for either task to finish (they shouldn't normally)
496        tokio::select! {
497            result = watch_handle => result?,
498            result = periodic_handle => result?,
499        }
500    } else {
501        // Just run watch mode
502        manager.watch().await
503    }
504}