Skip to main content

git_sync_rs/
watch.rs

1mod event_filter;
2
3use self::event_filter::EventFilter;
4use crate::error::{Result, SyncError};
5use crate::sync::{RepositorySynchronizer, SyncConfig};
6#[cfg(feature = "tray")]
7use crate::tray::{GitSyncTray, TrayCommand, TrayState, TrayStatus};
8#[cfg(feature = "tray")]
9use ksni::TrayMethods;
10use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
11use std::future::pending;
12use std::path::{Path, PathBuf};
13#[cfg(feature = "tray")]
14use std::sync::atomic::AtomicU64;
15use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18use tokio::sync::mpsc;
19#[cfg(feature = "tray")]
20use tokio::sync::watch as tokio_watch;
21#[cfg(feature = "tray")]
22use tokio::sync::RwLock;
23use tokio::time;
24use tracing::{debug, error, info, warn};
25
26#[cfg(feature = "tray")]
27const TRAY_RETRY_FALLBACK_DELAY: Duration = Duration::from_secs(15);
28#[cfg(feature = "tray")]
29const TRAY_RETRY_SOON_DELAY: Duration = Duration::from_secs(1);
30
31/// Watch mode configuration
32#[derive(Debug, Clone)]
33pub struct WatchConfig {
34    /// How long to wait after changes before syncing (milliseconds)
35    pub debounce_ms: u64,
36
37    /// Minimum interval between syncs (milliseconds)
38    pub min_interval_ms: u64,
39
40    /// Whether to sync on startup
41    pub sync_on_start: bool,
42
43    /// Dry run mode - detect changes but don't sync
44    pub dry_run: bool,
45
46    /// Enable system tray indicator (requires `tray` feature)
47    pub enable_tray: bool,
48
49    /// Custom tray icon: a freedesktop icon name or a path to an image file
50    pub tray_icon: Option<String>,
51
52    /// Optional periodic sync interval in milliseconds.
53    /// When set, sync attempts are triggered even without filesystem events.
54    pub periodic_sync_interval_ms: Option<u64>,
55}
56
57impl Default for WatchConfig {
58    fn default() -> Self {
59        Self {
60            debounce_ms: 500,
61            min_interval_ms: 1000,
62            sync_on_start: true,
63            dry_run: false,
64            enable_tray: false,
65            tray_icon: None,
66            periodic_sync_interval_ms: None,
67        }
68    }
69}
70
71/// Manages file system watching and automatic synchronization
72pub struct WatchManager {
73    repo_path: String,
74    sync_config: SyncConfig,
75    watch_config: WatchConfig,
76    is_syncing: Arc<AtomicBool>,
77    sync_suspended: Arc<AtomicBool>,
78    last_successful_sync_unix_secs: Arc<AtomicI64>,
79    #[cfg(feature = "tray")]
80    last_sync_error: Arc<RwLock<Option<String>>>,
81    #[cfg(feature = "tray")]
82    sync_state_change_tx: tokio_watch::Sender<u64>,
83    #[cfg(feature = "tray")]
84    sync_state_change_seq: Arc<AtomicU64>,
85}
86
87/// Event handler for file system changes
88struct FileEventHandler {
89    repo_path: PathBuf,
90    tx: mpsc::Sender<Event>,
91}
92
93impl FileEventHandler {
94    fn new(repo_path: PathBuf, tx: mpsc::Sender<Event>) -> Self {
95        Self { repo_path, tx }
96    }
97
98    fn handle_event(&self, res: std::result::Result<Event, notify::Error>) {
99        let event = match res {
100            Ok(event) => event,
101            Err(e) => {
102                error!("Watch error: {}", e);
103                return;
104            }
105        };
106
107        debug!("Raw file event received: {:?}", event);
108
109        if !self.should_process_event(&event) {
110            return;
111        }
112
113        debug!("Event is relevant, sending to channel");
114        if let Err(e) = self.tx.blocking_send(event.clone()) {
115            error!("Failed to send event to channel: {}", e);
116        } else {
117            debug!("Event sent successfully: {:?}", event.kind);
118        }
119    }
120
121    fn should_process_event(&self, event: &Event) -> bool {
122        EventFilter::should_process_event(&self.repo_path, event)
123    }
124}
125
126impl WatchManager {
127    /// Create a new watch manager
128    pub fn new(
129        repo_path: impl AsRef<Path>,
130        sync_config: SyncConfig,
131        watch_config: WatchConfig,
132    ) -> Self {
133        // Expand tilde in path
134        let path_str = repo_path.as_ref().to_string_lossy();
135        let expanded = shellexpand::tilde(&path_str).to_string();
136        #[cfg(feature = "tray")]
137        let (sync_state_change_tx, _) = tokio_watch::channel(0);
138
139        Self {
140            repo_path: expanded,
141            sync_config,
142            watch_config,
143            is_syncing: Arc::new(AtomicBool::new(false)),
144            sync_suspended: Arc::new(AtomicBool::new(false)),
145            last_successful_sync_unix_secs: Arc::new(AtomicI64::new(0)),
146            #[cfg(feature = "tray")]
147            last_sync_error: Arc::new(RwLock::new(None)),
148            #[cfg(feature = "tray")]
149            sync_state_change_tx,
150            #[cfg(feature = "tray")]
151            sync_state_change_seq: Arc::new(AtomicU64::new(0)),
152        }
153    }
154
155    /// Start watching for changes
156    pub async fn watch(&self) -> Result<()> {
157        info!("Starting watch mode for: {}", self.repo_path);
158
159        // Sync on startup if configured
160        if self.watch_config.sync_on_start {
161            info!("Performing initial sync");
162            self.perform_sync().await?;
163        }
164
165        // Create channel for file events
166        let (tx, rx) = mpsc::channel::<Event>(100);
167
168        // Setup file watcher
169        let _watcher = self.setup_watcher(tx)?;
170
171        info!(
172            "Watching for changes (debounce: {}s)",
173            self.watch_config.debounce_ms as f64 / 1000.0
174        );
175
176        // Process events
177        self.process_events(rx).await
178    }
179
180    /// Setup the file system watcher
181    fn setup_watcher(&self, tx: mpsc::Sender<Event>) -> Result<RecommendedWatcher> {
182        let repo_path_clone = PathBuf::from(&self.repo_path);
183        let handler = FileEventHandler::new(repo_path_clone, tx);
184
185        let mut watcher =
186            RecommendedWatcher::new(move |res| handler.handle_event(res), Config::default())?;
187
188        // Watch the repository path
189        watcher.watch(Path::new(&self.repo_path), RecursiveMode::Recursive)?;
190
191        Ok(watcher)
192    }
193
194    /// Process file system events
195    async fn process_events(&self, mut rx: mpsc::Receiver<Event>) -> Result<()> {
196        let mut sync_state = SyncScheduler::new(
197            self.watch_config.debounce_ms,
198            self.watch_config.min_interval_ms,
199        );
200
201        // Periodic interval prevents starvation under continuous events
202        let tick_ms = self
203            .watch_config
204            .debounce_ms
205            .min(self.watch_config.min_interval_ms)
206            .max(50);
207        let mut interval = time::interval(Duration::from_millis(tick_ms));
208        interval.tick().await; // align first tick
209
210        let mut periodic_interval =
211            self.watch_config
212                .periodic_sync_interval_ms
213                .map(|interval_ms| {
214                    info!(
215                        "Periodic sync enabled (interval: {}s)",
216                        interval_ms as f64 / 1000.0
217                    );
218                    time::interval(Duration::from_millis(interval_ms))
219                });
220        if let Some(interval) = periodic_interval.as_mut() {
221            interval.tick().await; // Skip first immediate tick
222        }
223
224        #[cfg(feature = "tray")]
225        if self.watch_config.enable_tray {
226            return self
227                .process_events_with_tray_resilient(
228                    &mut rx,
229                    &mut sync_state,
230                    &mut interval,
231                    &mut periodic_interval,
232                )
233                .await;
234        }
235
236        self.process_events_loop(
237            &mut rx,
238            &mut sync_state,
239            &mut interval,
240            &mut periodic_interval,
241            false,
242        )
243        .await
244    }
245
246    /// Core event loop without tray
247    async fn process_events_loop(
248        &self,
249        rx: &mut mpsc::Receiver<Event>,
250        sync_state: &mut SyncScheduler,
251        interval: &mut time::Interval,
252        periodic_interval: &mut Option<time::Interval>,
253        paused: bool,
254    ) -> Result<()> {
255        loop {
256            tokio::select! {
257                biased;
258                _ = interval.tick() => {
259                    if !paused {
260                        self.handle_timeout(sync_state).await;
261                    }
262                }
263                Some(event) = rx.recv() => {
264                    if !paused {
265                        self.handle_file_event(event, sync_state);
266                    }
267                }
268                _ = Self::tick_optional_interval(periodic_interval) => {
269                    if !paused {
270                        sync_state.request_sync_now();
271                        self.handle_timeout(sync_state).await;
272                    }
273                }
274            }
275        }
276    }
277
278    /// Event loop with tray integration.
279    ///
280    /// This must never fail startup: the tray is best-effort. If we can't connect
281    /// to the graphical session / StatusNotifierWatcher, we keep running headless
282    /// and retry periodically.
283    #[cfg(feature = "tray")]
284    async fn process_events_with_tray_resilient(
285        &self,
286        rx: &mut mpsc::Receiver<Event>,
287        sync_state: &mut SyncScheduler,
288        interval: &mut time::Interval,
289        periodic_interval: &mut Option<time::Interval>,
290    ) -> Result<()> {
291        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel();
292        let mut tray_state = TrayState::new(PathBuf::from(&self.repo_path));
293        let tray_icon = self.watch_config.tray_icon.clone();
294        let mut tray_handle: Option<ksni::Handle<GitSyncTray>> = None;
295
296        let mut tray_next_attempt = time::Instant::now();
297        let mut tray_spawn_task: Option<
298            tokio::task::JoinHandle<std::result::Result<ksni::Handle<GitSyncTray>, ksni::Error>>,
299        > = None;
300        let mut dbus_bus_watch = Self::setup_dbus_session_bus_watch();
301        let mut sync_state_change_rx = self.sync_state_change_tx.subscribe();
302        let mut last_sync_text_snapshot = tray_state.last_sync_text();
303
304        loop {
305            tokio::select! {
306                biased;
307                _ = interval.tick() => {
308                    // If a spawn attempt finished, harvest it (non-blocking: is_finished checked).
309                    if let Some(task) = tray_spawn_task.as_ref() {
310                        if task.is_finished() {
311                            match tray_spawn_task.take().expect("checked Some above").await {
312                                Ok(Ok(handle)) => {
313                                    info!("System tray indicator started");
314                                    tray_handle = Some(handle);
315                                    tray_next_attempt = time::Instant::now();
316                                    // Ensure the tray reflects current state even if it changed during spawn.
317                                    self.reconcile_tray_state_from_global(
318                                        &mut tray_state,
319                                        &mut tray_handle,
320                                    )
321                                    .await;
322                                }
323                                Ok(Err(e)) => {
324                                    let retry_delay = match &e {
325                                        ksni::Error::WontShow => TRAY_RETRY_SOON_DELAY,
326                                        ksni::Error::Watcher(fdo_err)
327                                            if format!("{fdo_err:?}").contains("UnknownObject") =>
328                                        {
329                                            TRAY_RETRY_SOON_DELAY
330                                        }
331                                        _ => TRAY_RETRY_FALLBACK_DELAY,
332                                    };
333                                    warn!(
334                                        error = %e,
335                                        delay_s = retry_delay.as_secs_f64(),
336                                        "Tray unavailable; will retry"
337                                    );
338                                    tray_next_attempt = time::Instant::now() + retry_delay;
339                                }
340                                Err(e) => {
341                                    warn!(
342                                        error = %e,
343                                        delay_s = TRAY_RETRY_FALLBACK_DELAY.as_secs_f64(),
344                                        "Tray spawn task failed; will retry"
345                                    );
346                                    tray_next_attempt = time::Instant::now() + TRAY_RETRY_FALLBACK_DELAY;
347                                }
348                            }
349                        }
350                    }
351
352                    // If we don't have a tray yet, and no spawn is in flight, try again when due.
353                    if tray_handle.is_none()
354                        && tray_spawn_task.is_none()
355                        && time::Instant::now() >= tray_next_attempt
356                    {
357                        tray_spawn_task = Some(Self::spawn_tray_task(
358                            tray_state.clone(),
359                            cmd_tx.clone(),
360                            tray_icon.clone(),
361                        ));
362                    }
363
364                    self.handle_timeout_with_optional_tray(sync_state, &mut tray_state, &mut tray_handle).await;
365                    self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
366                        .await;
367                    self.refresh_tray_relative_time_display(
368                        &mut tray_state,
369                        &mut tray_handle,
370                        &mut last_sync_text_snapshot,
371                    )
372                    .await;
373                }
374                Some(event) = rx.recv() => {
375                    self.handle_file_event(event, sync_state);
376                }
377                _ = Self::tick_optional_interval(periodic_interval) => {
378                    sync_state.request_sync_now();
379                    self.handle_timeout_with_optional_tray(sync_state, &mut tray_state, &mut tray_handle).await;
380                }
381                Some(cmd) = cmd_rx.recv() => {
382                    match cmd {
383                        TrayCommand::SyncNow => {
384                            if tray_state.paused {
385                                debug!("Tray: manual sync requested while suspended; ignoring");
386                            } else {
387                                info!("Tray: manual sync requested");
388                                self.do_sync_with_optional_tray_update(sync_state, &mut tray_state, &mut tray_handle).await;
389                            }
390                        }
391                        TrayCommand::Suspend => {
392                            info!("Tray: suspending all sync activity");
393                            self.set_sync_suspended(true);
394                            self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
395                                .await;
396                        }
397                        TrayCommand::Resume => {
398                            info!("Tray: resuming sync activity");
399                            self.set_sync_suspended(false);
400                            self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
401                                .await;
402                        }
403                        TrayCommand::Quit => {
404                            info!("Tray: quit requested");
405                            if let Some(handle) = &tray_handle {
406                                // Best-effort shutdown before exiting watch mode.
407                                handle.shutdown().await;
408                            }
409                            return Ok(());
410                        }
411                        TrayCommand::Respawn { reason } => {
412                            warn!(reason = %reason, "Tray: respawn requested");
413
414                            if let Some(task) = tray_spawn_task.take() {
415                                task.abort();
416                            }
417
418                            if let Some(handle) = tray_handle.take() {
419                                // Best-effort shutdown; if the service already stopped this is a no-op.
420                                handle.shutdown().await;
421                            }
422
423                            tray_next_attempt = time::Instant::now() + TRAY_RETRY_SOON_DELAY;
424                        }
425                    }
426                }
427                dbus_event = async {
428                    if let Some((_, rx)) = dbus_bus_watch.as_mut() {
429                        rx.recv().await
430                    } else {
431                        None
432                    }
433                }, if dbus_bus_watch.is_some() => {
434                    match dbus_event {
435                        Some(()) => {
436                            info!("Detected D-Bus session bus socket activity; retrying tray startup now");
437                            tray_next_attempt = time::Instant::now();
438
439                            if tray_handle.is_none() && tray_spawn_task.is_none() {
440                                tray_spawn_task = Some(Self::spawn_tray_task(
441                                    tray_state.clone(),
442                                    cmd_tx.clone(),
443                                    tray_icon.clone(),
444                                ));
445                            }
446                        }
447                        None => {
448                            warn!("D-Bus session bus watcher channel closed; falling back to periodic retry");
449                            dbus_bus_watch = None;
450                        }
451                    }
452                }
453                sync_state_change = sync_state_change_rx.changed() => {
454                    match sync_state_change {
455                        Ok(()) => {
456                            self.reconcile_tray_state_from_global(&mut tray_state, &mut tray_handle)
457                                .await;
458                            self.refresh_tray_relative_time_display(
459                                &mut tray_state,
460                                &mut tray_handle,
461                                &mut last_sync_text_snapshot,
462                            )
463                            .await;
464                        }
465                        Err(e) => {
466                            warn!(error = %e, "Tray sync-state update channel closed");
467                        }
468                    }
469                }
470            }
471        }
472    }
473
474    async fn tick_optional_interval(interval: &mut Option<time::Interval>) {
475        match interval {
476            Some(i) => {
477                i.tick().await;
478            }
479            None => pending::<()>().await,
480        }
481    }
482
483    #[cfg(feature = "tray")]
484    fn spawn_tray_task(
485        tray_state: TrayState,
486        cmd_tx: tokio::sync::mpsc::UnboundedSender<TrayCommand>,
487        tray_icon: Option<String>,
488    ) -> tokio::task::JoinHandle<std::result::Result<ksni::Handle<GitSyncTray>, ksni::Error>> {
489        tokio::spawn(async move {
490            let tray = GitSyncTray::new(tray_state, cmd_tx, tray_icon);
491            // Keep the service alive even if the watcher/host isn't ready yet.
492            // We handle reconnection by listening for watcher state changes and, on
493            // certain errors, respawning the tray service from the outer loop.
494            tray.assume_sni_available(true).spawn().await
495        })
496    }
497
498    #[cfg(feature = "tray")]
499    fn setup_dbus_session_bus_watch(
500    ) -> Option<(RecommendedWatcher, tokio::sync::mpsc::UnboundedReceiver<()>)> {
501        let Some(socket_path) = Self::dbus_session_bus_socket_path() else {
502            debug!("DBUS_SESSION_BUS_ADDRESS not watchable (no unix:path=...); using periodic tray retry");
503            return None;
504        };
505        Self::setup_dbus_socket_watch(socket_path)
506    }
507
508    #[cfg(feature = "tray")]
509    fn setup_dbus_socket_watch(
510        socket_path: PathBuf,
511    ) -> Option<(RecommendedWatcher, tokio::sync::mpsc::UnboundedReceiver<()>)> {
512        let Some(parent_dir) = socket_path.parent() else {
513            warn!(
514                path = %socket_path.display(),
515                "Unable to watch D-Bus session bus socket parent directory; using periodic tray retry"
516            );
517            return None;
518        };
519
520        let watched_name = socket_path.file_name().map(|n| n.to_os_string());
521        let socket_path_for_cb = socket_path.clone();
522        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
523
524        let mut watcher = match RecommendedWatcher::new(
525            move |res: std::result::Result<Event, notify::Error>| match res {
526                Ok(event) => {
527                    let matches_path = event.paths.iter().any(|path| {
528                        if *path == socket_path_for_cb {
529                            return true;
530                        }
531                        match (&watched_name, path.file_name()) {
532                            (Some(name), Some(file_name)) => file_name == name,
533                            _ => false,
534                        }
535                    });
536                    if matches_path {
537                        let _ = tx.send(());
538                    }
539                }
540                Err(e) => {
541                    warn!(error = %e, "D-Bus session bus watcher error");
542                }
543            },
544            Config::default(),
545        ) {
546            Ok(w) => w,
547            Err(e) => {
548                warn!(
549                    path = %socket_path.display(),
550                    error = %e,
551                    "Failed to create D-Bus session bus watcher; using periodic tray retry"
552                );
553                return None;
554            }
555        };
556
557        if let Err(e) = watcher.watch(parent_dir, RecursiveMode::NonRecursive) {
558            warn!(
559                path = %parent_dir.display(),
560                error = %e,
561                "Failed to watch D-Bus session bus directory; using periodic tray retry"
562            );
563            return None;
564        }
565
566        info!(
567            path = %socket_path.display(),
568            "Watching D-Bus session bus socket for tray reconnection triggers"
569        );
570        Some((watcher, rx))
571    }
572
573    #[cfg(feature = "tray")]
574    fn dbus_session_bus_socket_path() -> Option<PathBuf> {
575        let address = std::env::var("DBUS_SESSION_BUS_ADDRESS").ok()?;
576        Self::parse_dbus_unix_path(&address)
577    }
578
579    #[cfg(feature = "tray")]
580    fn parse_dbus_unix_path(address: &str) -> Option<PathBuf> {
581        // Address list format: "transport:key=value,...;transport:key=value,..."
582        for segment in address.split(';') {
583            if !segment.starts_with("unix:") {
584                continue;
585            }
586
587            let params = &segment["unix:".len()..];
588            for param in params.split(',') {
589                let Some((key, value)) = param.split_once('=') else {
590                    continue;
591                };
592                if key == "path" && !value.is_empty() {
593                    return Some(PathBuf::from(value));
594                }
595            }
596        }
597
598        None
599    }
600
601    #[cfg(feature = "tray")]
602    async fn tray_apply_state(
603        &self,
604        tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
605        tray_state: &TrayState,
606    ) {
607        let Some(handle) = tray_handle.as_ref() else {
608            return;
609        };
610
611        let state = tray_state.clone();
612        let update_result = handle
613            .update(move |t: &mut GitSyncTray| {
614                t.state = state;
615                t.bump_icon_generation();
616            })
617            .await;
618
619        if update_result.is_none() {
620            warn!("Tray: handle.update returned None - tray service may be dead; will attempt to respawn");
621            *tray_handle = None;
622        }
623    }
624
625    #[cfg(feature = "tray")]
626    async fn reconcile_tray_state_from_global(
627        &self,
628        tray_state: &mut TrayState,
629        tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
630    ) {
631        let mut changed = false;
632        let paused = self.is_sync_suspended();
633
634        if tray_state.paused != paused {
635            tray_state.paused = paused;
636            changed = true;
637        }
638
639        if !tray_state.paused {
640            let desired_status = self.desired_tray_status().await;
641            if tray_state.status != desired_status {
642                tray_state.status = desired_status.clone();
643                changed = true;
644            }
645
646            let desired_last_error = match desired_status {
647                TrayStatus::Error(msg) => Some(msg),
648                _ => None,
649            };
650            if tray_state.last_error != desired_last_error {
651                tray_state.last_error = desired_last_error;
652                changed = true;
653            }
654        }
655
656        let latest_sync = self.latest_successful_sync_datetime();
657        if tray_state.last_sync != latest_sync {
658            tray_state.last_sync = latest_sync;
659            changed = true;
660        }
661
662        if changed {
663            self.tray_apply_state(tray_handle, tray_state).await;
664        }
665    }
666
667    #[cfg(feature = "tray")]
668    async fn refresh_tray_relative_time_display(
669        &self,
670        tray_state: &mut TrayState,
671        tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
672        last_sync_text_snapshot: &mut String,
673    ) {
674        let current = tray_state.last_sync_text();
675        if &current == last_sync_text_snapshot {
676            return;
677        }
678
679        *last_sync_text_snapshot = current;
680        // State payload is unchanged, but tooltip/menu text derived from `Local::now()`
681        // advanced. Emitting an update keeps the displayed relative age fresh.
682        self.tray_apply_state(tray_handle, tray_state).await;
683    }
684
685    #[cfg(feature = "tray")]
686    async fn desired_tray_status(&self) -> TrayStatus {
687        if self.is_syncing.load(Ordering::Acquire) {
688            return TrayStatus::Syncing;
689        }
690
691        let last_error = self.last_sync_error.read().await.clone();
692        match last_error {
693            Some(msg) => TrayStatus::Error(msg),
694            None => TrayStatus::Idle,
695        }
696    }
697
698    #[cfg(feature = "tray")]
699    fn latest_successful_sync_datetime(&self) -> Option<chrono::DateTime<chrono::Local>> {
700        let unix_secs = self.last_successful_sync_unix_secs.load(Ordering::Acquire);
701        if unix_secs <= 0 {
702            return None;
703        }
704        use chrono::TimeZone;
705        chrono::Local.timestamp_opt(unix_secs, 0).single()
706    }
707
708    #[cfg(feature = "tray")]
709    fn notify_sync_state_changed(&self) {
710        let seq = self.sync_state_change_seq.fetch_add(1, Ordering::AcqRel) + 1;
711        let _ = self.sync_state_change_tx.send(seq);
712    }
713
714    /// Handle timeout with optional tray state updates
715    #[cfg(feature = "tray")]
716    async fn handle_timeout_with_optional_tray(
717        &self,
718        sync_state: &mut SyncScheduler,
719        tray_state: &mut TrayState,
720        tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
721    ) {
722        if self.is_sync_suspended() {
723            return;
724        }
725
726        if !sync_state.should_sync_now() {
727            return;
728        }
729
730        if self.is_syncing.load(Ordering::Acquire) {
731            debug!("Sync already in progress, skipping");
732            return;
733        }
734
735        self.do_sync_with_optional_tray_update(sync_state, tray_state, tray_handle)
736            .await;
737    }
738
739    /// Perform sync and update tray state accordingly (if available)
740    #[cfg(feature = "tray")]
741    async fn do_sync_with_optional_tray_update(
742        &self,
743        sync_state: &mut SyncScheduler,
744        tray_state: &mut TrayState,
745        tray_handle: &mut Option<ksni::Handle<GitSyncTray>>,
746    ) {
747        if self.is_sync_suspended() {
748            debug!("Sync is suspended, skipping tray-triggered sync");
749            return;
750        }
751
752        info!("Tray: setting status to Syncing");
753        tray_state.status = TrayStatus::Syncing;
754        self.tray_apply_state(tray_handle, tray_state).await;
755
756        let span = tracing::info_span!(
757            "perform_sync_attempt",
758            repo = %self.repo_path,
759            branch = %self.sync_config.branch_name,
760            remote = %self.sync_config.remote_name,
761            dry_run = self.watch_config.dry_run
762        );
763        let _guard = span.enter();
764
765        match self.perform_sync().await {
766            Ok(()) => {
767                info!("Tray: perform_sync succeeded, setting status to Idle");
768                sync_state.on_sync_success();
769                tray_state.status = TrayStatus::Idle;
770                tray_state.last_error = None;
771                self.reconcile_tray_state_from_global(tray_state, tray_handle)
772                    .await;
773            }
774            Err(e) => {
775                sync_state.on_sync_failure(&e);
776                let err_msg = e.to_string();
777                self.log_sync_error(&e);
778                info!("Tray: perform_sync failed, setting status to Error");
779                tray_state.status = TrayStatus::Error(err_msg.clone());
780                tray_state.last_error = Some(err_msg);
781                self.tray_apply_state(tray_handle, tray_state).await;
782            }
783        }
784    }
785
786    /// Handle a file system event
787    fn handle_file_event(&self, event: Event, sync_state: &mut SyncScheduler) {
788        debug!("Received event from channel: {:?}", event);
789        debug!("Event kind: {:?}, paths: {:?}", event.kind, event.paths);
790
791        if EventFilter::is_relevant_change(&event) {
792            info!("Relevant change detected, marking pending sync");
793            sync_state.mark_file_event();
794        } else {
795            debug!("Event not considered relevant: {:?}", event.kind);
796        }
797    }
798
799    /// Handle timeout expiration
800    async fn handle_timeout(&self, sync_state: &mut SyncScheduler) {
801        if self.is_sync_suspended() {
802            return;
803        }
804
805        if !sync_state.should_sync_now() {
806            return;
807        }
808
809        // Check if already syncing
810        if self.is_syncing.load(Ordering::Acquire) {
811            debug!("Sync already in progress, skipping");
812            return;
813        }
814
815        info!("Changes detected, triggering sync");
816        let span = tracing::info_span!(
817            "perform_sync_attempt",
818            repo = %self.repo_path,
819            branch = %self.sync_config.branch_name,
820            remote = %self.sync_config.remote_name,
821            dry_run = self.watch_config.dry_run
822        );
823        let _guard = span.enter();
824        match self.perform_sync().await {
825            Ok(()) => {
826                debug!("perform_sync succeeded");
827                sync_state.on_sync_success();
828            }
829            Err(e) => {
830                sync_state.on_sync_failure(&e);
831                self.log_sync_error(&e);
832            }
833        }
834    }
835
836    fn log_sync_error(&self, e: &SyncError) {
837        match e {
838            SyncError::DetachedHead => {
839                error!("Sync failed: detached HEAD. Repository must be on a branch; will retry.")
840            }
841            SyncError::UnsafeRepositoryState { state } => error!(
842                state = %state,
843                "Sync failed: repository in unsafe state; will retry"
844            ),
845            SyncError::ManualInterventionRequired { reason } => warn!(
846                reason = %reason,
847                "Sync requires manual intervention; pending will remain set"
848            ),
849            SyncError::NoRemoteConfigured { branch } => error!(
850                branch = %branch,
851                "Sync failed: no remote configured for branch"
852            ),
853            SyncError::NetworkError(msg) => error!(
854                error = %msg,
855                "Network error during sync; will retry"
856            ),
857            SyncError::TaskError(msg) => error!(
858                error = %msg,
859                "Background task error during sync; will retry"
860            ),
861            SyncError::GitError(err) => error!(
862                code = ?err.code(),
863                klass = ?err.class(),
864                message = %err.message(),
865                "Git error during sync; will retry"
866            ),
867            other => error!(error = %other, "Sync failed; will retry"),
868        }
869    }
870
871    /// Perform a synchronization
872    async fn perform_sync(&self) -> Result<()> {
873        if self.is_sync_suspended() {
874            debug!("Sync is suspended, skipping sync attempt");
875            return Ok(());
876        }
877
878        // Set syncing flag (lock-free)
879        if self.is_syncing.swap(true, Ordering::AcqRel) {
880            debug!("Sync already in progress");
881            return Ok(());
882        }
883        #[cfg(feature = "tray")]
884        self.notify_sync_state_changed();
885
886        // Run the sync and ensure we clear the syncing flag regardless of outcome
887        let result: Result<()> = if self.watch_config.dry_run {
888            info!("DRY RUN: Would perform sync now");
889            Ok(())
890        } else {
891            // Perform sync in blocking task
892            let repo_path = self.repo_path.clone();
893            let sync_config = self.sync_config.clone();
894
895            debug!("Spawning blocking sync task");
896            match tokio::task::spawn_blocking(move || {
897                // Create synchronizer
898                let mut synchronizer =
899                    RepositorySynchronizer::new_with_detected_branch(&repo_path, sync_config)?;
900
901                // Perform sync
902                synchronizer.sync(false)
903            })
904            .await
905            {
906                Ok(inner) => inner,
907                Err(e) => {
908                    error!("Join error waiting for sync task: {}", e);
909                    Err(e.into())
910                }
911            }
912        };
913
914        // Clear syncing flag (finally-like)
915        self.is_syncing.store(false, Ordering::Release);
916
917        if result.is_ok() {
918            self.last_successful_sync_unix_secs
919                .store(chrono::Utc::now().timestamp(), Ordering::Release);
920        }
921        #[cfg(feature = "tray")]
922        {
923            let mut last_error = self.last_sync_error.write().await;
924            *last_error = result.as_ref().err().map(ToString::to_string);
925        }
926        #[cfg(feature = "tray")]
927        self.notify_sync_state_changed();
928
929        if let Err(ref err) = result {
930            error!(error = %err, "perform_sync finished with error");
931        } else {
932            debug!("perform_sync finished successfully");
933        }
934        result
935    }
936
937    fn is_sync_suspended(&self) -> bool {
938        self.sync_suspended.load(Ordering::Acquire)
939    }
940
941    #[cfg(feature = "tray")]
942    fn set_sync_suspended(&self, suspended: bool) {
943        self.sync_suspended.store(suspended, Ordering::Release);
944        self.notify_sync_state_changed();
945    }
946}
947
948/// Deadline/backoff based scheduler for watch-triggered sync attempts.
949///
950/// Behavior:
951/// - Coalesce events via quiet debounce window.
952/// - Prevent starvation with max batch latency under continuous event streams.
953/// - Apply retry backoff by error class on failures.
954struct SyncScheduler {
955    last_sync: time::Instant,
956    pending_sync: bool,
957    immediate_requested: bool,
958    min_interval: Duration,
959    debounce: Duration,
960    max_batch_latency: Duration,
961    first_event: Option<time::Instant>,
962    last_event: Option<time::Instant>,
963    next_retry_at: Option<time::Instant>,
964    retry_backoff: Duration,
965}
966
967impl SyncScheduler {
968    const RETRY_BACKOFF_INITIAL: Duration = Duration::from_secs(1);
969    const RETRY_BACKOFF_MAX: Duration = Duration::from_secs(60);
970    const RETRY_DELAY_MANUAL: Duration = Duration::from_secs(30);
971    const RETRY_DELAY_CONFIG: Duration = Duration::from_secs(60);
972    const RETRY_DELAY_STATE: Duration = Duration::from_secs(5);
973
974    fn new(debounce_ms: u64, min_interval_ms: u64) -> Self {
975        let debounce = Duration::from_millis(debounce_ms);
976        let min_interval = Duration::from_millis(min_interval_ms);
977        let max_batch_latency = debounce
978            .saturating_mul(8)
979            .max(min_interval)
980            .max(Duration::from_millis(500));
981
982        Self {
983            last_sync: time::Instant::now(),
984            pending_sync: false,
985            immediate_requested: false,
986            min_interval,
987            debounce,
988            max_batch_latency,
989            first_event: None,
990            last_event: None,
991            next_retry_at: None,
992            retry_backoff: Self::RETRY_BACKOFF_INITIAL,
993        }
994    }
995
996    fn mark_file_event(&mut self) {
997        self.mark_file_event_at(time::Instant::now());
998    }
999
1000    fn mark_file_event_at(&mut self, now: time::Instant) {
1001        self.pending_sync = true;
1002        self.immediate_requested = false;
1003        self.first_event.get_or_insert(now);
1004        self.last_event = Some(now);
1005    }
1006
1007    fn request_sync_now(&mut self) {
1008        self.request_sync_now_at(time::Instant::now());
1009    }
1010
1011    fn request_sync_now_at(&mut self, now: time::Instant) {
1012        self.pending_sync = true;
1013        self.immediate_requested = true;
1014        self.first_event.get_or_insert(now);
1015        self.last_event.get_or_insert(now);
1016    }
1017
1018    fn should_sync_now(&self) -> bool {
1019        self.should_sync_at(time::Instant::now())
1020    }
1021
1022    fn should_sync_at(&self, now: time::Instant) -> bool {
1023        if !self.pending_sync {
1024            return false;
1025        }
1026
1027        if let Some(next_retry_at) = self.next_retry_at {
1028            if now < next_retry_at {
1029                return false;
1030            }
1031        }
1032
1033        if now.duration_since(self.last_sync) < self.min_interval {
1034            return false;
1035        }
1036
1037        if self.immediate_requested {
1038            return true;
1039        }
1040
1041        let quiet_ready = self
1042            .last_event
1043            .map(|last| now.duration_since(last) >= self.debounce)
1044            .unwrap_or(false);
1045        if quiet_ready {
1046            return true;
1047        }
1048
1049        self.first_event
1050            .map(|first| now.duration_since(first) >= self.max_batch_latency)
1051            .unwrap_or(false)
1052    }
1053
1054    fn on_sync_success(&mut self) {
1055        self.on_sync_success_at(time::Instant::now());
1056    }
1057
1058    fn on_sync_success_at(&mut self, now: time::Instant) {
1059        self.last_sync = now;
1060        self.pending_sync = false;
1061        self.immediate_requested = false;
1062        self.first_event = None;
1063        self.last_event = None;
1064        self.next_retry_at = None;
1065        self.retry_backoff = Self::RETRY_BACKOFF_INITIAL;
1066    }
1067
1068    fn on_sync_failure(&mut self, error: &SyncError) {
1069        self.on_sync_failure_at(error, time::Instant::now());
1070    }
1071
1072    fn on_sync_failure_at(&mut self, error: &SyncError, now: time::Instant) {
1073        self.last_sync = now;
1074        self.pending_sync = true;
1075        self.immediate_requested = false;
1076
1077        let delay = self.retry_delay_for(error);
1078        self.next_retry_at = Some(now + delay);
1079        debug!(
1080            delay_s = delay.as_secs_f64(),
1081            error = %error,
1082            "Sync failure scheduled with retry backoff"
1083        );
1084    }
1085
1086    fn retry_delay_for(&mut self, error: &SyncError) -> Duration {
1087        match error {
1088            SyncError::ManualInterventionRequired { .. } | SyncError::HookRejected { .. } => {
1089                Self::RETRY_DELAY_MANUAL
1090            }
1091            SyncError::NoRemoteConfigured { .. }
1092            | SyncError::RemoteBranchNotFound { .. }
1093            | SyncError::NotARepository { .. } => Self::RETRY_DELAY_CONFIG,
1094            SyncError::DetachedHead | SyncError::UnsafeRepositoryState { .. } => {
1095                Self::RETRY_DELAY_STATE
1096            }
1097            _ => {
1098                let delay = self.retry_backoff;
1099                self.retry_backoff = self
1100                    .retry_backoff
1101                    .saturating_mul(2)
1102                    .min(Self::RETRY_BACKOFF_MAX);
1103                delay
1104            }
1105        }
1106    }
1107}
1108
1109/// Run watch mode with periodic sync.
1110pub async fn watch_with_periodic_sync(
1111    repo_path: impl AsRef<Path>,
1112    sync_config: SyncConfig,
1113    mut watch_config: WatchConfig,
1114    sync_interval_ms: Option<u64>,
1115) -> Result<()> {
1116    watch_config.periodic_sync_interval_ms = sync_interval_ms;
1117    let manager = WatchManager::new(repo_path, sync_config, watch_config);
1118    manager.watch().await
1119}
1120
1121#[cfg(test)]
1122mod scheduler_tests {
1123    use super::SyncScheduler;
1124    use crate::error::SyncError;
1125    use tokio::time::{Duration, Instant};
1126
1127    #[test]
1128    fn scheduler_waits_for_quiet_period_before_syncing() {
1129        let mut scheduler = SyncScheduler::new(200, 100);
1130        let base = Instant::now();
1131        scheduler.last_sync = base;
1132        scheduler.mark_file_event_at(base);
1133
1134        assert!(!scheduler.should_sync_at(base));
1135        assert!(!scheduler.should_sync_at(base + Duration::from_millis(120)));
1136        assert!(scheduler.should_sync_at(base + Duration::from_millis(220)));
1137    }
1138
1139    #[test]
1140    fn scheduler_uses_max_batch_latency_to_prevent_starvation() {
1141        let mut scheduler = SyncScheduler::new(500, 100);
1142        let base = Instant::now();
1143        scheduler.last_sync = base;
1144        scheduler.mark_file_event_at(base);
1145
1146        // Keep sending events faster than debounce; sync should still eventually fire.
1147        for i in 1..40 {
1148            let t = base + Duration::from_millis(100 * i);
1149            scheduler.mark_file_event_at(t);
1150            assert!(
1151                !scheduler.should_sync_at(t),
1152                "Scheduler should still wait before max-batch threshold"
1153            );
1154        }
1155
1156        let ready_at = base + Duration::from_millis(4000);
1157        scheduler.mark_file_event_at(ready_at);
1158        assert!(
1159            scheduler.should_sync_at(ready_at),
1160            "Scheduler should fire at max-batch latency under continuous events"
1161        );
1162    }
1163
1164    #[test]
1165    fn scheduler_applies_retry_backoff_and_resets_on_success() {
1166        let mut scheduler = SyncScheduler::new(0, 0);
1167        let base = Instant::now();
1168        scheduler.last_sync = base;
1169        scheduler.mark_file_event_at(base);
1170        assert!(scheduler.should_sync_at(base));
1171
1172        scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), base);
1173        assert!(!scheduler.should_sync_at(base + Duration::from_millis(999)));
1174        assert!(scheduler.should_sync_at(base + Duration::from_millis(1000)));
1175
1176        let second = base + Duration::from_millis(1000);
1177        scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), second);
1178        assert!(!scheduler.should_sync_at(second + Duration::from_secs(1)));
1179        assert!(scheduler.should_sync_at(second + Duration::from_secs(2)));
1180
1181        scheduler.on_sync_success_at(second + Duration::from_secs(2));
1182        let next = second + Duration::from_secs(2);
1183        scheduler.mark_file_event_at(next);
1184        assert!(scheduler.should_sync_at(next));
1185    }
1186
1187    #[test]
1188    fn scheduler_uses_longer_retry_for_manual_intervention_errors() {
1189        let mut scheduler = SyncScheduler::new(0, 0);
1190        let base = Instant::now();
1191        scheduler.last_sync = base;
1192        scheduler.mark_file_event_at(base);
1193        assert!(scheduler.should_sync_at(base));
1194
1195        scheduler.on_sync_failure_at(
1196            &SyncError::ManualInterventionRequired {
1197                reason: "conflict".to_string(),
1198            },
1199            base,
1200        );
1201        assert!(!scheduler.should_sync_at(base + Duration::from_secs(29)));
1202        assert!(scheduler.should_sync_at(base + Duration::from_secs(30)));
1203    }
1204
1205    #[test]
1206    fn request_sync_now_bypasses_debounce_but_respects_min_interval() {
1207        let mut scheduler = SyncScheduler::new(10_000, 500);
1208        let base = Instant::now();
1209        scheduler.last_sync = base;
1210
1211        scheduler.request_sync_now_at(base + Duration::from_millis(100));
1212        assert!(!scheduler.should_sync_at(base + Duration::from_millis(499)));
1213        assert!(scheduler.should_sync_at(base + Duration::from_millis(500)));
1214    }
1215
1216    #[test]
1217    fn request_sync_now_does_not_bypass_retry_backoff() {
1218        let mut scheduler = SyncScheduler::new(0, 0);
1219        let base = Instant::now();
1220        scheduler.last_sync = base;
1221        scheduler.mark_file_event_at(base);
1222        assert!(scheduler.should_sync_at(base));
1223
1224        scheduler.on_sync_failure_at(&SyncError::NetworkError("transient".to_string()), base);
1225        scheduler.request_sync_now_at(base + Duration::from_millis(100));
1226        assert!(!scheduler.should_sync_at(base + Duration::from_millis(999)));
1227        assert!(scheduler.should_sync_at(base + Duration::from_millis(1000)));
1228    }
1229}
1230
1231#[cfg(all(test, feature = "tray"))]
1232mod tests {
1233    use super::{WatchConfig, WatchManager};
1234    use crate::sync::SyncConfig;
1235    use std::fs::File;
1236    use std::sync::atomic::Ordering;
1237    use tempfile::tempdir;
1238    use tokio::time::{timeout, Duration};
1239
1240    #[test]
1241    fn parse_dbus_unix_path_finds_path_with_extra_parameters() {
1242        let address =
1243            "unix:abstract=/tmp/dbus-XXXX,guid=abcdef;unix:path=/tmp/dbus-test-socket,guid=1234";
1244        let parsed = WatchManager::parse_dbus_unix_path(address);
1245        assert_eq!(
1246            parsed.as_deref(),
1247            Some(std::path::Path::new("/tmp/dbus-test-socket"))
1248        );
1249    }
1250
1251    #[test]
1252    fn parse_dbus_unix_path_ignores_malformed_parts() {
1253        let address = "unix:guid=abc,broken,other=123,another;unix:path=/tmp/dbus-test";
1254        let parsed = WatchManager::parse_dbus_unix_path(address);
1255        assert_eq!(
1256            parsed.as_deref(),
1257            Some(std::path::Path::new("/tmp/dbus-test"))
1258        );
1259    }
1260
1261    #[tokio::test]
1262    async fn setup_dbus_socket_watch_emits_on_socket_file_activity() {
1263        let dir = tempdir().expect("create tempdir");
1264        let socket_path = dir.path().join("bus");
1265
1266        let (_watcher, mut rx) = WatchManager::setup_dbus_socket_watch(socket_path.clone())
1267            .expect("watcher should initialize for valid path");
1268
1269        // Creating the bus socket path (or regular file in tests) should trigger a retry signal.
1270        File::create(&socket_path).expect("create watched file");
1271
1272        let received = timeout(Duration::from_secs(2), rx.recv())
1273            .await
1274            .expect("timed out waiting for watcher event");
1275        assert_eq!(received, Some(()));
1276    }
1277
1278    #[tokio::test]
1279    async fn perform_sync_is_noop_when_suspended() {
1280        let manager = WatchManager::new(
1281            "/tmp/not-a-repo",
1282            SyncConfig::default(),
1283            WatchConfig::default(),
1284        );
1285        manager.set_sync_suspended(true);
1286
1287        manager
1288            .perform_sync()
1289            .await
1290            .expect("suspended sync should be a no-op");
1291
1292        assert_eq!(
1293            manager
1294                .last_successful_sync_unix_secs
1295                .load(Ordering::Acquire),
1296            0
1297        );
1298        assert!(!manager.is_syncing.load(Ordering::Acquire));
1299    }
1300}