1use crate::actors::connector;
3use anyhow::Error;
4use async_trait::async_trait;
5use futures::Future;
6use meio::Action;
7use rill_protocol::flow::core::{ActionEnvelope, Flow, FlowMode};
8use rill_protocol::io::provider::{Description, Path, ProviderProtocol};
9use rill_protocol::io::transport::Direction;
10use std::sync::{Arc, Mutex, Weak};
11use std::time::Duration;
12use tokio::sync::mpsc;
13
14#[derive(Debug)]
15pub(crate) struct EventEnvelope<T: Flow> {
16 pub direction: Option<Direction<ProviderProtocol>>,
17 pub event: T::Event,
18}
19
20pub(crate) enum ControlEvent<T> {
21 Flush,
22 AttachCallback { callback: BoxedCallback<T> },
23 DetachCallback,
25}
26
27impl<T: Flow> Action for EventEnvelope<T> {}
28
29pub(crate) type DataSender<T> = mpsc::UnboundedSender<EventEnvelope<T>>;
31pub(crate) type DataReceiver<T> = mpsc::UnboundedReceiver<EventEnvelope<T>>;
32
33pub(crate) type ControlSender<T> = mpsc::UnboundedSender<ControlEvent<T>>;
34pub(crate) type ControlReceiver<T> = mpsc::UnboundedReceiver<ControlEvent<T>>;
35
36pub type ActionSender<T> = mpsc::UnboundedSender<ActionEnvelope<T>>;
38pub type ActionReceiver<T> = mpsc::UnboundedReceiver<ActionEnvelope<T>>;
40
41pub fn channel<T: Flow>() -> (ActionSender<T>, ActionReceiver<T>) {
43 mpsc::unbounded_channel()
44}
45
46pub(crate) struct TracerOperator<T: Flow> {
47 pub mode: TracerMode<T>,
48 pub control_rx: Option<ControlReceiver<T>>,
49}
50
51pub(crate) enum TracerMode<T: Flow> {
52 Push {
54 state: T,
55 receiver: Option<DataReceiver<T>>,
56 },
57 Pull {
59 state: Weak<Mutex<T>>,
62 interval: Option<Duration>,
63 },
64}
65
66#[derive(Debug)]
67enum InnerMode<T: Flow> {
68 Push {
69 sender: DataSender<T>,
75 },
76 Pull {
77 state: Arc<Mutex<T>>,
78 },
79}
80
81impl<T: Flow> Clone for InnerMode<T> {
83 fn clone(&self) -> Self {
84 match self {
85 Self::Push { sender } => Self::Push {
86 sender: sender.clone(),
87 },
88 Self::Pull { state } => Self::Pull {
89 state: state.clone(),
90 },
91 }
92 }
93}
94
95#[derive(Debug)]
98pub struct Tracer<T: Flow> {
99 description: Arc<Description>,
100 control_tx: ControlSender<T>,
101 mode: InnerMode<T>,
102}
103
104impl<T: Flow> Clone for Tracer<T> {
105 fn clone(&self) -> Self {
106 Self {
107 description: self.description.clone(),
108 control_tx: self.control_tx.clone(),
109 mode: self.mode.clone(),
110 }
111 }
112}
113
114impl<T: Flow> PartialEq for Tracer<T> {
118 fn eq(&self, other: &Self) -> bool {
119 Arc::ptr_eq(&self.description, &other.description)
120 }
121}
122
123impl<T: Flow> Eq for Tracer<T> {}
124
125impl<T: Flow> Tracer<T> {
126 pub fn new(state: T, path: Path, mode: FlowMode) -> Self {
128 match mode {
129 FlowMode::Realtime => Self::new_push(state, path),
130 FlowMode::Throttle { ms } => {
131 Self::new_pull(state, path, Some(Duration::from_millis(ms)))
132 }
133 FlowMode::FlushOnly => Self::new_pull(state, path, None),
134 }
135 }
136
137 pub fn new_push(state: T, path: Path) -> Self {
139 let (tx, rx) = mpsc::unbounded_channel();
140 let mode = TracerMode::Push {
141 state,
142 receiver: Some(rx),
143 };
144 let inner_mode = InnerMode::Push { sender: tx };
145 Self::new_inner(path, inner_mode, mode)
146 }
147
148 pub fn new_pull(state: T, path: Path, interval: Option<Duration>) -> Self {
150 let state = Arc::new(Mutex::new(state));
151 let mode = TracerMode::Pull {
152 state: Arc::downgrade(&state),
153 interval,
154 };
155 let inner_mode = InnerMode::Pull { state };
156 Self::new_inner(path, inner_mode, mode)
157 }
158
159 fn new_inner(path: Path, inner_mode: InnerMode<T>, mode: TracerMode<T>) -> Self {
160 let (control_tx, control_rx) = mpsc::unbounded_channel();
161 let operator = TracerOperator {
162 mode,
163 control_rx: Some(control_rx),
164 };
165 let stream_type = T::stream_type();
166 let description = Description { path, stream_type };
167 log::trace!("Creating Tracer with path: {}", description.path);
168 let description = Arc::new(description);
169 let this = Tracer {
170 description: description.clone(),
171 control_tx,
172 mode: inner_mode,
173 };
174 if let Err(err) = connector::DISTRIBUTOR.register_tracer(description, operator) {
175 log::error!(
176 "Can't register a Tracer. The worker can be terminated already: {}",
177 err
178 );
179 }
180 this
181 }
182
183 pub fn path(&self) -> &Path {
185 &self.description.path
186 }
187
188 pub fn description(&self) -> &Description {
190 &self.description
191 }
192
193 pub fn send(&self, event: T::Event, direction: Option<Direction<ProviderProtocol>>) {
196 match &self.mode {
197 InnerMode::Push { sender, .. } => {
198 let envelope = EventEnvelope { direction, event };
199 if let Err(err) = sender.send(envelope) {
201 log::error!("Can't transfer data to sender of {}: {}", self.path(), err);
202 }
203 }
204 InnerMode::Pull { state, .. } => match state.lock() {
205 Ok(ref mut state) => {
207 T::apply(state, event);
208 }
209 Err(err) => {
210 log::error!(
211 "Can't lock the mutex to apply the changes of {}: {}",
212 self.path(),
213 err
214 );
215 }
216 },
217 }
218 }
219
220 pub fn flush(&self) {
222 let event = ControlEvent::Flush;
223 if let Err(err) = self.control_tx.send(event) {
224 log::error!("Can't send a flush event to {}: {}", self.path(), err);
225 }
226 }
227
228 pub fn sync_callback<F>(&self, callback: F)
249 where
250 F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
251 F: Send + Sync + 'static,
252 {
253 let sync_callback = SyncCallback::new(callback);
254 let callback = Box::new(sync_callback);
255 let event = ControlEvent::AttachCallback { callback };
256 if let Err(err) = self.control_tx.send(event) {
257 log::error!("Can't attach the callback from {}: {}", self.path(), err);
258 }
259 }
260
261 pub fn async_callback<F, Fut>(&self, callback: F)
263 where
264 F: Fn(ActionEnvelope<T>) -> Fut,
265 F: Send + Sync + 'static,
266 Fut: Future<Output = Result<(), Error>>,
267 Fut: Send + 'static,
268 {
269 let async_callback = AsyncCallback::new(callback);
270 let callback = Box::new(async_callback);
271 let event = ControlEvent::AttachCallback { callback };
272 if let Err(err) = self.control_tx.send(event) {
273 log::error!("Can't attach the callback from {}: {}", self.path(), err);
274 }
275 }
276
277 pub fn detach_callback(&self) {
279 let event = ControlEvent::DetachCallback;
280 if let Err(err) = self.control_tx.send(event) {
281 log::error!("Can't detach the callback from {}: {}", self.path(), err);
282 }
283 }
284}
285
286#[async_trait]
288pub trait ActionCallback<T: Flow>: Send + 'static {
289 async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error>;
307}
308
309pub type BoxedCallback<T> = Box<dyn ActionCallback<T>>;
311
312struct SyncCallback<F> {
314 func: Arc<F>,
315}
316
317impl<F> SyncCallback<F> {
318 fn new(func: F) -> Self {
319 Self {
320 func: Arc::new(func),
321 }
322 }
323}
324
325#[async_trait]
326impl<T, F> ActionCallback<T> for SyncCallback<F>
327where
328 T: Flow,
329 F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
330 F: Send + Sync + 'static,
331{
332 async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
333 let func = self.func.clone();
334 tokio::task::spawn_blocking(move || func(envelope))
335 .await
336 .map_err(Error::from)
337 .and_then(std::convert::identity)
338 }
339}
340
341use std::marker::PhantomData;
342
343struct AsyncCallback<F, Fut> {
344 func: F,
345 fut: PhantomData<Fut>,
346}
347
348impl<F, Fut> AsyncCallback<F, Fut> {
349 fn new(func: F) -> Self {
350 Self {
351 func,
352 fut: PhantomData,
353 }
354 }
355}
356
357#[async_trait]
358impl<T, F, Fut> ActionCallback<T> for AsyncCallback<F, Fut>
359where
360 T: Flow,
361 F: Fn(ActionEnvelope<T>) -> Fut,
362 F: Send + Sync + 'static,
363 Fut: Future<Output = Result<(), Error>>,
364 Fut: Send + 'static,
365{
366 async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
367 (self.func)(envelope).await
368 }
369}