graft_client/runtime/sync/
control.rs1use std::time::Instant;
2
3use crossbeam::channel::{self, Receiver, Sender};
4use graft_core::VolumeId;
5
6use crate::{ClientErr, runtime::storage::volume_state::SyncDirection};
7use culprit::{Culprit, Result};
8
9use super::SyncTaskErr;
10
11#[derive(Debug)]
12pub enum SyncControl {
13 GetAutosync {
14 complete: Sender<bool>,
15 },
16
17 SetAutosync {
18 autosync: bool,
19 complete: Sender<()>,
20 },
21
22 Sync {
23 vid: VolumeId,
24 direction: SyncDirection,
25 complete: Sender<Result<(), ClientErr>>,
26 },
27
28 ResetToRemote {
29 vid: VolumeId,
30 complete: Sender<Result<(), ClientErr>>,
31 },
32
33 DrainRecentErrors {
34 complete: Sender<Vec<(Instant, Culprit<SyncTaskErr>)>>,
35 },
36
37 Shutdown,
38}
39
40#[derive(Debug, Clone)]
41pub struct SyncRpc {
42 control: Option<Sender<SyncControl>>,
43}
44
45impl SyncRpc {
46 pub(crate) fn new(control: Option<Sender<SyncControl>>) -> Self {
47 Self { control }
48 }
49
50 fn must_call<T>(&self, msg: SyncControl, recv: Receiver<T>) -> T {
51 self.control
52 .as_ref()
53 .expect("SyncRpc: control channel missing")
54 .send(msg)
55 .expect("SyncRpc: control channel closed");
56 recv.recv().expect("SyncRpc: response channel closed")
57 }
58
59 pub fn get_autosync(&self) -> bool {
60 let (complete, recv) = channel::bounded(1);
61 self.must_call(SyncControl::GetAutosync { complete }, recv)
62 }
63
64 pub fn set_autosync(&self, autosync: bool) {
65 let (complete, recv) = channel::bounded(1);
66 self.must_call(SyncControl::SetAutosync { autosync, complete }, recv)
67 }
68
69 pub fn sync(&self, vid: VolumeId, direction: SyncDirection) -> Result<(), ClientErr> {
70 let (complete, recv) = channel::bounded(1);
71 self.must_call(SyncControl::Sync { vid, direction, complete }, recv)
72 }
73
74 pub fn reset_to_remote(&self, vid: VolumeId) -> Result<(), ClientErr> {
75 let (complete, recv) = channel::bounded(1);
76 self.must_call(SyncControl::ResetToRemote { vid, complete }, recv)
77 }
78
79 pub fn drain_recent_errors(&self) -> Vec<(Instant, Culprit<SyncTaskErr>)> {
80 let (complete, recv) = channel::bounded(1);
81 self.must_call(SyncControl::DrainRecentErrors { complete }, recv)
82 }
83}