1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use super::data::snapshot::Snapshot;
use crossbeam_channel::{bounded, Sender};
use std::fmt;
use tokio_sync::oneshot;
/// Errors that can occur while requesting a snapshot through a `Controller`.
///
/// The type is a plain, field-less enum, so it is cheap to copy and can be compared
/// directly (for example in tests via `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotError {
    /// The request was sent, but waiting on the reply channel failed before a snapshot
    /// was received.
    InternalError,
    /// The request could not be sent on the control channel.
    ReceiverShutdown,
}
/// Requests that a `Controller` sends over its control channel.
///
/// Each variant carries the sending half of a reply channel through which the
/// requested `Snapshot` is expected to be handed back.
pub(crate) enum ControlFrame {
    /// Snapshot request answered over a crossbeam channel (used by the blocking API).
    Snapshot(Sender<Snapshot>),

    /// Snapshot request answered over a oneshot channel (used by the asynchronous API).
    SnapshotAsync(oneshot::Sender<Snapshot>),
}
/// Handle for requesting snapshots over a control channel.
///
/// The handle only owns the sending half of the channel, so it can be cloned freely;
/// every clone sends its requests over the same channel.
#[derive(Clone)]
pub struct Controller {
    /// Sending half of the control channel on which `ControlFrame` requests are issued.
    control_tx: Sender<ControlFrame>,
}
impl Controller {
    /// Wraps the sending half of the control channel in a `Controller`.
    pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
        Self { control_tx }
    }

    /// Requests a snapshot and blocks the calling thread until the reply arrives.
    ///
    /// A zero-capacity reply channel is created, its sender is shipped over the control
    /// channel inside a `ControlFrame::Snapshot`, and the call then waits on the
    /// receiving half of that reply channel.
    ///
    /// # Errors
    ///
    /// * `SnapshotError::ReceiverShutdown` if the request cannot be sent on the control
    ///   channel.
    /// * `SnapshotError::InternalError` if receiving from the reply channel fails.
    pub fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
        let (reply_tx, reply_rx) = bounded(0);

        // Bail out early if the request cannot be handed over at all.
        self.control_tx
            .send(ControlFrame::Snapshot(reply_tx))
            .map_err(|_| SnapshotError::ReceiverShutdown)?;

        // The request is in flight; a failure from here on means no reply was delivered.
        reply_rx.recv().map_err(|_| SnapshotError::InternalError)
    }

    /// Requests a snapshot without blocking on the reply.
    ///
    /// A oneshot channel is created and its sender is shipped over the control channel
    /// inside a `ControlFrame::SnapshotAsync`. On success the receiving half is returned
    /// so the caller can obtain the snapshot from it later.
    ///
    /// # Errors
    ///
    /// Returns `SnapshotError::ReceiverShutdown` if the request cannot be sent on the
    /// control channel.
    pub fn get_snapshot_async(&self) -> Result<oneshot::Receiver<Snapshot>, SnapshotError> {
        let (reply_tx, reply_rx) = oneshot::channel();

        self.control_tx
            .send(ControlFrame::SnapshotAsync(reply_tx))
            .map_err(|_| SnapshotError::ReceiverShutdown)?;

        Ok(reply_rx)
    }
}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SnapshotError::InternalError => write!(f, "internal error during snapshot generation"),
SnapshotError::ReceiverShutdown => write!(f, "the receiver is not currently running"),
}
}
}