graft_client/runtime/
sync.rs

1use std::{
2    collections::HashSet,
3    fmt::Debug,
4    sync::Arc,
5    thread::{self, JoinHandle, sleep},
6    time::{Duration, Instant},
7};
8
9use control::{SyncControl, SyncRpc};
10use crossbeam::channel::{Receiver, Sender, TrySendError, bounded, select_biased};
11use culprit::{Culprit, Result, ResultExt};
12use graft_core::{ClientId, VolumeId};
13use job::Job;
14use parking_lot::RwLock;
15use thiserror::Error;
16use tryiter::{TryIterator, TryIteratorExt};
17
18use crate::{ClientErr, ClientPair};
19
20use super::storage::{
21    Storage, StorageErr,
22    changeset::SetSubscriber,
23    volume_state::{SyncDirection, VolumeStatus},
24};
25
/// Upper bound on the number of errors the sync task keeps around for later
/// inspection; once exceeded, the oldest recorded error is discarded.
const MAX_RECENT_ERRORS: usize = 16;
27
28pub mod control;
29mod job;
30
31#[derive(Debug, Error)]
32pub enum StartupErr {
33    #[error("the Sync task is already running")]
34    AlreadyRunning,
35}
36
37#[derive(Debug, Error)]
38pub enum ShutdownErr {
39    #[error("error while shutting down Sync task")]
40    JoinError,
41
42    #[error("timeout while waiting for Sync task to cleanly shutdown")]
43    Timeout,
44
45    #[error("the Sync task is not running")]
46    TaskNotRunning,
47}
48
49#[derive(Clone, Default)]
50pub struct SyncTaskHandle {
51    inner: Arc<RwLock<Option<SyncTaskHandleInner>>>,
52}
53
54struct SyncTaskHandleInner {
55    handle: JoinHandle<()>,
56    control: Sender<SyncControl>,
57}
58
59impl SyncTaskHandle {
60    pub fn rpc(&self) -> SyncRpc {
61        let control = self
62            .inner
63            .read()
64            .as_ref()
65            .map(|inner| inner.control.clone());
66        SyncRpc::new(control)
67    }
68
69    #[allow(clippy::too_many_arguments)]
70    pub fn spawn(
71        &self,
72        cid: ClientId,
73        storage: Arc<Storage>,
74        clients: Arc<ClientPair>,
75        refresh_interval: Duration,
76        control_channel_size: usize,
77        autosync: bool,
78        thread_name: &str,
79    ) -> Result<(), StartupErr> {
80        let mut inner = self.inner.write();
81        if inner.is_some() {
82            return Err(Culprit::new(StartupErr::AlreadyRunning));
83        }
84
85        let (control_tx, control_rx) = bounded(control_channel_size);
86        let commits = storage.local_changeset().subscribe_all();
87
88        let task = SyncTask {
89            cid,
90            storage,
91            clients,
92            refresh_interval,
93            commits,
94            control: control_rx,
95            autosync,
96            recent_errors: Default::default(),
97        };
98
99        let handle = thread::Builder::new()
100            .name(thread_name.into())
101            .spawn(move || task.run())
102            .expect("failed to spawn sync task");
103
104        inner.replace(SyncTaskHandleInner { handle, control: control_tx });
105        Ok(())
106    }
107
108    pub fn shutdown_timeout(&self, timeout: Duration) -> Result<(), ShutdownErr> {
109        self.shutdown(Instant::now() + timeout)
110    }
111
112    pub fn shutdown(&self, deadline: Instant) -> Result<(), ShutdownErr> {
113        if let Some(inner) = self.inner.write().take() {
114            // drop the control channel to trigger shutdown
115            if inner
116                .control
117                .send_deadline(SyncControl::Shutdown, deadline)
118                .is_err()
119            {
120                return Err(Culprit::new_with_note(
121                    ShutdownErr::Timeout,
122                    "timeout while waiting to send Shutdown message to sync task",
123                ));
124            }
125
126            let (tx, rx) = bounded(0);
127            std::thread::spawn(move || {
128                tx.send(inner.handle.join()).unwrap();
129            });
130
131            // wait for the thread to complete or the timeout to elapse
132            match rx.recv_deadline(deadline) {
133                Ok(Ok(())) => {
134                    tracing::debug!("sync task shutdown completed");
135                    Ok(())
136                }
137                Ok(Err(err)) => {
138                    tracing::error!(?err, "sync task shutdown error");
139                    let msg = match err.downcast_ref::<&'static str>() {
140                        Some(s) => *s,
141                        None => match err.downcast_ref::<String>() {
142                            Some(s) => &s[..],
143                            None => "unknown panic",
144                        },
145                    };
146                    Err(Culprit::new_with_note(
147                        ShutdownErr::JoinError,
148                        format!("sync task panic: {msg}"),
149                    ))
150                }
151                Err(_) => {
152                    tracing::warn!("timeout waiting for sync task to shutdown");
153                    Err(Culprit::new(ShutdownErr::Timeout))
154                }
155            }
156        } else {
157            Err(Culprit::new(ShutdownErr::TaskNotRunning))
158        }
159    }
160}
161
162#[derive(Debug, Error)]
163pub enum SyncTaskErr {
164    #[error("client error: {0}")]
165    Client(#[from] ClientErr),
166
167    #[error("completion channel disconnected")]
168    CompletionChannelDisconnected,
169}
170
171impl From<StorageErr> for SyncTaskErr {
172    fn from(err: StorageErr) -> Self {
173        Self::Client(err.into())
174    }
175}
176
177/// A `SyncTask` is a background task which continuously syncs volumes to and from
178/// a Graft service.
179pub struct SyncTask {
180    cid: ClientId,
181    storage: Arc<Storage>,
182    clients: Arc<ClientPair>,
183    refresh_interval: Duration,
184    commits: SetSubscriber<VolumeId>,
185    control: Receiver<SyncControl>,
186
187    /// when autosync is true, volumes will be automatically pushed and pulled
188    /// to the server when they change or every `refresh_interval`.
189    autosync: bool,
190
191    recent_errors: Vec<(Instant, Culprit<SyncTaskErr>)>,
192}
193
194impl SyncTask {
195    fn run(mut self) {
196        loop {
197            match self.run_inner() {
198                Ok(()) => {
199                    tracing::debug!("sync task inner loop completed without error; shutting down");
200                    break;
201                }
202                Err(err) => {
203                    match err.ctx() {
204                        SyncTaskErr::Client(err) if err.is_network_err() || err.is_auth_err() => {
205                            tracing::debug!("sync task: network error: {:?}", err)
206                        }
207                        _ => tracing::error!("sync task error: {:?}", err),
208                    }
209
210                    self.recent_errors.push((Instant::now(), err));
211                    if self.recent_errors.len() > MAX_RECENT_ERRORS {
212                        self.recent_errors.remove(0);
213                    }
214
215                    // we want to explore system states that include sync task errors
216                    precept::expect_reachable!("error occurred in sync task");
217                    sleep(Duration::from_millis(100));
218                }
219            }
220        }
221    }
222
223    fn run_inner(&mut self) -> Result<(), SyncTaskErr> {
224        loop {
225            select_biased! {
226                recv(self.control) -> control => {
227                    match control.ok() {
228                        None| Some(SyncControl::Shutdown) => {
229                            break
230                        }
231                        Some(control) => self.handle_control(control)?,
232                    }
233                }
234
235                recv(self.commits.ready()) -> result => {
236                    if let Err(err) = result {
237                        panic!("commit subscriber error: {err:?}");
238                    }
239                    let vids = self.commits.changed();
240                    if !vids.is_empty() {
241                        self.handle_commit(vids)?;
242                    }
243                }
244
245                default(self.refresh_interval) => self.handle_tick()?,
246            }
247        }
248        Ok(())
249    }
250
251    fn handle_control(&mut self, msg: SyncControl) -> Result<(), SyncTaskErr> {
252        macro_rules! reply {
253            ($complete:ident, $result:expr) => {
254                match $complete.try_send($result) {
255                    Ok(()) => Ok(()),
256                    Err(TrySendError::Full(_)) => {
257                        unreachable!("SyncControl completion channel should never be full")
258                    }
259                    Err(TrySendError::Disconnected(err)) => Err(Culprit::new_with_note(
260                        SyncTaskErr::CompletionChannelDisconnected,
261                        format!("SyncControl completion channel disconnected: {err:?}"),
262                    )),
263                }
264            };
265        }
266
267        match msg {
268            SyncControl::GetAutosync { complete } => reply!(complete, self.autosync),
269            SyncControl::SetAutosync { autosync, complete } => {
270                self.autosync = autosync;
271                reply!(complete, ())
272            }
273            SyncControl::Sync { vid, direction, complete } => {
274                reply!(complete, self.sync_volume(vid, direction))
275            }
276            SyncControl::ResetToRemote { vid, complete } => {
277                reply!(complete, self.reset_volume_to_remote(vid))
278            }
279            SyncControl::DrainRecentErrors { complete } => {
280                reply!(complete, self.recent_errors.drain(..).collect())
281            }
282            SyncControl::Shutdown => {
283                unreachable!("shutdown message is handled in sync task select loop")
284            }
285        }
286    }
287
288    /// Synchronously sync a volume with the remote
289    /// If dir is `SyncDirection::Both`, this function will push before it pulls
290    fn sync_volume(&mut self, vid: VolumeId, dir: SyncDirection) -> Result<(), ClientErr> {
291        if dir.matches(SyncDirection::Push) {
292            let state = self.storage.volume_state(&vid).or_into_ctx()?;
293            if state.has_pending_commits() {
294                Job::push(vid.clone(), self.cid.clone())
295                    .run(&self.storage, &self.clients)
296                    .or_into_culprit("error while pushing volume")?;
297            }
298        }
299
300        if dir.matches(SyncDirection::Pull) {
301            Job::pull(vid)
302                .run(&self.storage, &self.clients)
303                .or_into_culprit("error while pulling volume")?;
304        }
305
306        Ok(())
307    }
308
309    /// Reset the volume to the remote. This will cause all pending commits to
310    /// be rolled back and the volume status to be cleared.
311    fn reset_volume_to_remote(&mut self, vid: VolumeId) -> Result<(), ClientErr> {
312        Job::pull_and_reset(vid)
313            .run(&self.storage, &self.clients)
314            .or_into_culprit("error while resetting volume to the remote")
315    }
316
317    fn handle_tick(&mut self) -> Result<(), SyncTaskErr> {
318        if !self.autosync {
319            return Ok(());
320        }
321
322        let mut jobs = self.jobs(SyncDirection::Both, None);
323        while let Some(job) = jobs.try_next()? {
324            job.run(&self.storage, &self.clients).or_into_ctx()?;
325        }
326        Ok(())
327    }
328
329    fn handle_commit(&mut self, vids: HashSet<VolumeId>) -> Result<(), SyncTaskErr> {
330        if !self.autosync {
331            return Ok(());
332        }
333
334        let mut jobs = self.jobs(SyncDirection::Push, Some(vids));
335        while let Some(job) = jobs.try_next()? {
336            job.run(&self.storage, &self.clients).or_into_ctx()?;
337        }
338        Ok(())
339    }
340
341    fn jobs(
342        &self,
343        sync: SyncDirection,
344        vids: Option<HashSet<VolumeId>>,
345    ) -> impl TryIterator<Ok = Job, Err = Culprit<SyncTaskErr>> + '_ {
346        self.storage
347            .query_volumes(sync, vids)
348            .map_err(|err| err.map_ctx(SyncTaskErr::from))
349            .try_filter_map(move |state| {
350                if state.status() != VolumeStatus::Ok {
351                    // volume must be healthy
352                    return Ok(None);
353                }
354
355                let config = state.config();
356                let can_push = config.sync().matches(SyncDirection::Push);
357                let can_pull = config.sync().matches(SyncDirection::Pull);
358                let has_pending_commits = state.has_pending_commits();
359                if can_push && has_pending_commits && sync.matches(SyncDirection::Push) {
360                    Ok(Some(Job::push(state.vid().clone(), self.cid.clone())))
361                } else if can_pull && sync.matches(SyncDirection::Pull) && !state.is_syncing() {
362                    Ok(Some(Job::pull(state.vid().clone())))
363                } else {
364                    Ok(None)
365                }
366            })
367    }
368}