Skip to main content

graft_client/runtime/sync/
control.rs

1use std::time::Instant;
2
3use crossbeam::channel::{self, Receiver, Sender};
4use graft_core::VolumeId;
5
6use crate::{ClientErr, runtime::storage::volume_state::SyncDirection};
7use culprit::{Culprit, Result};
8
9use super::SyncTaskErr;
10
11#[derive(Debug)]
12pub enum SyncControl {
13    GetAutosync {
14        complete: Sender<bool>,
15    },
16
17    SetAutosync {
18        autosync: bool,
19        complete: Sender<()>,
20    },
21
22    Sync {
23        vid: VolumeId,
24        direction: SyncDirection,
25        complete: Sender<Result<(), ClientErr>>,
26    },
27
28    ResetToRemote {
29        vid: VolumeId,
30        complete: Sender<Result<(), ClientErr>>,
31    },
32
33    DrainRecentErrors {
34        complete: Sender<Vec<(Instant, Culprit<SyncTaskErr>)>>,
35    },
36
37    Shutdown,
38}
39
40#[derive(Debug, Clone)]
41pub struct SyncRpc {
42    control: Option<Sender<SyncControl>>,
43}
44
45impl SyncRpc {
46    pub(crate) fn new(control: Option<Sender<SyncControl>>) -> Self {
47        Self { control }
48    }
49
50    fn must_call<T>(&self, msg: SyncControl, recv: Receiver<T>) -> T {
51        self.control
52            .as_ref()
53            .expect("SyncRpc: control channel missing")
54            .send(msg)
55            .expect("SyncRpc: control channel closed");
56        recv.recv().expect("SyncRpc: response channel closed")
57    }
58
59    pub fn get_autosync(&self) -> bool {
60        let (complete, recv) = channel::bounded(1);
61        self.must_call(SyncControl::GetAutosync { complete }, recv)
62    }
63
64    pub fn set_autosync(&self, autosync: bool) {
65        let (complete, recv) = channel::bounded(1);
66        self.must_call(SyncControl::SetAutosync { autosync, complete }, recv)
67    }
68
69    pub fn sync(&self, vid: VolumeId, direction: SyncDirection) -> Result<(), ClientErr> {
70        let (complete, recv) = channel::bounded(1);
71        self.must_call(SyncControl::Sync { vid, direction, complete }, recv)
72    }
73
74    pub fn reset_to_remote(&self, vid: VolumeId) -> Result<(), ClientErr> {
75        let (complete, recv) = channel::bounded(1);
76        self.must_call(SyncControl::ResetToRemote { vid, complete }, recv)
77    }
78
79    pub fn drain_recent_errors(&self) -> Vec<(Instant, Culprit<SyncTaskErr>)> {
80        let (complete, recv) = channel::bounded(1);
81        self.must_call(SyncControl::DrainRecentErrors { complete }, recv)
82    }
83}