Skip to main content

atuin_daemon/components/
sync.rs

1//! Sync component.
2//!
3//! Handles periodic synchronization with the Atuin cloud server.
4
5use std::time::Duration;
6
7use eyre::Result;
8use rand::Rng;
9use tokio::sync::mpsc;
10use tokio::time::{self, MissedTickBehavior};
11
12use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
13use atuin_dotfiles::store::{AliasStore, var::VarStore};
14
15use crate::{
16    daemon::{Component, DaemonHandle},
17    events::DaemonEvent,
18};
19
20/// Commands that can be sent to the sync task.
21enum SyncCommand {
22    /// Trigger an immediate sync.
23    ForceSync,
24    /// Stop the sync loop.
25    Stop,
26}
27
28/// Sync state - tracks whether we're in normal operation or retrying after failure.
29#[derive(Clone, Copy, PartialEq, Eq)]
30enum SyncState {
31    /// Normal operation. Periodic syncs only run if auto_sync is enabled.
32    Idle,
33    /// Retrying after a sync failure. Retries continue regardless of auto_sync
34    /// until the sync succeeds.
35    Retrying,
36}
37
38/// Sync component - handles periodic cloud synchronization.
39///
40/// This component:
41/// - Runs a background sync loop on a configurable interval
42/// - Implements exponential backoff on sync failures
43/// - Responds to ForceSync events for immediate sync
44/// - Emits SyncCompleted/SyncFailed events
45pub struct SyncComponent {
46    task_handle: Option<tokio::task::JoinHandle<()>>,
47    command_tx: Option<mpsc::Sender<SyncCommand>>,
48}
49
50impl SyncComponent {
51    /// Create a new sync component.
52    pub fn new() -> Self {
53        Self {
54            task_handle: None,
55            command_tx: None,
56        }
57    }
58}
59
60impl Default for SyncComponent {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66#[tonic::async_trait]
67impl Component for SyncComponent {
68    fn name(&self) -> &'static str {
69        "sync"
70    }
71
72    async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
73        let (cmd_tx, cmd_rx) = mpsc::channel(16);
74        self.command_tx = Some(cmd_tx);
75
76        // Spawn the sync loop with its own copy of the handle
77        self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
78
79        tracing::info!("sync component started");
80        Ok(())
81    }
82
83    async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
84        if let DaemonEvent::ForceSync = event {
85            tracing::info!("force sync requested");
86            if let Some(tx) = &self.command_tx {
87                let _ = tx.send(SyncCommand::ForceSync).await;
88            }
89        }
90        Ok(())
91    }
92
93    async fn stop(&mut self) -> Result<()> {
94        if let Some(tx) = &self.command_tx {
95            let _ = tx.send(SyncCommand::Stop).await;
96        }
97        if let Some(handle) = self.task_handle.take() {
98            // Give the task a moment to shut down gracefully
99            let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
100        }
101        tracing::info!("sync component stopped");
102        Ok(())
103    }
104}
105
106/// The main sync loop.
107///
108/// This runs in a spawned task and handles periodic sync as well as
109/// force sync requests.
110async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
111    tracing::info!("sync loop starting");
112
113    // Clone settings since we need them across await points
114    let settings = handle.settings().await.clone();
115    let host_id = match Settings::host_id().await {
116        Ok(id) => id,
117        Err(e) => {
118            tracing::error!("failed to get host id, sync disabled: {e}");
119            return;
120        }
121    };
122
123    // Create the stores we need
124    let encryption_key = *handle.encryption_key();
125    let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
126    let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key);
127    let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key);
128
129    // Don't backoff by more than 30 mins (with a random jitter of up to 1 min)
130    let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
131
132    let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
133
134    // IMPORTANT: without this, if we miss ticks because a sync takes ages or is otherwise delayed,
135    // we may end up running a lot of syncs in a hot loop.
136    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
137
138    let mut sync_state = SyncState::Idle;
139
140    loop {
141        tokio::select! {
142            _ = ticker.tick() => {
143                let settings = handle.settings().await;
144
145                // Skip periodic ticks if auto_sync is disabled AND we're not retrying
146                // a previous failure. Retries must continue regardless of auto_sync.
147                if !settings.auto_sync && sync_state == SyncState::Idle {
148                    tracing::debug!("auto_sync disabled, skipping periodic sync tick");
149                    continue;
150                }
151
152                sync_state = do_sync_tick(
153                    &handle,
154                    &history_store,
155                    &alias_store,
156                    &var_store,
157                    &mut ticker,
158                    max_interval,
159                    &settings,
160                ).await;
161            }
162            cmd = cmd_rx.recv() => {
163                match cmd {
164                    Some(SyncCommand::ForceSync) => {
165                        tracing::info!("executing force sync");
166                        let settings = handle.settings().await;
167                        sync_state = do_sync_tick(
168                            &handle,
169                            &history_store,
170                            &alias_store,
171                            &var_store,
172                            &mut ticker,
173                            max_interval,
174                            &settings,
175                        ).await;
176                    }
177                    Some(SyncCommand::Stop) | None => {
178                        tracing::info!("sync loop stopping");
179                        break;
180                    }
181                }
182            }
183        }
184    }
185}
186
187/// Execute a single sync tick.
188///
189/// Returns the new sync state: `Idle` on success, `Retrying` on failure.
190async fn do_sync_tick(
191    handle: &DaemonHandle,
192    history_store: &HistoryStore,
193    alias_store: &AliasStore,
194    var_store: &VarStore,
195    ticker: &mut time::Interval,
196    max_interval: f64,
197    settings: &Settings,
198) -> SyncState {
199    tracing::info!("sync tick");
200
201    // Check if logged in
202    let logged_in = match settings.logged_in().await {
203        Ok(v) => v,
204        Err(e) => {
205            tracing::warn!("failed to check login status, skipping sync tick: {e}");
206            return SyncState::Idle;
207        }
208    };
209
210    if !logged_in {
211        tracing::debug!("not logged in, skipping sync tick");
212        return SyncState::Idle;
213    }
214
215    // Perform the sync
216    let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
217
218    match res {
219        Err(e) => {
220            tracing::error!("sync tick failed with {e}");
221
222            // Emit failure event
223            handle.emit(DaemonEvent::SyncFailed {
224                error: e.to_string(),
225            });
226
227            // Exponential backoff
228            let mut rng = rand::thread_rng();
229            let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
230
231            if new_interval > max_interval {
232                new_interval = max_interval;
233            }
234
235            *ticker = time::interval_at(
236                tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
237                time::Duration::from_secs(new_interval as u64),
238            );
239            ticker.reset_after(time::Duration::from_secs(new_interval as u64));
240            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
241
242            tracing::error!("backing off, next sync tick in {new_interval}");
243
244            SyncState::Retrying
245        }
246        Ok((uploaded_count, downloaded_records)) => {
247            tracing::info!(
248                uploaded = uploaded_count,
249                downloaded = downloaded_records.len(),
250                "sync complete"
251            );
252
253            // Build history from downloaded records
254            if let Err(e) = history_store
255                .incremental_build(handle.history_db(), &downloaded_records)
256                .await
257            {
258                tracing::error!("failed to build history from downloaded records: {e}");
259            }
260
261            // Emit the records added event (for search indexing)
262            handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
263
264            // Emit sync completed event
265            handle.emit(DaemonEvent::SyncCompleted {
266                uploaded: uploaded_count as usize,
267                downloaded: downloaded_records.len(),
268            });
269
270            // Rebuild alias and var stores
271            if let Err(e) = alias_store.build().await {
272                tracing::error!("failed to rebuild alias store: {e}");
273            }
274            if let Err(e) = var_store.build().await {
275                tracing::error!("failed to rebuild var store: {e}");
276            }
277
278            // Reset backoff on success
279            if ticker.period().as_secs() != settings.daemon.sync_frequency {
280                *ticker = time::interval_at(
281                    tokio::time::Instant::now()
282                        + Duration::from_secs(settings.daemon.sync_frequency),
283                    time::Duration::from_secs(settings.daemon.sync_frequency),
284                );
285                ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
286            }
287
288            // Store sync time
289            if let Err(e) = Settings::save_sync_time().await {
290                tracing::error!("failed to save sync time: {e}");
291            }
292
293            SyncState::Idle
294        }
295    }
296}