1use std::{
2 collections::HashSet,
3 fmt::Debug,
4 sync::Arc,
5 thread::{self, JoinHandle, sleep},
6 time::{Duration, Instant},
7};
8
9use control::{SyncControl, SyncRpc};
10use crossbeam::channel::{Receiver, Sender, TrySendError, bounded, select_biased};
11use culprit::{Culprit, Result, ResultExt};
12use graft_core::{ClientId, VolumeId};
13use job::Job;
14use parking_lot::RwLock;
15use thiserror::Error;
16use tryiter::{TryIterator, TryIteratorExt};
17
18use crate::{ClientErr, ClientPair};
19
20use super::storage::{
21 Storage, StorageErr,
22 changeset::SetSubscriber,
23 volume_state::{SyncDirection, VolumeStatus},
24};
25
/// Maximum number of entries kept in `SyncTask::recent_errors`; once the list grows past
/// this size the oldest entry is removed.
const MAX_RECENT_ERRORS: usize = 16;
27
28pub mod control;
29mod job;
30
/// Errors returned by `SyncTaskHandle::spawn`.
#[derive(Debug, Error)]
pub enum StartupErr {
    /// The handle already holds a task, so a second one was not spawned.
    #[error("the Sync task is already running")]
    AlreadyRunning,
}
36
/// Errors returned by `SyncTaskHandle::shutdown` and `SyncTaskHandle::shutdown_timeout`.
#[derive(Debug, Error)]
pub enum ShutdownErr {
    /// Joining the sync task's thread reported a panic.
    #[error("error while shutting down Sync task")]
    JoinError,

    /// The shutdown message could not be delivered, or the task's thread did not finish,
    /// before the deadline.
    #[error("timeout while waiting for Sync task to cleanly shutdown")]
    Timeout,

    /// The handle holds no task, so there is nothing to shut down.
    #[error("the Sync task is not running")]
    TaskNotRunning,
}
48
/// Cloneable handle used to spawn, talk to, and shut down the sync task.
///
/// Clones share one slot through the `Arc`. The slot is `None` while no task is
/// registered: that is the `Default` state, and `shutdown` takes the value back out.
#[derive(Clone, Default)]
pub struct SyncTaskHandle {
    inner: Arc<RwLock<Option<SyncTaskHandleInner>>>,
}
53
/// State stored in a `SyncTaskHandle` while a task is registered.
struct SyncTaskHandleInner {
    /// Join handle of the thread running `SyncTask::run`.
    handle: JoinHandle<()>,
    /// Sending half of the control channel whose receiver is owned by the `SyncTask`.
    control: Sender<SyncControl>,
}
58
59impl SyncTaskHandle {
60 pub fn rpc(&self) -> SyncRpc {
61 let control = self
62 .inner
63 .read()
64 .as_ref()
65 .map(|inner| inner.control.clone());
66 SyncRpc::new(control)
67 }
68
69 #[allow(clippy::too_many_arguments)]
70 pub fn spawn(
71 &self,
72 cid: ClientId,
73 storage: Arc<Storage>,
74 clients: Arc<ClientPair>,
75 refresh_interval: Duration,
76 control_channel_size: usize,
77 autosync: bool,
78 thread_name: &str,
79 ) -> Result<(), StartupErr> {
80 let mut inner = self.inner.write();
81 if inner.is_some() {
82 return Err(Culprit::new(StartupErr::AlreadyRunning));
83 }
84
85 let (control_tx, control_rx) = bounded(control_channel_size);
86 let commits = storage.local_changeset().subscribe_all();
87
88 let task = SyncTask {
89 cid,
90 storage,
91 clients,
92 refresh_interval,
93 commits,
94 control: control_rx,
95 autosync,
96 recent_errors: Default::default(),
97 };
98
99 let handle = thread::Builder::new()
100 .name(thread_name.into())
101 .spawn(move || task.run())
102 .expect("failed to spawn sync task");
103
104 inner.replace(SyncTaskHandleInner { handle, control: control_tx });
105 Ok(())
106 }
107
108 pub fn shutdown_timeout(&self, timeout: Duration) -> Result<(), ShutdownErr> {
109 self.shutdown(Instant::now() + timeout)
110 }
111
112 pub fn shutdown(&self, deadline: Instant) -> Result<(), ShutdownErr> {
113 if let Some(inner) = self.inner.write().take() {
114 if inner
116 .control
117 .send_deadline(SyncControl::Shutdown, deadline)
118 .is_err()
119 {
120 return Err(Culprit::new_with_note(
121 ShutdownErr::Timeout,
122 "timeout while waiting to send Shutdown message to sync task",
123 ));
124 }
125
126 let (tx, rx) = bounded(0);
127 std::thread::spawn(move || {
128 tx.send(inner.handle.join()).unwrap();
129 });
130
131 match rx.recv_deadline(deadline) {
133 Ok(Ok(())) => {
134 tracing::debug!("sync task shutdown completed");
135 Ok(())
136 }
137 Ok(Err(err)) => {
138 tracing::error!(?err, "sync task shutdown error");
139 let msg = match err.downcast_ref::<&'static str>() {
140 Some(s) => *s,
141 None => match err.downcast_ref::<String>() {
142 Some(s) => &s[..],
143 None => "unknown panic",
144 },
145 };
146 Err(Culprit::new_with_note(
147 ShutdownErr::JoinError,
148 format!("sync task panic: {msg}"),
149 ))
150 }
151 Err(_) => {
152 tracing::warn!("timeout waiting for sync task to shutdown");
153 Err(Culprit::new(ShutdownErr::Timeout))
154 }
155 }
156 } else {
157 Err(Culprit::new(ShutdownErr::TaskNotRunning))
158 }
159 }
160}
161
/// Errors produced inside the sync task's loop.
#[derive(Debug, Error)]
pub enum SyncTaskErr {
    /// A client error; storage errors are also converted into this variant.
    #[error("client error: {0}")]
    Client(#[from] ClientErr),

    /// The reply channel of a control message was disconnected before the reply was sent.
    #[error("completion channel disconnected")]
    CompletionChannelDisconnected,
}
170
171impl From<StorageErr> for SyncTaskErr {
172 fn from(err: StorageErr) -> Self {
173 Self::Client(err.into())
174 }
175}
176
/// State owned by the sync task's thread; built by `SyncTaskHandle::spawn` and consumed by
/// `SyncTask::run`.
pub struct SyncTask {
    /// Client id passed to push jobs.
    cid: ClientId,
    /// Local storage queried for volume state and handed to every job.
    storage: Arc<Storage>,
    /// Clients handed to every job.
    clients: Arc<ClientPair>,
    /// How long the select loop waits for a message before running `handle_tick`.
    refresh_interval: Duration,
    /// Subscription to the local changeset, reporting which volumes changed.
    commits: SetSubscriber<VolumeId>,
    /// Receiving half of the control channel.
    control: Receiver<SyncControl>,

    /// When `false`, `handle_tick` and `handle_commit` return without running any job.
    autosync: bool,

    /// Errors returned by `run_inner`, each with the time it was recorded; capped at
    /// `MAX_RECENT_ERRORS` entries and emptied by `SyncControl::DrainRecentErrors`.
    recent_errors: Vec<(Instant, Culprit<SyncTaskErr>)>,
}
193
194impl SyncTask {
195 fn run(mut self) {
196 loop {
197 match self.run_inner() {
198 Ok(()) => {
199 tracing::debug!("sync task inner loop completed without error; shutting down");
200 break;
201 }
202 Err(err) => {
203 match err.ctx() {
204 SyncTaskErr::Client(err) if err.is_network_err() || err.is_auth_err() => {
205 tracing::debug!("sync task: network error: {:?}", err)
206 }
207 _ => tracing::error!("sync task error: {:?}", err),
208 }
209
210 self.recent_errors.push((Instant::now(), err));
211 if self.recent_errors.len() > MAX_RECENT_ERRORS {
212 self.recent_errors.remove(0);
213 }
214
215 precept::expect_reachable!("error occurred in sync task");
217 sleep(Duration::from_millis(100));
218 }
219 }
220 }
221 }
222
223 fn run_inner(&mut self) -> Result<(), SyncTaskErr> {
224 loop {
225 select_biased! {
226 recv(self.control) -> control => {
227 match control.ok() {
228 None| Some(SyncControl::Shutdown) => {
229 break
230 }
231 Some(control) => self.handle_control(control)?,
232 }
233 }
234
235 recv(self.commits.ready()) -> result => {
236 if let Err(err) = result {
237 panic!("commit subscriber error: {err:?}");
238 }
239 let vids = self.commits.changed();
240 if !vids.is_empty() {
241 self.handle_commit(vids)?;
242 }
243 }
244
245 default(self.refresh_interval) => self.handle_tick()?,
246 }
247 }
248 Ok(())
249 }
250
251 fn handle_control(&mut self, msg: SyncControl) -> Result<(), SyncTaskErr> {
252 macro_rules! reply {
253 ($complete:ident, $result:expr) => {
254 match $complete.try_send($result) {
255 Ok(()) => Ok(()),
256 Err(TrySendError::Full(_)) => {
257 unreachable!("SyncControl completion channel should never be full")
258 }
259 Err(TrySendError::Disconnected(err)) => Err(Culprit::new_with_note(
260 SyncTaskErr::CompletionChannelDisconnected,
261 format!("SyncControl completion channel disconnected: {err:?}"),
262 )),
263 }
264 };
265 }
266
267 match msg {
268 SyncControl::GetAutosync { complete } => reply!(complete, self.autosync),
269 SyncControl::SetAutosync { autosync, complete } => {
270 self.autosync = autosync;
271 reply!(complete, ())
272 }
273 SyncControl::Sync { vid, direction, complete } => {
274 reply!(complete, self.sync_volume(vid, direction))
275 }
276 SyncControl::ResetToRemote { vid, complete } => {
277 reply!(complete, self.reset_volume_to_remote(vid))
278 }
279 SyncControl::DrainRecentErrors { complete } => {
280 reply!(complete, self.recent_errors.drain(..).collect())
281 }
282 SyncControl::Shutdown => {
283 unreachable!("shutdown message is handled in sync task select loop")
284 }
285 }
286 }
287
288 fn sync_volume(&mut self, vid: VolumeId, dir: SyncDirection) -> Result<(), ClientErr> {
291 if dir.matches(SyncDirection::Push) {
292 let state = self.storage.volume_state(&vid).or_into_ctx()?;
293 if state.has_pending_commits() {
294 Job::push(vid.clone(), self.cid.clone())
295 .run(&self.storage, &self.clients)
296 .or_into_culprit("error while pushing volume")?;
297 }
298 }
299
300 if dir.matches(SyncDirection::Pull) {
301 Job::pull(vid)
302 .run(&self.storage, &self.clients)
303 .or_into_culprit("error while pulling volume")?;
304 }
305
306 Ok(())
307 }
308
309 fn reset_volume_to_remote(&mut self, vid: VolumeId) -> Result<(), ClientErr> {
312 Job::pull_and_reset(vid)
313 .run(&self.storage, &self.clients)
314 .or_into_culprit("error while resetting volume to the remote")
315 }
316
317 fn handle_tick(&mut self) -> Result<(), SyncTaskErr> {
318 if !self.autosync {
319 return Ok(());
320 }
321
322 let mut jobs = self.jobs(SyncDirection::Both, None);
323 while let Some(job) = jobs.try_next()? {
324 job.run(&self.storage, &self.clients).or_into_ctx()?;
325 }
326 Ok(())
327 }
328
329 fn handle_commit(&mut self, vids: HashSet<VolumeId>) -> Result<(), SyncTaskErr> {
330 if !self.autosync {
331 return Ok(());
332 }
333
334 let mut jobs = self.jobs(SyncDirection::Push, Some(vids));
335 while let Some(job) = jobs.try_next()? {
336 job.run(&self.storage, &self.clients).or_into_ctx()?;
337 }
338 Ok(())
339 }
340
341 fn jobs(
342 &self,
343 sync: SyncDirection,
344 vids: Option<HashSet<VolumeId>>,
345 ) -> impl TryIterator<Ok = Job, Err = Culprit<SyncTaskErr>> + '_ {
346 self.storage
347 .query_volumes(sync, vids)
348 .map_err(|err| err.map_ctx(SyncTaskErr::from))
349 .try_filter_map(move |state| {
350 if state.status() != VolumeStatus::Ok {
351 return Ok(None);
353 }
354
355 let config = state.config();
356 let can_push = config.sync().matches(SyncDirection::Push);
357 let can_pull = config.sync().matches(SyncDirection::Pull);
358 let has_pending_commits = state.has_pending_commits();
359 if can_push && has_pending_commits && sync.matches(SyncDirection::Push) {
360 Ok(Some(Job::push(state.vid().clone(), self.cid.clone())))
361 } else if can_pull && sync.matches(SyncDirection::Pull) && !state.is_syncing() {
362 Ok(Some(Job::pull(state.vid().clone())))
363 } else {
364 Ok(None)
365 }
366 })
367 }
368}