1use crate::error::{Result, SyncError};
2use crate::sync::{RepositorySynchronizer, SyncConfig};
3use git2::Repository;
4use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::mpsc;
10use tokio::time;
11use tracing::{debug, error, info, warn};
12
13#[derive(Debug, Clone)]
15pub struct WatchConfig {
16 pub debounce_ms: u64,
18
19 pub min_interval_ms: u64,
21
22 pub sync_on_start: bool,
24
25 pub dry_run: bool,
27}
28
29impl Default for WatchConfig {
30 fn default() -> Self {
31 Self {
32 debounce_ms: 500,
33 min_interval_ms: 1000,
34 sync_on_start: true,
35 dry_run: false,
36 }
37 }
38}
39
40pub struct WatchManager {
42 repo_path: String,
43 sync_config: SyncConfig,
44 watch_config: WatchConfig,
45 is_syncing: Arc<AtomicBool>,
46}
47
48struct FileEventHandler {
50 repo_path: PathBuf,
51 tx: mpsc::Sender<Event>,
52}
53
54impl FileEventHandler {
55 fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
56 Self { repo_path, tx }
57 }
58
59 fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
60 let event = match res {
61 Ok(event) => event,
62 Err(e) => {
63 error!("Watch error: {}", e);
64 return;
65 }
66 };
67
68 debug!("Raw file event received: {:?}", event);
69
70 if !self.should_process_event(&event) {
71 return;
72 }
73
74 debug!("Event is relevant, sending to channel");
75 if let Err(e) = self.tx.blocking_send(event.clone()) {
76 error!("Failed to send event to channel: {}", e);
77 } else {
78 debug!("Event sent successfully: {:?}", event.kind);
79 }
80 }
81
82 fn should_process_event(&self, event: &Event) -> bool {
83 if self.is_git_internal(event) {
85 debug!("Ignoring git internal event");
86 return false;
87 }
88
89 let repo = match Repository::open(&self.repo_path) {
91 Ok(r) => r,
92 Err(e) => {
93 error!("Failed to open repository for gitignore check: {}", e);
94 return false;
95 }
96 };
97
98 let should_ignore = event
100 .paths
101 .iter()
102 .any(|path| self.should_ignore_path(&repo, path));
103
104 if should_ignore {
105 debug!("Ignoring gitignored file event");
106 return false;
107 }
108
109 if !self.is_relevant_change(event) {
111 debug!("Event not considered relevant: {:?}", event.kind);
112 return false;
113 }
114
115 true
116 }
117
118 fn is_git_internal(&self, event: &Event) -> bool {
120 event
121 .paths
122 .iter()
123 .any(|path| path.components().any(|c| c.as_os_str() == ".git"))
124 }
125
126 fn should_ignore_path(&self, repo: &Repository, file_path: &Path) -> bool {
128 let relative_path = match file_path.strip_prefix(&self.repo_path) {
130 Ok(p) => p,
131 Err(_) => {
132 debug!("Path {:?} is outside repo, ignoring", file_path);
133 return true;
134 }
135 };
136
137 match repo.status_should_ignore(relative_path) {
139 Ok(ignored) => {
140 if ignored {
141 debug!("Path {:?} is gitignored", relative_path);
142 }
143 ignored
144 }
145 Err(e) => {
146 debug!(
147 "Error checking gitignore status for {:?}: {}",
148 relative_path, e
149 );
150 false
151 }
152 }
153 }
154
155 fn is_relevant_change(&self, event: &Event) -> bool {
157 let is_relevant = matches!(
158 event.kind,
159 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
160 );
161
162 debug!(
163 "is_relevant_change: kind={:?}, relevant={}",
164 event.kind, is_relevant
165 );
166
167 is_relevant
168 }
169}
170
171impl WatchManager {
172 pub fn new(
174 repo_path: impl AsRef<Path>,
175 sync_config: SyncConfig,
176 watch_config: WatchConfig,
177 ) -> Self {
178 let path_str = repo_path.as_ref().to_string_lossy();
180 let expanded = shellexpand::tilde(&path_str).to_string();
181
182 Self {
183 repo_path: expanded,
184 sync_config,
185 watch_config,
186 is_syncing: Arc::new(AtomicBool::new(false)),
187 }
188 }
189
190 pub async fn watch(&self) -> Result<()> {
192 info!("Starting watch mode for: {}", self.repo_path);
193
194 if self.watch_config.sync_on_start {
196 info!("Performing initial sync");
197 self.perform_sync().await?;
198 }
199
200 let (tx, rx) = mpsc::channel::<Event>(100);
202
203 let _watcher = self.setup_watcher(tx)?;
205
206 info!(
207 "Watching for changes (debounce: {}s)",
208 self.watch_config.debounce_ms as f64 / 1000.0
209 );
210
211 self.process_events(rx).await
213 }
214
215 fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
217 let repo_path_clone = PathBuf::from(&self.repo_path);
218 let handler = FileEventHandler::new(repo_path_clone, tx);
219
220 let mut watcher =
221 RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
222
223 watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
225
226 Ok(watcher)
227 }
228
229 async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
231 let mut sync_state = SyncState::new(
232 self.watch_config.debounce_ms,
233 self.watch_config.min_interval_ms,
234 );
235
236 let tick_ms = self
238 .watch_config
239 .debounce_ms
240 .min(self.watch_config.min_interval_ms)
241 .max(50);
242 let mut interval = time::interval(Duration::from_millis(tick_ms));
243 interval.tick().await; loop {
246 tokio::select! {
247 biased;
248 _ = interval.tick() => {
250 self.handle_timeout(&mut sync_state).await;
251 }
252 Some(event) = rx.recv() => {
253 self.handle_file_event(event, &mut sync_state);
254 }
255 }
256 }
257 }
258
259 fn handle_file_event(&self, event: Event, sync_state: &mut SyncState) {
261 debug!("Received event from channel: {:?}", event);
262 debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
263
264 if self.is_relevant_change(&event) {
267 info!("Relevant change detected, marking pending sync");
268 sync_state.mark_pending();
269 } else {
270 debug!("Event not considered relevant: {:?}", event.kind);
271 }
272 }
273
274 fn is_relevant_change(&self, event: &Event) -> bool {
276 let is_relevant = matches!(
277 event.kind,
278 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
279 );
280
281 debug!(
282 "is_relevant_change: kind={:?}, relevant={}",
283 event.kind, is_relevant
284 );
285
286 is_relevant
287 }
288
289 async fn handle_timeout(&self, sync_state: &mut SyncState) {
291 if !sync_state.should_sync() {
292 return;
293 }
294
295 if self.is_syncing.load(Ordering::Acquire) {
297 debug!("Sync already in progress, skipping");
298 return;
299 }
300
301 info!("Changes detected, triggering sync");
302 let span = tracing::info_span!(
304 "perform_sync_attempt",
305 repo = %self.repo_path,
306 branch = %self.sync_config.branch_name,
307 remote = %self.sync_config.remote_name,
308 dry_run = self.watch_config.dry_run
309 );
310 let _guard = span.enter();
311 match self.perform_sync().await {
312 Ok(()) => {
313 debug!("perform_sync succeeded");
315 sync_state.record_sync();
316 }
317 Err(e) => {
318 match &e {
320 SyncError::DetachedHead => error!(
321 "Sync failed: detached HEAD. Repository must be on a branch; will retry."
322 ),
323 SyncError::UnsafeRepositoryState { state } => error!(
324 state = %state,
325 "Sync failed: repository in unsafe state; will retry"
326 ),
327 SyncError::ManualInterventionRequired { reason } => warn!(
328 reason = %reason,
329 "Sync requires manual intervention; pending will remain set"
330 ),
331 SyncError::NoRemoteConfigured { branch } => error!(
332 branch = %branch,
333 "Sync failed: no remote configured for branch"
334 ),
335 SyncError::NetworkError(msg) => error!(
336 error = %msg,
337 "Network error during sync; will retry"
338 ),
339 SyncError::TaskError(msg) => error!(
340 error = %msg,
341 "Background task error during sync; will retry"
342 ),
343 SyncError::GitError(err) => error!(
344 code = ?err.code(),
345 klass = ?err.class(),
346 message = %err.message(),
347 "Git error during sync; will retry"
348 ),
349 other => error!(error = %other, "Sync failed; will retry"),
350 }
351 }
352 }
353 }
354
355 async fn perform_sync(&self) -> Result<()> {
357 if self.is_syncing.swap(true, Ordering::AcqRel) {
359 debug!("Sync already in progress");
360 return Ok(());
361 }
362
363 let result: Result<()> = if self.watch_config.dry_run {
365 info!("DRY RUN: Would perform sync now");
366 Ok(())
367 } else {
368 let repo_path = self.repo_path.clone();
370 let sync_config = self.sync_config.clone();
371
372 debug!("Spawning blocking sync task");
373 match tokio::task::spawn_blocking(move || {
374 let synchronizer =
376 RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
377
378 synchronizer.sync(false)
380 })
381 .await
382 {
383 Ok(inner) => inner,
384 Err(e) => {
385 error!("Join error waiting for sync task: {}", e);
386 Err(e.into())
387 }
388 }
389 };
390
391 self.is_syncing.store(false, Ordering::Release);
393
394 if let Err(ref err) = result {
395 error!(error = %err, "perform_sync finished with error");
396 } else {
397 debug!("perform_sync finished successfully");
398 }
399 result
400 }
401}
402
403struct SyncState {
405 last_sync: time::Instant,
406 pending_sync: bool,
407 min_interval: Duration,
408 debounce: Duration,
409 last_event: Option<time::Instant>,
410}
411
412impl SyncState {
413 fn new(debounce_ms: u64, min_interval_ms: u64) -> Self {
414 Self {
415 last_sync: time::Instant::now(),
416 pending_sync: false,
417 min_interval: Duration::from_millis(min_interval_ms),
418 debounce: Duration::from_millis(debounce_ms),
419 last_event: None,
420 }
421 }
422
423 fn mark_pending(&mut self) {
424 self.pending_sync = true;
425 self.last_event = Some(time::Instant::now());
426 }
427
428 fn should_sync(&self) -> bool {
429 if !self.pending_sync {
430 return false;
431 }
432
433 let since_last_sync = self.last_sync.elapsed();
435 if since_last_sync < self.min_interval {
436 debug!("Too soon since last sync, waiting");
437 return false;
438 }
439
440 if let Some(t) = self.last_event {
443 let since_last_event = t.elapsed();
444 if since_last_event < self.debounce {
445 debug!("Debounce active, but proceeding due to min-interval");
446 }
447 }
448
449 true
450 }
451
452 fn record_sync(&mut self) {
453 self.last_sync = time::Instant::now();
454 self.pending_sync = false;
455 self.last_event = None;
456 }
457}
458
459pub async fn watch_with_periodic_sync(
461 repo_path: impl AsRef<Path>,
462 sync_config: SyncConfig,
463 watch_config: WatchConfig,
464 sync_interval_ms: Option<u64>,
465) -> Result<()> {
466 let manager = WatchManager::new(repo_path, sync_config, watch_config);
467
468 if let Some(interval_ms) = sync_interval_ms {
469 info!(
471 "Periodic sync enabled (interval: {}s)",
472 interval_ms as f64 / 1000.0
473 );
474
475 let manager_clone = Arc::new(manager);
476 let manager_watch = manager_clone.clone();
477
478 let watch_handle = tokio::spawn(async move { manager_watch.watch().await });
480
481 let periodic_handle = tokio::spawn(async move {
483 let mut interval = time::interval(Duration::from_millis(interval_ms));
484 interval.tick().await; loop {
487 interval.tick().await;
488 info!("Periodic sync triggered");
489 if let Err(e) = manager_clone.perform_sync().await {
490 error!("Periodic sync failed: {}", e);
491 }
492 }
493 });
494
495 tokio::select! {
497 result = watch_handle => result?,
498 result = periodic_handle => result?,
499 }
500 } else {
501 manager.watch().await
503 }
504}