1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::state::{ProviderMode, RegisterProvider, RILL_STATE};
use anyhow::Error;
use futures::channel::mpsc;
use meio::prelude::Action;
use rill_protocol::provider::{Description, Path, RillData};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::watch;
/// Message placed on a provider's data channel.
///
/// `Provider::send` produces `DataEvent` values and `Provider`'s `Drop`
/// implementation produces a single `EndStream`.
#[derive(Debug)]
pub(crate) enum DataEnvelope {
/// One data sample together with the time attached to it.
DataEvent {
/// Time attached to the sample: either supplied by the caller of
/// `Provider::send` or taken from `SystemTime::now()` at send time.
timestamp: SystemTime,
/// The payload being transferred.
data: RillData,
},
/// Sent when the `Provider` is dropped.
EndStream {
/// Description of the provider that was dropped.
description: Arc<Description>,
},
}
// Marker implementation: `DataEnvelope` implements meio's `Action` trait
// without overriding any trait items (the impl body is empty).
impl Action for DataEnvelope {}
/// Sending half of the unbounded channel that carries `DataEnvelope` values.
pub(crate) type DataSender = mpsc::UnboundedSender<DataEnvelope>;
/// Receiving half of the unbounded channel that carries `DataEnvelope` values.
pub(crate) type DataReceiver = mpsc::UnboundedReceiver<DataEnvelope>;
/// Handle used to push data for one described stream.
///
/// Data is forwarded over an unbounded channel only while the activity flag
/// reads `true`; dropping the handle sends `DataEnvelope::EndStream` on the
/// same channel.
#[derive(Debug)]
pub struct Provider {
/// Watch receiver holding the activity flag checked by `send`,
/// `is_active` and `when_activated`.
active: watch::Receiver<bool>,
/// Description of this provider, shared through `Arc` with the
/// `RegisterProvider` event built in `new`.
description: Arc<Description>,
/// Sender used for `DataEvent` envelopes and for the final `EndStream`.
sender: DataSender,
}
impl Provider {
    /// Builds a provider for `description` and registers it with the rill state.
    ///
    /// An unbounded data channel and a `watch` channel seeded with `active`
    /// are created. The provider keeps the data sender and the watch
    /// receiver; the data receiver travels inside a `RegisterProvider` event
    /// that is sent to the value stored in `RILL_STATE`. When `active` is
    /// `true` the provider is registered as `ProviderMode::Active`, otherwise
    /// as `ProviderMode::Reactive`, which carries the watch sender as its
    /// `activator`.
    ///
    /// # Panics
    ///
    /// Panics with "rill is not installed!" when `RILL_STATE.get()` returns
    /// nothing.
    pub(crate) fn new(description: Description, active: bool) -> Self {
        log::trace!("Creating Provider with path: {:?}", description.path);
        let (data_tx, data_rx) = mpsc::unbounded();
        let (activator, activity) = watch::channel(active);
        let shared = Arc::new(description);
        let provider = Provider {
            active: activity,
            description: Arc::clone(&shared),
            sender: data_tx,
        };
        // Only the reactive mode takes ownership of the watch sender; in the
        // active mode it is simply dropped when this function returns.
        let mode = if active {
            ProviderMode::Active
        } else {
            ProviderMode::Reactive { activator }
        };
        let registration = RegisterProvider {
            description: shared,
            mode,
            rx: data_rx,
        };
        RILL_STATE
            .get()
            .expect("rill is not installed!")
            .send(registration);
        provider
    }

    /// Returns the path stored in this provider's description.
    pub fn path(&self) -> &Path {
        let description: &Description = &self.description;
        &description.path
    }

    /// Forwards `data` over the data channel if the provider is active.
    ///
    /// Nothing is sent while the activity flag reads `false`. When
    /// `timestamp` is `None`, `SystemTime::now()` is used instead. A failed
    /// send is reported through `log::error!` and otherwise ignored.
    pub(crate) fn send(&self, data: RillData, timestamp: Option<SystemTime>) {
        let enabled = *self.active.borrow();
        if !enabled {
            return;
        }
        let timestamp = match timestamp {
            Some(moment) => moment,
            None => SystemTime::now(),
        };
        let outcome = self
            .sender
            .unbounded_send(DataEnvelope::DataEvent { timestamp, data });
        if let Err(err) = outcome {
            log::error!("Can't transfer data to sender: {}", err);
        }
    }
}
impl Provider {
    /// Reports the current value of the activity flag.
    pub fn is_active(&self) -> bool {
        let flag = self.active.borrow();
        *flag
    }

    /// Waits until the activity flag reads `true`.
    ///
    /// Returns immediately when the provider is already active. Otherwise it
    /// awaits change notifications on the watch receiver and re-checks the
    /// flag after each one.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `watch::Receiver::changed`.
    pub async fn when_activated(&mut self) -> Result<(), Error> {
        while !self.is_active() {
            self.active.changed().await?;
        }
        Ok(())
    }
}
impl Drop for Provider {
fn drop(&mut self) {
let end_stream = DataEnvelope::EndStream {
description: self.description.clone(),
};
if let Err(_err) = self.sender.unbounded_send(end_stream) {
log::error!(
"Can't send `EndStream` to the worker actor from: {}",
self.description.path
);
}
}
}