atuin_daemon/components/
sync.rs1use std::time::Duration;
6
7use eyre::Result;
8use rand::Rng;
9use tokio::sync::mpsc;
10use tokio::time::{self, MissedTickBehavior};
11
12use atuin_client::{history::store::HistoryStore, record::sync, settings::Settings};
13use atuin_dotfiles::store::{AliasStore, var::VarStore};
14
15use crate::{
16 daemon::{Component, DaemonHandle},
17 events::DaemonEvent,
18};
19
20enum SyncCommand {
22 ForceSync,
24 Stop,
26}
27
28#[derive(Clone, Copy, PartialEq, Eq)]
30enum SyncState {
31 Idle,
33 Retrying,
36}
37
38pub struct SyncComponent {
46 task_handle: Option<tokio::task::JoinHandle<()>>,
47 command_tx: Option<mpsc::Sender<SyncCommand>>,
48}
49
50impl SyncComponent {
51 pub fn new() -> Self {
53 Self {
54 task_handle: None,
55 command_tx: None,
56 }
57 }
58}
59
60impl Default for SyncComponent {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66#[tonic::async_trait]
67impl Component for SyncComponent {
68 fn name(&self) -> &'static str {
69 "sync"
70 }
71
72 async fn start(&mut self, handle: DaemonHandle) -> Result<()> {
73 let (cmd_tx, cmd_rx) = mpsc::channel(16);
74 self.command_tx = Some(cmd_tx);
75
76 self.task_handle = Some(tokio::spawn(sync_loop(handle, cmd_rx)));
78
79 tracing::info!("sync component started");
80 Ok(())
81 }
82
83 async fn handle_event(&mut self, event: &DaemonEvent) -> Result<()> {
84 if let DaemonEvent::ForceSync = event {
85 tracing::info!("force sync requested");
86 if let Some(tx) = &self.command_tx {
87 let _ = tx.send(SyncCommand::ForceSync).await;
88 }
89 }
90 Ok(())
91 }
92
93 async fn stop(&mut self) -> Result<()> {
94 if let Some(tx) = &self.command_tx {
95 let _ = tx.send(SyncCommand::Stop).await;
96 }
97 if let Some(handle) = self.task_handle.take() {
98 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await;
100 }
101 tracing::info!("sync component stopped");
102 Ok(())
103 }
104}
105
106async fn sync_loop(handle: DaemonHandle, mut cmd_rx: mpsc::Receiver<SyncCommand>) {
111 tracing::info!("sync loop starting");
112
113 let settings = handle.settings().await.clone();
115 let host_id = match Settings::host_id().await {
116 Ok(id) => id,
117 Err(e) => {
118 tracing::error!("failed to get host id, sync disabled: {e}");
119 return;
120 }
121 };
122
123 let encryption_key = *handle.encryption_key();
125 let history_store = HistoryStore::new(handle.store().clone(), host_id, encryption_key);
126 let alias_store = AliasStore::new(handle.store().clone(), host_id, encryption_key);
127 let var_store = VarStore::new(handle.store().clone(), host_id, encryption_key);
128
129 let max_interval: f64 = 60.0 * 30.0 + rand::thread_rng().gen_range(0.0..60.0);
131
132 let mut ticker = time::interval(time::Duration::from_secs(settings.daemon.sync_frequency));
133
134 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
137
138 let mut sync_state = SyncState::Idle;
139
140 loop {
141 tokio::select! {
142 _ = ticker.tick() => {
143 let settings = handle.settings().await;
144
145 if !settings.auto_sync && sync_state == SyncState::Idle {
148 tracing::debug!("auto_sync disabled, skipping periodic sync tick");
149 continue;
150 }
151
152 sync_state = do_sync_tick(
153 &handle,
154 &history_store,
155 &alias_store,
156 &var_store,
157 &mut ticker,
158 max_interval,
159 &settings,
160 ).await;
161 }
162 cmd = cmd_rx.recv() => {
163 match cmd {
164 Some(SyncCommand::ForceSync) => {
165 tracing::info!("executing force sync");
166 let settings = handle.settings().await;
167 sync_state = do_sync_tick(
168 &handle,
169 &history_store,
170 &alias_store,
171 &var_store,
172 &mut ticker,
173 max_interval,
174 &settings,
175 ).await;
176 }
177 Some(SyncCommand::Stop) | None => {
178 tracing::info!("sync loop stopping");
179 break;
180 }
181 }
182 }
183 }
184 }
185}
186
187async fn do_sync_tick(
191 handle: &DaemonHandle,
192 history_store: &HistoryStore,
193 alias_store: &AliasStore,
194 var_store: &VarStore,
195 ticker: &mut time::Interval,
196 max_interval: f64,
197 settings: &Settings,
198) -> SyncState {
199 tracing::info!("sync tick");
200
201 let logged_in = match settings.logged_in().await {
203 Ok(v) => v,
204 Err(e) => {
205 tracing::warn!("failed to check login status, skipping sync tick: {e}");
206 return SyncState::Idle;
207 }
208 };
209
210 if !logged_in {
211 tracing::debug!("not logged in, skipping sync tick");
212 return SyncState::Idle;
213 }
214
215 let res = sync::sync(settings, handle.store(), handle.encryption_key()).await;
217
218 match res {
219 Err(e) => {
220 tracing::error!("sync tick failed with {e}");
221
222 handle.emit(DaemonEvent::SyncFailed {
224 error: e.to_string(),
225 });
226
227 let mut rng = rand::thread_rng();
229 let mut new_interval = ticker.period().as_secs_f64() * rng.gen_range(2.0..2.2);
230
231 if new_interval > max_interval {
232 new_interval = max_interval;
233 }
234
235 *ticker = time::interval_at(
236 tokio::time::Instant::now() + Duration::from_secs(new_interval as u64),
237 time::Duration::from_secs(new_interval as u64),
238 );
239 ticker.reset_after(time::Duration::from_secs(new_interval as u64));
240 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
241
242 tracing::error!("backing off, next sync tick in {new_interval}");
243
244 SyncState::Retrying
245 }
246 Ok((uploaded_count, downloaded_records)) => {
247 tracing::info!(
248 uploaded = uploaded_count,
249 downloaded = downloaded_records.len(),
250 "sync complete"
251 );
252
253 if let Err(e) = history_store
255 .incremental_build(handle.history_db(), &downloaded_records)
256 .await
257 {
258 tracing::error!("failed to build history from downloaded records: {e}");
259 }
260
261 handle.emit(DaemonEvent::RecordsAdded(downloaded_records.clone()));
263
264 handle.emit(DaemonEvent::SyncCompleted {
266 uploaded: uploaded_count as usize,
267 downloaded: downloaded_records.len(),
268 });
269
270 if let Err(e) = alias_store.build().await {
272 tracing::error!("failed to rebuild alias store: {e}");
273 }
274 if let Err(e) = var_store.build().await {
275 tracing::error!("failed to rebuild var store: {e}");
276 }
277
278 if ticker.period().as_secs() != settings.daemon.sync_frequency {
280 *ticker = time::interval_at(
281 tokio::time::Instant::now()
282 + Duration::from_secs(settings.daemon.sync_frequency),
283 time::Duration::from_secs(settings.daemon.sync_frequency),
284 );
285 ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
286 }
287
288 if let Err(e) = Settings::save_sync_time().await {
290 tracing::error!("failed to save sync time: {e}");
291 }
292
293 SyncState::Idle
294 }
295 }
296}