1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::actors::pool::{RillPoolTask, DISTRIBUTOR};
use crate::tracers::tracer::{self, ControlReceiver, ControlSender};
use anyhow::Error;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use rill_protocol::flow::core;
use rill_protocol::flow::core::ActionEnvelope;
use std::sync::Arc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A pair of control-channel halves for a flow of type `T`.
///
/// Both halves are produced together by `tracer::channel()` (see
/// `Link::new`). The sender can be cloned out with `Link::sender`; the
/// receiver is handed over by the consuming methods of `Link`.
pub struct Link<T>
where
    T: core::Flow,
{
    /// Sending half; cloned for every caller of `Link::sender`.
    tx: ControlSender<T>,
    /// Receiving half; moved out when the link is consumed.
    rx: ControlReceiver<T>,
}
impl<T: core::Flow> Link<T> {
    /// Creates a new link whose two halves come from `tracer::channel()`.
    pub fn new() -> Self {
        let (tx, rx) = tracer::channel();
        Self { tx, rx }
    }

    /// Returns a clone of the sending half of the link.
    pub fn sender(&self) -> ControlSender<T> {
        self.tx.clone()
    }

    /// Consumes the link and returns its receiving half.
    pub fn receiver(self) -> ControlReceiver<T> {
        self.rx
    }

    /// Consumes the link and returns a stream of the actions received on it.
    ///
    /// Every incoming envelope is passed through `activity.to_action()`;
    /// envelopes for which that yields `None` are skipped.
    pub fn actions(self) -> impl Stream<Item = T::Action> {
        let envelopes = UnboundedReceiverStream::new(self.rx);
        envelopes.filter_map(|envelope| async move { envelope.activity.to_action() })
    }

    /// Consumes the link and registers `callback` to be run for every
    /// envelope received on it.
    ///
    /// The receiver and the callback are wrapped into a `SyncCallbackTask`
    /// which is handed to the pool `DISTRIBUTOR`.
    ///
    /// # Errors
    ///
    /// Returns an error if `DISTRIBUTOR.spawn_task` fails.
    pub fn sync<F>(self, callback: F) -> Result<(), Error>
    where
        F: SyncCallback<T>,
    {
        let task = SyncCallbackTask {
            rx: self.rx,
            callback: Arc::new(callback),
        };
        DISTRIBUTOR.spawn_task(task)?;
        Ok(())
    }
}

/// `Default` mirrors `Link::new`, so the type works with
/// `Default::default()` and `#[derive(Default)]` on containing structs.
impl<T: core::Flow> Default for Link<T> {
    fn default() -> Self {
        Self::new()
    }
}
/// A synchronous handler for action envelopes of flow `T`.
///
/// Implementors must be `Send + Sync + 'static`. The bounds are needed
/// because `SyncCallbackTask` shares the callback through an `Arc` and moves
/// it into `tokio::task::spawn_blocking`.
pub trait SyncCallback<T>: Send + Sync + 'static
where
    T: core::Flow,
{
    /// Handles a single envelope, returning an error if processing failed.
    fn execute(&self, envelope: ActionEnvelope<T>) -> Result<(), Error>;
}
impl<F, T> SyncCallback<T> for F
where
T: core::Flow,
F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
F: Sync + Send + 'static,
{
fn execute(&self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
(self)(envelope)
}
}
/// Pool task that feeds envelopes from a control receiver into a callback.
///
/// Built by `Link::sync`; its `RillPoolTask::routine` drives the loop.
struct SyncCallbackTask<T: core::Flow, F> {
    /// Source of incoming envelopes.
    rx: ControlReceiver<T>,
    /// Shared handle to the callback; cloned for every blocking invocation.
    callback: Arc<F>,
}
#[async_trait]
impl<T, F> RillPoolTask for SyncCallbackTask<T, F>
where
T: core::Flow,
F: SyncCallback<T>,
{
async fn routine(mut self) -> Result<(), Error> {
while let Some(envelope) = self.rx.recv().await {
let callback = self.callback.clone();
let res = tokio::task::spawn_blocking(move || callback.execute(envelope))
.await
.map_err(Error::from)
.and_then(std::convert::identity);
if let Err(err) = res {
log::error!("Sync callback failed with: {}", err);
}
}
Ok(())
}
}