1mod event_filter;
2
3use self::event_filter::EventFilter;
4use crate::error::{Result, SyncError};
5use crate::sync::{RepositorySynchronizer, SyncConfig};
6#[cfg(feature = "tray")]
7use crate::tray::{GitSyncTray, TrayCommand, TrayState, TrayStatus};
8#[cfg(feature = "tray")]
9use ksni::TrayMethods;
10use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
11use std::future::pending;
12use std::path::{Path, PathBuf};
13#[cfg(feature = "tray")]
14use std::sync::atomic::AtomicU64;
15use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc;
19#[cfg(feature = "tray")]
20use tokio::sync::watch as tokio_watch;
21#[cfg(feature = "tray")]
22use tokio::sync::RwLock;
23use tokio::time;
24use tracing::{debug, error, info, warn};
25
26#[cfg(feature = "tray")]
27const TRAY_RETRY_FALLBACK_DELAY: Duration = Duration::from_secs(15);
28#[cfg(feature = "tray")]
29const TRAY_RETRY_SOON_DELAY: Duration = Duration::from_secs(1);
30
31#[derive(Debug, Clone)]
33pub struct WatchConfig {
34 pub debounce_ms: u64,
36
37 pub min_interval_ms: u64,
39
40 pub sync_on_start: bool,
42
43 pub dry_run: bool,
45
46 pub enable_tray: bool,
48
49 pub tray_icon: Option<String>,
51
52 pub periodic_sync_interval_ms: Option<u64>,
55}
56
57impl Default for WatchConfig {
58 fn default() -> Self {
59 Self {
60 debounce_ms: 500,
61 min_interval_ms: 1000,
62 sync_on_start: true,
63 dry_run: false,
64 enable_tray: false,
65 tray_icon: None,
66 periodic_sync_interval_ms: None,
67 }
68 }
69}
70
71pub struct WatchManager {
73 repo_path: String,
74 sync_config: SyncConfig,
75 watch_config: WatchConfig,
76 is_syncing: Arc<AtomicBool>,
77 sync_suspended: Arc<AtomicBool>,
78 last_successful_sync_unix_secs: Arc<AtomicI64>,
79 #[cfg(feature = "tray")]
80 last_sync_error: Arc<RwLock<Option<String>>>,
81 #[cfg(feature = "tray")]
82 sync_state_change_tx: tokio_watch::Sender<u64>,
83 #[cfg(feature = "tray")]
84 sync_state_change_seq: Arc<AtomicU64>,
85}
86
87struct FileEventHandler {
89 repo_path: PathBuf,
90 tx: mpsc::Sender<Event>,
91}
92
93impl FileEventHandler {
94 fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
95 Self { repo_path, tx }
96 }
97
98 fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
99 let event = match res {
100 Ok(event) => event,
101 Err(e) => {
102 error!("Watch error: {}", e);
103 return;
104 }
105 };
106
107 debug!("Raw file event received: {:?}", event);
108
109 if !self.should_process_event(&event) {
110 return;
111 }
112
113 debug!("Event is relevant, sending to channel");
114 if let Err(e) = self.tx.blocking_send(event.clone()) {
115 error!("Failed to send event to channel: {}", e);
116 } else {
117 debug!("Event sent successfully: {:?}", event.kind);
118 }
119 }
120
121 fn should_process_event(&self, event: &Event) -> bool {
122 EventFilter::should_process_event(&self.repo_path, event)
123 }
124}
125
126impl WatchManager {
127 pub fn new(
129 repo_path: impl AsRef<Path>,
130 sync_config: SyncConfig,
131 watch_config: WatchConfig,
132 ) -> Self {
133 let path_str = repo_path.as_ref().to_string_lossy();
135 let expanded = shellexpand::tilde(&path_str).to_string();
136 #[cfg(feature = "tray")]
137 let (sync_state_change_tx, _) = tokio_watch::channel(0);
138
139 Self {
140 repo_path: expanded,
141 sync_config,
142 watch_config,
143 is_syncing: Arc::new(AtomicBool::new(false)),
144 sync_suspended: Arc::new(AtomicBool::new(false)),
145 last_successful_sync_unix_secs: Arc::new(AtomicI64::new(0)),
146 #[cfg(feature = "tray")]
147 last_sync_error: Arc::new(RwLock::new(None)),
148 #[cfg(feature = "tray")]
149 sync_state_change_tx,
150 #[cfg(feature = "tray")]
151 sync_state_change_seq: Arc::new(AtomicU64::new(0)),
152 }
153 }
154
155 pub async fn watch(&self) -> Result<()> {
157 info!("Starting watch mode for: {}", self.repo_path);
158
159 if self.watch_config.sync_on_start {
161 info!("Performing initial sync");
162 self.perform_sync().await?;
163 }
164
165 let (tx, rx) = mpsc::channel::<Event>(100);
167
168 let _watcher = self.setup_watcher(tx)?;
170
171 info!(
172 "Watching for changes (debounce: {}s)",
173 self.watch_config.debounce_ms as f64 / 1000.0
174 );
175
176 self.process_events(rx).await
178 }
179
180 fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
182 let repo_path_clone = PathBuf::from(&self.repo_path);
183 let handler = FileEventHandler::new(repo_path_clone, tx);
184
185 let mut watcher =
186 RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
187
188 watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
190
191 Ok(watcher)
192 }
193
194 async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
196 let mut sync_state = SyncScheduler::new(
197 self.watch_config.debounce_ms,
198 self.watch_config.min_interval_ms,
199 );
200
201 let tick_ms = self
203 .watch_config
204 .debounce_ms
205 .min(self.watch_config.min_interval_ms)
206 .max(50);
207 let mut interval = time::interval(Duration::from_millis(tick_ms));
208 interval.tick().await; let mut periodic_interval =
211 self.watch_config
212 .periodic_sync_interval_ms
213 .map(|interval_ms| {
214 info!(
215 "Periodic sync enabled (interval: {}s)",
216 interval_ms as f64 / 1000.0
217 );
218 time::interval(Duration::from_millis(interval_ms))
219 });
220 if let Some(interval) = periodic_interval.as_mut() {
221 interval.tick().await; }
223
224 #[cfg(feature = "tray")]
225 if self.watch_config.enable_tray {
226 return self
227 .process_events_with_tray_resilient(
228 &mut rx,
229 &mut sync_state,
230 &mut interval,
231 &mut periodic_interval,
232 )
233 .await;
234 }
235
236 self.process_events_loop(
237 &mut rx,
238 &mut sync_state,
239 &mut interval,
240 &mut periodic_interval,
241 false,
242 )
243 .await
244 }
245
246 async fn process_events_loop(
248 &self,
249 rx: &mut mpsc::Receiver<Event>,
250 sync_state: &mut SyncScheduler,
251 interval: &mut time::Interval,
252 periodic_interval: &mut Option<time::Interval>,
253 paused: bool,
254 ) -> Result<()> {
255 loop {
256 tokio::select! {
257 biased;
258 _ = interval.tick() => {
259 if !paused {
260 self.handle_timeout(sync_state).await;
261 }
262 }
263 Some(event) = rx.recv() => {
264 if !paused {
265 self.handle_file_event(event, sync_state);
266 }
267 }
268 _ = Self::tick_optional_interval(periodic_interval) => {
269 if !paused {
270 sync_state.request_sync_now();
271 self.handle_timeout(sync_state).await;
272 }
273 }
274 }
275 }
276 }
277
278 #[cfg(feature = "tray")]
284 async fn process_events_with_tray_resilient(
285 &self,
286 rx: &mut mpsc::Receiver<Event>,
287 sync_state: &mut SyncScheduler,
288 interval: &mut time::Interval,
289 periodic_interval: &mut Option<time::Interval>,
290 ) -> Result<()> {
291 let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel();
292 let mut tray_state = TrayState::new(PathBuf::from(&self.repo_path));
293 let tray_icon = self.watch_config.tray_icon.clone();
294 let mut tray_handle: Option<ksni::Handle<GitSyncTray>> = None;
295
296 let mut tray_next_attempt = time::Instant::now();
297 let mut tray_spawn_task: Option<
298 tokio::task::JoinHandle<std::result::Result<ksni::Handle<GitSyncTray>, ksni::Error>>,
299 > = None;
300 let mut dbus_bus_watch = Self::setup_dbus_session_bus_watch();
301 let mut sync_state_change_rx = self.sync_state_change_tx.subscribe();
302 let mut last_sync_text_snapshot = tray_state.last_sync_text();
303
304 loop {
305 tokio::select! {
306 biased;
307 _ = interval.tick() => {
308 if let Some(task) = tray_spawn_task.as_ref() {
310 if task.is_finished() {
311 match tray_spawn_task.take().expect("checked Some above").await {
312 Ok(Ok(handle)) => {
313 info!("System tray indicator started");
314 tray_handle = Some(handle);
315 tray_next_attempt = time::Instant::now();
316 self.reconcile_tray_state_from_global(
318 &mut tray_state,
319 &mut tray_handle,
320 )
321 .await;
322 }
323 Ok(Err(e)) => {
324 let retry_delay = match &e {
325 ksni::Error::WontShow => TRAY_RETRY_SOON_DELAY,
326 ksni::Error::Watcher(fdo_err)
327 if format!("{fdo_err:?}").contains("UnknownObject") =>
328 {
329 TRAY_RETRY_SOON_DELAY
330 }
331 _ => TRAY_RETRY_FALLBACK_DELAY,
332 };
333 warn!(
334 error = %e,
335 delay_s = retry_delay.as_secs_f64(),
336 "Tray unavailable; will retry"
337 );
338 tray_next_attempt = time::Instant::now() + retry_delay;
339 }
340 Err(e) => {
341 warn!(
342 error = %e,
343 delay_s = TRAY_RETRY_FALLBACK_DELAY.as_secs_f64(),
344 "Tray spawn task failed; will retry"
345 );
346 tray_next_attempt = time::Instant::now() + TRAY_RETRY_FALLBACK_DELAY;
347 }
348 }
349 }
350 }
351
352 if tray_handle.is_none()
354 && tray_spawn_task.is_none()
355 && time::Instant::now() >= tray_next_attempt
356 {
357 tray_spawn_task = Some(Self::spawn_tray_task(
358 tray_state.clone(),
359 cmd_tx.clone(),
360 tray_icon.clone(),
361 ));
362 }
363
364 self.handle_timeout_with_optional_tray(sync_state, &mut tray_state, &mut tray_handle).await;
365 self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
366 .await;
367 self.refresh_tray_relative_time_display(
368 &mut tray_state,
369 &mut tray_handle,
370 &mut last_sync_text_snapshot,
371 )
372 .await;
373 }
374 Some(event) = rx.recv() => {
375 self.handle_file_event(event, sync_state);
376 }
377 _ = Self::tick_optional_interval(periodic_interval) => {
378 sync_state.request_sync_now();
379 self.handle_timeout_with_optional_tray(sync_state, &mut tray_state, &mut tray_handle).await;
380 }
381 Some(cmd) = cmd_rx.recv() => {
382 match cmd {
383 TrayCommand::SyncNow => {
384 if tray_state.paused {
385 debug!("Tray: manual sync requested while suspended; ignoring");
386 } else {
387 info!("Tray: manual sync requested");
388 self.do_sync_with_optional_tray_update(sync_state, &mut tray_state, &mut tray_handle).await;
389 }
390 }
391 TrayCommand::Suspend => {
392 info!("Tray: suspending all sync activity");
393 self.set_sync_suspended(true);
394 self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
395 .await;
396 }
397 TrayCommand::Resume => {
398 info!("Tray: resuming sync activity");
399 self.set_sync_suspended(false);
400 self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
401 .await;
402 }
403 TrayCommand::Quit => {
404 info!("Tray: quit requested");
405 if let Some(handle) = &tray_handle {
406 handle.shutdown().await;
408 }
409 return Ok(());
410 }
411 TrayCommand::Respawn { reason } => {
412 warn!(reason = %reason, "Tray: respawn requested");
413
414 if let Some(task) = tray_spawn_task.take() {
415 task.abort();
416 }
417
418 if let Some(handle) = tray_handle.take() {
419 handle.shutdown().await;
421 }
422
423 tray_next_attempt = time::Instant::now() + TRAY_RETRY_SOON_DELAY;
424 }
425 }
426 }
427 dbus_event = async {
428 if let Some((_, rx)) = dbus_bus_watch.as_mut() {
429 rx.recv().await
430 } else {
431 None
432 }
433 }, if dbus_bus_watch.is_some() => {
434 match dbus_event {
435 Some(()) => {
436 info!("Detected D-Bus session bus socket activity; retrying tray startup now");
437 tray_next_attempt = time::Instant::now();
438
439 if tray_handle.is_none() && tray_spawn_task.is_none() {
440 tray_spawn_task = Some(Self::spawn_tray_task(
441 tray_state.clone(),
442 cmd_tx.clone(),
443 tray_icon.clone(),
444 ));
445 }
446 }
447 None => {
448 warn!("D-Bus session bus watcher channel closed; falling back to periodic retry");
449 dbus_bus_watch = None;
450 }
451 }
452 }
453 sync_state_change = sync_state_change_rx.changed() => {
454 match sync_state_change {
455 Ok(()) => {
456 self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
457 .await;
458 self.refresh_tray_relative_time_display(
459 &mut tray_state,
460 &mut tray_handle,
461 &mut last_sync_text_snapshot,
462 )
463 .await;
464 }
465 Err(e) => {
466 warn!(error = %e, "Tray sync-state update channel closed");
467 }
468 }
469 }
470 }
471 }
472 }
473
474 async fn tick_optional_interval(interval: &mut Option<time::Interval>) {
475 match interval {
476 Some(i) => {
477 i.tick().await;
478 }
479 None => pending::<()>().await,
480 }
481 }
482
483 #[cfg(feature = "tray")]
484 fn spawn_tray_task(
485 tray_state: TrayState,
486 cmd_tx: tokio::sync::mpsc::UnboundedSender<TrayCommand>,
487 tray_icon: Option<String>,
488 ) -> tokio::task::JoinHandle<std::result::Result<ksni::Handle<GitSyncTray>, ksni::Error>> {
489 tokio::spawn(async move {
490 let tray = GitSyncTray::new(tray_state, cmd_tx, tray_icon);
491 tray.assume_sni_available(true).spawn().await
495 })
496 }
497
498 #[cfg(feature = "tray")]
499 fn setup_dbus_session_bus_watch(
500 ) -> Option<(RecommendedWatcher, tokio::sync::mpsc::UnboundedReceiver<()>)> {
501 let Some(socket_path) = Self::dbus_session_bus_socket_path() else {
502 debug!("DBUS_SESSION_BUS_ADDRESS not watchable (no unix:path=...); using periodic tray retry");
503 return None;
504 };
505 Self::setup_dbus_socket_watch(socket_path)
506 }
507
508 #[cfg(feature = "tray")]
509 fn setup_dbus_socket_watch(
510 socket_path: PathBuf,
511 ) -> Option<(RecommendedWatcher, tokio::sync::mpsc::UnboundedReceiver<()>)> {
512 let Some(parent_dir) = socket_path.parent() else {
513 warn!(
514 path = %socket_path.display(),
515 "Unable to watch D-Bus session bus socket parent directory; using periodic tray retry"
516 );
517 return None;
518 };
519
520 let watched_name = socket_path.file_name().map(|n| n.to_os_string());
521 let socket_path_for_cb = socket_path.clone();
522 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
523
524 let mut watcher = match RecommendedWatcher::new(
525 move |res: std::result::Result<Event, notify::Error>| match res {
526 Ok(event) => {
527 let matches_path = event.paths.iter().any(|path| {
528 if *path == socket_path_for_cb {
529 return true;
530 }
531 match (&watched_name, path.file_name()) {
532 (Some(name), Some(file_name)) => file_name == name,
533 _ => false,
534 }
535 });
536 if matches_path {
537 let _ = tx.send(());
538 }
539 }
540 Err(e) => {
541 warn!(error = %e, "D-Bus session bus watcher error");
542 }
543 },
544 Config::default(),
545 ) {
546 Ok(w) => w,
547 Err(e) => {
548 warn!(
549 path = %socket_path.display(),
550 error = %e,
551 "Failed to create D-Bus session bus watcher; using periodic tray retry"
552 );
553 return None;
554 }
555 };
556
557 if let Err(e) = watcher.watch(parent_dir, RecursiveMode::NonRecursive) {
558 warn!(
559 path = %parent_dir.display(),
560 error = %e,
561 "Failed to watch D-Bus session bus directory; using periodic tray retry"
562 );
563 return None;
564 }
565
566 info!(
567 path = %socket_path.display(),
568 "Watching D-Bus session bus socket for tray reconnection triggers"
569 );
570 Some((watcher, rx))
571 }
572
573 #[cfg(feature = "tray")]
574 fn dbus_session_bus_socket_path() -> Option<PathBuf> {
575 let address = std::env::var("DBUS_SESSION_BUS_ADDRESS").ok()?;
576 Self::parse_dbus_unix_path(&address)
577 }
578
579 #[cfg(feature = "tray")]
580 fn parse_dbus_unix_path(address: &str) -> Option<PathBuf> {
581 for segment in address.split(';') {
583 if !segment.starts_with("unix:") {
584 continue;
585 }
586
587 let params = &segment["unix:".len()..];
588 for param in params.split(',') {
589 let Some((key, value)) = param.split_once('=') else {
590 continue;
591 };
592 if key == "path" && !value.is_empty() {
593 return Some(PathBuf::from(value));
594 }
595 }
596 }
597
598 None
599 }
600
601 #[cfg(feature = "tray")]
602 async fn tray_apply_state(
603 &self,
604 tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
605 tray_state: &TrayState,
606 ) {
607 let Some(handle) = tray_handle.as_ref() else {
608 return;
609 };
610
611 let state = tray_state.clone();
612 let update_result = handle
613 .update(move |t: &mut GitSyncTray| {
614 t.state = state;
615 t.bump_icon_generation();
616 })
617 .await;
618
619 if update_result.is_none() {
620 warn!("Tray: handle.update returned None - tray service may be dead; will attempt to respawn");
621 *tray_handle = None;
622 }
623 }
624
625 #[cfg(feature = "tray")]
626 async fn reconcile_tray_state_from_global(
627 &self,
628 tray_state: &mut TrayState,
629 tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
630 ) {
631 let mut changed = false;
632 let paused = self.is_sync_suspended();
633
634 if tray_state.paused != paused {
635 tray_state.paused = paused;
636 changed = true;
637 }
638
639 if !tray_state.paused {
640 let desired_status = self.desired_tray_status().await;
641 if tray_state.status != desired_status {
642 tray_state.status = desired_status.clone();
643 changed = true;
644 }
645
646 let desired_last_error = match desired_status {
647 TrayStatus::Error(msg) => Some(msg),
648 _ => None,
649 };
650 if tray_state.last_error != desired_last_error {
651 tray_state.last_error = desired_last_error;
652 changed = true;
653 }
654 }
655
656 let latest_sync = self.latest_successful_sync_datetime();
657 if tray_state.last_sync != latest_sync {
658 tray_state.last_sync = latest_sync;
659 changed = true;
660 }
661
662 if changed {
663 self.tray_apply_state(tray_handle, tray_state).await;
664 }
665 }
666
667 #[cfg(feature = "tray")]
668 async fn refresh_tray_relative_time_display(
669 &self,
670 tray_state: &mut TrayState,
671 tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
672 last_sync_text_snapshot: &mut String,
673 ) {
674 let current = tray_state.last_sync_text();
675 if ¤t == last_sync_text_snapshot {
676 return;
677 }
678
679 *last_sync_text_snapshot = current;
680 self.tray_apply_state(tray_handle, tray_state).await;
683 }
684
685 #[cfg(feature = "tray")]
686 async fn desired_tray_status(&self) -> TrayStatus {
687 if self.is_syncing.load(Ordering::Acquire) {
688 return TrayStatus::Syncing;
689 }
690
691 let last_error = self.last_sync_error.read().await.clone();
692 match last_error {
693 Some(msg) => TrayStatus::Error(msg),
694 None => TrayStatus::Idle,
695 }
696 }
697
698 #[cfg(feature = "tray")]
699 fn latest_successful_sync_datetime(&self) -> Option<chrono::DateTime<chrono::Local>> {
700 let unix_secs = self.last_successful_sync_unix_secs.load(Ordering::Acquire);
701 if unix_secs <= 0 {
702 return None;
703 }
704 use chrono::TimeZone;
705 chrono::Local.timestamp_opt(unix_secs, 0).single()
706 }
707
708 #[cfg(feature = "tray")]
709 fn notify_sync_state_changed(&self) {
710 let seq = self.sync_state_change_seq.fetch_add(1, Ordering::AcqRel) + 1;
711 let _ = self.sync_state_change_tx.send(seq);
712 }
713
714 #[cfg(feature = "tray")]
716 async fn handle_timeout_with_optional_tray(
717 &self,
718 sync_state: &mut SyncScheduler,
719 tray_state: &mut TrayState,
720 tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
721 ) {
722 if self.is_sync_suspended() {
723 return;
724 }
725
726 if !sync_state.should_sync_now() {
727 return;
728 }
729
730 if self.is_syncing.load(Ordering::Acquire) {
731 debug!("Sync already in progress, skipping");
732 return;
733 }
734
735 self.do_sync_with_optional_tray_update(sync_state, tray_state, tray_handle)
736 .await;
737 }
738
739 #[cfg(feature = "tray")]
741 async fn do_sync_with_optional_tray_update(
742 &self,
743 sync_state: &mut SyncScheduler,
744 tray_state: &mut TrayState,
745 tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
746 ) {
747 if self.is_sync_suspended() {
748 debug!("Sync is suspended, skipping tray-triggered sync");
749 return;
750 }
751
752 info!("Tray: setting status to Syncing");
753 tray_state.status = TrayStatus::Syncing;
754 self.tray_apply_state(tray_handle, tray_state).await;
755
756 let span = tracing::info_span!(
757 "perform_sync_attempt",
758 repo = %self.repo_path,
759 branch = %self.sync_config.branch_name,
760 remote = %self.sync_config.remote_name,
761 dry_run = self.watch_config.dry_run
762 );
763 let _guard = span.enter();
764
765 match self.perform_sync().await {
766 Ok(()) => {
767 info!("Tray: perform_sync succeeded, setting status to Idle");
768 sync_state.on_sync_success();
769 tray_state.status = TrayStatus::Idle;
770 tray_state.last_error = None;
771 self.reconcile_tray_state_from_global(tray_state, tray_handle)
772 .await;
773 }
774 Err(e) => {
775 sync_state.on_sync_failure(&e);
776 let err_msg = e.to_string();
777 self.log_sync_error(&e);
778 info!("Tray: perform_sync failed, setting status to Error");
779 tray_state.status = TrayStatus::Error(err_msg.clone());
780 tray_state.last_error = Some(err_msg);
781 self.tray_apply_state(tray_handle, tray_state).await;
782 }
783 }
784 }
785
786 fn handle_file_event(&self, event: Event, sync_state: &mut SyncScheduler) {
788 debug!("Received event from channel: {:?}", event);
789 debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
790
791 if EventFilter::is_relevant_change(&event) {
792 info!("Relevant change detected, marking pending sync");
793 sync_state.mark_file_event();
794 } else {
795 debug!("Event not considered relevant: {:?}", event.kind);
796 }
797 }
798
799 async fn handle_timeout(&self, sync_state: &mut SyncScheduler) {
801 if self.is_sync_suspended() {
802 return;
803 }
804
805 if !sync_state.should_sync_now() {
806 return;
807 }
808
809 if self.is_syncing.load(Ordering::Acquire) {
811 debug!("Sync already in progress, skipping");
812 return;
813 }
814
815 info!("Changes detected, triggering sync");
816 let span = tracing::info_span!(
817 "perform_sync_attempt",
818 repo = %self.repo_path,
819 branch = %self.sync_config.branch_name,
820 remote = %self.sync_config.remote_name,
821 dry_run = self.watch_config.dry_run
822 );
823 let _guard = span.enter();
824 match self.perform_sync().await {
825 Ok(()) => {
826 debug!("perform_sync succeeded");
827 sync_state.on_sync_success();
828 }
829 Err(e) => {
830 sync_state.on_sync_failure(&e);
831 self.log_sync_error(&e);
832 }
833 }
834 }
835
836 fn log_sync_error(&self, e: &SyncError) {
837 match e {
838 SyncError::DetachedHead => {
839 error!("Sync failed: detached HEAD. Repository must be on a branch; will retry.")
840 }
841 SyncError::UnsafeRepositoryState { state } => error!(
842 state = %state,
843 "Sync failed: repository in unsafe state; will retry"
844 ),
845 SyncError::ManualInterventionRequired { reason } => warn!(
846 reason = %reason,
847 "Sync requires manual intervention; pending will remain set"
848 ),
849 SyncError::NoRemoteConfigured { branch } => error!(
850 branch = %branch,
851 "Sync failed: no remote configured for branch"
852 ),
853 SyncError::NetworkError(msg) => error!(
854 error = %msg,
855 "Network error during sync; will retry"
856 ),
857 SyncError::TaskError(msg) => error!(
858 error = %msg,
859 "Background task error during sync; will retry"
860 ),
861 SyncError::GitError(err) => error!(
862 code = ?err.code(),
863 klass = ?err.class(),
864 message = %err.message(),
865 "Git error during sync; will retry"
866 ),
867 other => error!(error = %other, "Sync failed; will retry"),
868 }
869 }
870
871 async fn perform_sync(&self) -> Result<()> {
873 if self.is_sync_suspended() {
874 debug!("Sync is suspended, skipping sync attempt");
875 return Ok(());
876 }
877
878 if self.is_syncing.swap(true, Ordering::AcqRel) {
880 debug!("Sync already in progress");
881 return Ok(());
882 }
883 #[cfg(feature = "tray")]
884 self.notify_sync_state_changed();
885
886 let result: Result<()> = if self.watch_config.dry_run {
888 info!("DRY RUN: Would perform sync now");
889 Ok(())
890 } else {
891 let repo_path = self.repo_path.clone();
893 let sync_config = self.sync_config.clone();
894
895 debug!("Spawning blocking sync task");
896 match tokio::task::spawn_blocking(move || {
897 let mut synchronizer =
899 RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
900
901 synchronizer.sync(false)
903 })
904 .await
905 {
906 Ok(inner) => inner,
907 Err(e) => {
908 error!("Join error waiting for sync task: {}", e);
909 Err(e.into())
910 }
911 }
912 };
913
914 self.is_syncing.store(false, Ordering::Release);
916
917 if result.is_ok() {
918 self.last_successful_sync_unix_secs
919 .store(chrono::Utc::now().timestamp(), Ordering::Release);
920 }
921 #[cfg(feature = "tray")]
922 {
923 let mut last_error = self.last_sync_error.write().await;
924 *last_error = result.as_ref().err().map(ToString::to_string);
925 }
926 #[cfg(feature = "tray")]
927 self.notify_sync_state_changed();
928
929 if let Err(ref err) = result {
930 error!(error = %err, "perform_sync finished with error");
931 } else {
932 debug!("perform_sync finished successfully");
933 }
934 result
935 }
936
937 fn is_sync_suspended(&self) -> bool {
938 self.sync_suspended.load(Ordering::Acquire)
939 }
940
941 #[cfg(feature = "tray")]
942 fn set_sync_suspended(&self, suspended: bool) {
943 self.sync_suspended.store(suspended, Ordering::Release);
944 self.notify_sync_state_changed();
945 }
946}
947
948struct SyncScheduler {
955 last_sync: time::Instant,
956 pending_sync: bool,
957 immediate_requested: bool,
958 min_interval: Duration,
959 debounce: Duration,
960 max_batch_latency: Duration,
961 first_event: Option<time::Instant>,
962 last_event: Option<time::Instant>,
963 next_retry_at: Option<time::Instant>,
964 retry_backoff: Duration,
965}
966
967impl SyncScheduler {
968 const RETRY_BACKOFF_INITIAL: Duration = Duration::from_secs(1);
969 const RETRY_BACKOFF_MAX: Duration = Duration::from_secs(60);
970 const RETRY_DELAY_MANUAL: Duration = Duration::from_secs(30);
971 const RETRY_DELAY_CONFIG: Duration = Duration::from_secs(60);
972 const RETRY_DELAY_STATE: Duration = Duration::from_secs(5);
973
974 fn new(debounce_ms: u64, min_interval_ms: u64) -> Self {
975 let debounce = Duration::from_millis(debounce_ms);
976 let min_interval = Duration::from_millis(min_interval_ms);
977 let max_batch_latency = debounce
978 .saturating_mul(8)
979 .max(min_interval)
980 .max(Duration::from_millis(500));
981
982 Self {
983 last_sync: time::Instant::now(),
984 pending_sync: false,
985 immediate_requested: false,
986 min_interval,
987 debounce,
988 max_batch_latency,
989 first_event: None,
990 last_event: None,
991 next_retry_at: None,
992 retry_backoff: Self::RETRY_BACKOFF_INITIAL,
993 }
994 }
995
996 fn mark_file_event(&mut self) {
997 self.mark_file_event_at(time::Instant::now());
998 }
999
1000 fn mark_file_event_at(&mut self, now: time::Instant) {
1001 self.pending_sync = true;
1002 self.immediate_requested = false;
1003 self.first_event.get_or_insert(now);
1004 self.last_event = Some(now);
1005 }
1006
1007 fn request_sync_now(&mut self) {
1008 self.request_sync_now_at(time::Instant::now());
1009 }
1010
1011 fn request_sync_now_at(&mut self, now: time::Instant) {
1012 self.pending_sync = true;
1013 self.immediate_requested = true;
1014 self.first_event.get_or_insert(now);
1015 self.last_event.get_or_insert(now);
1016 }
1017
1018 fn should_sync_now(&self) -> bool {
1019 self.should_sync_at(time::Instant::now())
1020 }
1021
1022 fn should_sync_at(&self, now: time::Instant) -> bool {
1023 if !self.pending_sync {
1024 return false;
1025 }
1026
1027 if let Some(next_retry_at) = self.next_retry_at {
1028 if now < next_retry_at {
1029 return false;
1030 }
1031 }
1032
1033 if now.duration_since(self.last_sync) < self.min_interval {
1034 return false;
1035 }
1036
1037 if self.immediate_requested {
1038 return true;
1039 }
1040
1041 let quiet_ready = self
1042 .last_event
1043 .map(|last| now.duration_since(last) >= self.debounce)
1044 .unwrap_or(false);
1045 if quiet_ready {
1046 return true;
1047 }
1048
1049 self.first_event
1050 .map(|first| now.duration_since(first) >= self.max_batch_latency)
1051 .unwrap_or(false)
1052 }
1053
1054 fn on_sync_success(&mut self) {
1055 self.on_sync_success_at(time::Instant::now());
1056 }
1057
1058 fn on_sync_success_at(&mut self, now: time::Instant) {
1059 self.last_sync = now;
1060 self.pending_sync = false;
1061 self.immediate_requested = false;
1062 self.first_event = None;
1063 self.last_event = None;
1064 self.next_retry_at = None;
1065 self.retry_backoff = Self::RETRY_BACKOFF_INITIAL;
1066 }
1067
1068 fn on_sync_failure(&mut self, error: &SyncError) {
1069 self.on_sync_failure_at(error, time::Instant::now());
1070 }
1071
1072 fn on_sync_failure_at(&mut self, error: &SyncError, now: time::Instant) {
1073 self.last_sync = now;
1074 self.pending_sync = true;
1075 self.immediate_requested = false;
1076
1077 let delay = self.retry_delay_for(error);
1078 self.next_retry_at = Some(now + delay);
1079 debug!(
1080 delay_s = delay.as_secs_f64(),
1081 error = %error,
1082 "Sync failure scheduled with retry backoff"
1083 );
1084 }
1085
1086 fn retry_delay_for(&mut self, error: &SyncError) -> Duration {
1087 match error {
1088 SyncError::ManualInterventionRequired { .. } | SyncError::HookRejected { .. } => {
1089 Self::RETRY_DELAY_MANUAL
1090 }
1091 SyncError::NoRemoteConfigured { .. }
1092 | SyncError::RemoteBranchNotFound { .. }
1093 | SyncError::NotARepository { .. } => Self::RETRY_DELAY_CONFIG,
1094 SyncError::DetachedHead | SyncError::UnsafeRepositoryState { .. } => {
1095 Self::RETRY_DELAY_STATE
1096 }
1097 _ => {
1098 let delay = self.retry_backoff;
1099 self.retry_backoff = self
1100 .retry_backoff
1101 .saturating_mul(2)
1102 .min(Self::RETRY_BACKOFF_MAX);
1103 delay
1104 }
1105 }
1106 }
1107}
1108
1109pub async fn watch_with_periodic_sync(
1111 repo_path: impl AsRef<Path>,
1112 sync_config: SyncConfig,
1113 mut watch_config: WatchConfig,
1114 sync_interval_ms: Option<u64>,
1115) -> Result<()> {
1116 watch_config.periodic_sync_interval_ms = sync_interval_ms;
1117 let manager = WatchManager::new(repo_path, sync_config, watch_config);
1118 manager.watch().await
1119}
1120
1121#[cfg(test)]
1122mod scheduler_tests {
1123 use super::SyncScheduler;
1124 use crate::error::SyncError;
1125 use tokio::time::{Duration, Instant};
1126
1127 #[test]
1128 fn scheduler_waits_for_quiet_period_before_syncing() {
1129 let mut scheduler = SyncScheduler::new(200, 100);
1130 let base = Instant::now();
1131 scheduler.last_sync = base;
1132 scheduler.mark_file_event_at(base);
1133
1134 assert!(!scheduler.should_sync_at(base));
1135 assert!(!scheduler.should_sync_at(base + Duration::from_millis(120)));
1136 assert!(scheduler.should_sync_at(base + Duration::from_millis(220)));
1137 }
1138
1139 #[test]
1140 fn scheduler_uses_max_batch_latency_to_prevent_starvation() {
1141 let mut scheduler = SyncScheduler::new(500, 100);
1142 let base = Instant::now();
1143 scheduler.last_sync = base;
1144 scheduler.mark_file_event_at(base);
1145
1146 for i in 1..40 {
1148 let t = base + Duration::from_millis(100 * i);
1149 scheduler.mark_file_event_at(t);
1150 assert!(
1151 !scheduler.should_sync_at(t),
1152 "Scheduler should still wait before max-batch threshold"
1153 );
1154 }
1155
1156 let ready_at = base + Duration::from_millis(4000);
1157 scheduler.mark_file_event_at(ready_at);
1158 assert!(
1159 scheduler.should_sync_at(ready_at),
1160 "Scheduler should fire at max-batch latency under continuous events"
1161 );
1162 }
1163
1164 #[test]
1165 fn scheduler_applies_retry_backoff_and_resets_on_success() {
1166 let mut scheduler = SyncScheduler::new(0, 0);
1167 let base = Instant::now();
1168 scheduler.last_sync = base;
1169 scheduler.mark_file_event_at(base);
1170 assert!(scheduler.should_sync_at(base));
1171
1172 scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), base);
1173 assert!(!scheduler.should_sync_at(base + Duration::from_millis(999)));
1174 assert!(scheduler.should_sync_at(base + Duration::from_millis(1000)));
1175
1176 let second = base + Duration::from_millis(1000);
1177 scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), second);
1178 assert!(!scheduler.should_sync_at(second + Duration::from_secs(1)));
1179 assert!(scheduler.should_sync_at(second + Duration::from_secs(2)));
1180
1181 scheduler.on_sync_success_at(second + Duration::from_secs(2));
1182 let next = second + Duration::from_secs(2);
1183 scheduler.mark_file_event_at(next);
1184 assert!(scheduler.should_sync_at(next));
1185 }
1186
1187 #[test]
1188 fn scheduler_uses_longer_retry_for_manual_intervention_errors() {
1189 let mut scheduler = SyncScheduler::new(0, 0);
1190 let base = Instant::now();
1191 scheduler.last_sync = base;
1192 scheduler.mark_file_event_at(base);
1193 assert!(scheduler.should_sync_at(base));
1194
1195 scheduler.on_sync_failure_at(
1196 &SyncError::ManualInterventionRequired {
1197 reason: "conflict".to_string(),
1198 },
1199 base,
1200 );
1201 assert!(!scheduler.should_sync_at(base + Duration::from_secs(29)));
1202 assert!(scheduler.should_sync_at(base + Duration::from_secs(30)));
1203 }
1204
1205 #[test]
1206 fn request_sync_now_bypasses_debounce_but_respects_min_interval() {
1207 let mut scheduler = SyncScheduler::new(10_000, 500);
1208 let base = Instant::now();
1209 scheduler.last_sync = base;
1210
1211 scheduler.request_sync_now_at(base + Duration::from_millis(100));
1212 assert!(!scheduler.should_sync_at(base + Duration::from_millis(499)));
1213 assert!(scheduler.should_sync_at(base + Duration::from_millis(500)));
1214 }
1215
1216 #[test]
1217 fn request_sync_now_does_not_bypass_retry_backoff() {
1218 let mut scheduler = SyncScheduler::new(0, 0);
1219 let base = Instant::now();
1220 scheduler.last_sync = base;
1221 scheduler.mark_file_event_at(base);
1222 assert!(scheduler.should_sync_at(base));
1223
1224 scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), base);
1225 scheduler.request_sync_now_at(base + Duration::from_millis(100));
1226 assert!(!scheduler.should_sync_at(base + Duration::from_millis(999)));
1227 assert!(scheduler.should_sync_at(base + Duration::from_millis(1000)));
1228 }
1229}
1230
1231#[cfg(all(test, feature = "tray"))]
1232mod tests {
1233 use super::{WatchConfig, WatchManager};
1234 use crate::sync::SyncConfig;
1235 use std::fs::File;
1236 use std::sync::atomic::Ordering;
1237 use tempfile::tempdir;
1238 use tokio::time::{timeout, Duration};
1239
1240 #[test]
1241 fn parse_dbus_unix_path_finds_path_with_extra_parameters() {
1242 let address =
1243 "unix:abstract=/tmp/dbus-XXXX,guid=abcdef;unix:path=/tmp/dbus-test-socket,guid=1234";
1244 let parsed = WatchManager::parse_dbus_unix_path(address);
1245 assert_eq!(
1246 parsed.as_deref(),
1247 Some(std::path::Path::new("/tmp/dbus-test-socket"))
1248 );
1249 }
1250
1251 #[test]
1252 fn parse_dbus_unix_path_ignores_malformed_parts() {
1253 let address = "unix:guid=abc,broken,other=123,another;unix:path=/tmp/dbus-test";
1254 let parsed = WatchManager::parse_dbus_unix_path(address);
1255 assert_eq!(
1256 parsed.as_deref(),
1257 Some(std::path::Path::new("/tmp/dbus-test"))
1258 );
1259 }
1260
1261 #[tokio::test]
1262 async fn setup_dbus_socket_watch_emits_on_socket_file_activity() {
1263 let dir = tempdir().expect("create tempdir");
1264 let socket_path = dir.path().join("bus");
1265
1266 let (_watcher, mut rx) = WatchManager::setup_dbus_socket_watch(socket_path.clone())
1267 .expect("watcher should initialize for valid path");
1268
1269 File::create(&socket_path).expect("create watched file");
1271
1272 let received = timeout(Duration::from_secs(2), rx.recv())
1273 .await
1274 .expect("timed out waiting for watcher event");
1275 assert_eq!(received, Some(()));
1276 }
1277
1278 #[tokio::test]
1279 async fn perform_sync_is_noop_when_suspended() {
1280 let manager = WatchManager::new(
1281 "/tmp/not-a-repo",
1282 SyncConfig::default(),
1283 WatchConfig::default(),
1284 );
1285 manager.set_sync_suspended(true);
1286
1287 manager
1288 .perform_sync()
1289 .await
1290 .expect("suspended sync should be a no-op");
1291
1292 assert_eq!(
1293 manager
1294 .last_successful_sync_unix_secs
1295 .load(Ordering::Acquire),
1296 0
1297 );
1298 assert!(!manager.is_syncing.load(Ordering::Acquire));
1299 }
1300}