rill_engine/tracers/
tracer.rs

1//! This module contains a generic `Tracer`'s methods.
2use crate::actors::connector;
3use anyhow::Error;
4use async_trait::async_trait;
5use futures::Future;
6use meio::Action;
7use rill_protocol::flow::core::{ActionEnvelope, Flow, FlowMode};
8use rill_protocol::io::provider::{Description, Path, ProviderProtocol};
9use rill_protocol::io::transport::Direction;
10use std::sync::{Arc, Mutex, Weak};
11use std::time::Duration;
12use tokio::sync::mpsc;
13
14#[derive(Debug)]
15pub(crate) struct EventEnvelope<T: Flow> {
16    pub direction: Option<Direction<ProviderProtocol>>,
17    pub event: T::Event,
18}
19
20pub(crate) enum ControlEvent<T> {
21    Flush,
22    AttachCallback { callback: BoxedCallback<T> },
23    // AttachCallbackSender { sender: ActionSender<T> },
24    DetachCallback,
25}
26
27impl<T: Flow> Action for EventEnvelope<T> {}
28
29// TODO: Remove that aliases and use raw types receivers in recorders.
30pub(crate) type DataSender<T> = mpsc::UnboundedSender<EventEnvelope<T>>;
31pub(crate) type DataReceiver<T> = mpsc::UnboundedReceiver<EventEnvelope<T>>;
32
33pub(crate) type ControlSender<T> = mpsc::UnboundedSender<ControlEvent<T>>;
34pub(crate) type ControlReceiver<T> = mpsc::UnboundedReceiver<ControlEvent<T>>;
35
36/// A sender for actions wrapped with an envelope.
37pub type ActionSender<T> = mpsc::UnboundedSender<ActionEnvelope<T>>;
38/// A receiver for actions.
39pub type ActionReceiver<T> = mpsc::UnboundedReceiver<ActionEnvelope<T>>;
40
41/// Creates a new control channel.
42pub fn channel<T: Flow>() -> (ActionSender<T>, ActionReceiver<T>) {
43    mpsc::unbounded_channel()
44}
45
46pub(crate) struct TracerOperator<T: Flow> {
47    pub mode: TracerMode<T>,
48    pub control_rx: Option<ControlReceiver<T>>,
49}
50
51pub(crate) enum TracerMode<T: Flow> {
52    /// Real-time mode
53    Push {
54        state: T,
55        receiver: Option<DataReceiver<T>>,
56    },
57    /// Pulling for intensive streams with high-load activities
58    Pull {
59        // TODO: Replace with `Arc` since data channel used
60        // to detect Tracers's termination
61        state: Weak<Mutex<T>>,
62        interval: Option<Duration>,
63    },
64}
65
66#[derive(Debug)]
67enum InnerMode<T: Flow> {
68    Push {
69        // TODO: Add an optional buffer + flushing:
70        // TODO: Also it's possible to add a special `AccumulatedDelta` subtype to `Flow`.
71        // buffer: `Option<Vec<T>, usize>`,
72        // if the `buffer` exists it does `autoflush`
73        // or can be flushed manually by `tracer.flush()` call.
74        sender: DataSender<T>,
75    },
76    Pull {
77        state: Arc<Mutex<T>>,
78    },
79}
80
81// TODO: Or require `Clone` for the `Flow` to derive this
82impl<T: Flow> Clone for InnerMode<T> {
83    fn clone(&self) -> Self {
84        match self {
85            Self::Push { sender } => Self::Push {
86                sender: sender.clone(),
87            },
88            Self::Pull { state } => Self::Pull {
89                state: state.clone(),
90            },
91        }
92    }
93}
94
95/// The generic provider that forwards metrics to worker and keeps a flag
96/// for checking the activitiy status of the `Tracer`.
97#[derive(Debug)]
98pub struct Tracer<T: Flow> {
99    description: Arc<Description>,
100    control_tx: ControlSender<T>,
101    mode: InnerMode<T>,
102}
103
104impl<T: Flow> Clone for Tracer<T> {
105    fn clone(&self) -> Self {
106        Self {
107            description: self.description.clone(),
108            control_tx: self.control_tx.clone(),
109            mode: self.mode.clone(),
110        }
111    }
112}
113
114// TODO: Not sure this is suitable for on-demand spawned recorders.
115/// Both tracers are equal only if they use the same description.
116/// That means they both have the same recorder/channel.
117impl<T: Flow> PartialEq for Tracer<T> {
118    fn eq(&self, other: &Self) -> bool {
119        Arc::ptr_eq(&self.description, &other.description)
120    }
121}
122
123impl<T: Flow> Eq for Tracer<T> {}
124
125impl<T: Flow> Tracer<T> {
126    /// Create a new `Tracer`
127    pub fn new(state: T, path: Path, mode: FlowMode) -> Self {
128        match mode {
129            FlowMode::Realtime => Self::new_push(state, path),
130            FlowMode::Throttle { ms } => {
131                Self::new_pull(state, path, Some(Duration::from_millis(ms)))
132            }
133            FlowMode::FlushOnly => Self::new_pull(state, path, None),
134        }
135    }
136
137    /// Create a `Push` mode `Tracer`
138    pub fn new_push(state: T, path: Path) -> Self {
139        let (tx, rx) = mpsc::unbounded_channel();
140        let mode = TracerMode::Push {
141            state,
142            receiver: Some(rx),
143        };
144        let inner_mode = InnerMode::Push { sender: tx };
145        Self::new_inner(path, inner_mode, mode)
146    }
147
148    /// Create a `Pull` mode `Tracer`
149    pub fn new_pull(state: T, path: Path, interval: Option<Duration>) -> Self {
150        let state = Arc::new(Mutex::new(state));
151        let mode = TracerMode::Pull {
152            state: Arc::downgrade(&state),
153            interval,
154        };
155        let inner_mode = InnerMode::Pull { state };
156        Self::new_inner(path, inner_mode, mode)
157    }
158
159    fn new_inner(path: Path, inner_mode: InnerMode<T>, mode: TracerMode<T>) -> Self {
160        let (control_tx, control_rx) = mpsc::unbounded_channel();
161        let operator = TracerOperator {
162            mode,
163            control_rx: Some(control_rx),
164        };
165        let stream_type = T::stream_type();
166        let description = Description { path, stream_type };
167        log::trace!("Creating Tracer with path: {}", description.path);
168        let description = Arc::new(description);
169        let this = Tracer {
170            description: description.clone(),
171            control_tx,
172            mode: inner_mode,
173        };
174        if let Err(err) = connector::DISTRIBUTOR.register_tracer(description, operator) {
175            log::error!(
176                "Can't register a Tracer. The worker can be terminated already: {}",
177                err
178            );
179        }
180        this
181    }
182
183    /// Returns a reference to a `Path` of the `Tracer`.
184    pub fn path(&self) -> &Path {
185        &self.description.path
186    }
187
188    /// Returns a reference to a `Description` of the `Tracer`.
189    pub fn description(&self) -> &Description {
190        &self.description
191    }
192
193    /// Send an event to a `Recorder`.
194    // TODO: Consider using explicit direction value. What sould Broadcast be?
195    pub fn send(&self, event: T::Event, direction: Option<Direction<ProviderProtocol>>) {
196        match &self.mode {
197            InnerMode::Push { sender, .. } => {
198                let envelope = EventEnvelope { direction, event };
199                // And will never send an event
200                if let Err(err) = sender.send(envelope) {
201                    log::error!("Can't transfer data to sender of {}: {}", self.path(), err);
202                }
203            }
204            InnerMode::Pull { state, .. } => match state.lock() {
205                // `direction` ignored always in the `Pull` mode
206                Ok(ref mut state) => {
207                    T::apply(state, event);
208                }
209                Err(err) => {
210                    log::error!(
211                        "Can't lock the mutex to apply the changes of {}: {}",
212                        self.path(),
213                        err
214                    );
215                }
216            },
217        }
218    }
219
220    /// Ask recorder to resend a state in the `Pull` mode.
221    pub fn flush(&self) {
222        let event = ControlEvent::Flush;
223        if let Err(err) = self.control_tx.send(event) {
224            log::error!("Can't send a flush event to {}: {}", self.path(), err);
225        }
226    }
227
228    /*
229    /// Registers a callback to the flow.
230    pub fn callback<F>(&mut self, func: F)
231    where
232        F: Fn(ActionEnvelope<T>) + Send + 'static,
233    {
234        let callback = Callback {
235            tracer: self.clone(),
236            callback: func,
237        };
238        if let Err(err) = pool::DISTRIBUTOR.spawn_task(callback) {
239            log::error!(
240                "Can't spawn a Callback. The worker can be terminated already: {}",
241                err
242            );
243        }
244    }
245    */
246
247    /// Assign a sync callback
248    pub fn sync_callback<F>(&self, callback: F)
249    where
250        F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
251        F: Send + Sync + 'static,
252    {
253        let sync_callback = SyncCallback::new(callback);
254        let callback = Box::new(sync_callback);
255        let event = ControlEvent::AttachCallback { callback };
256        if let Err(err) = self.control_tx.send(event) {
257            log::error!("Can't attach the callback from {}: {}", self.path(), err);
258        }
259    }
260
261    /// Assign an async callback
262    pub fn async_callback<F, Fut>(&self, callback: F)
263    where
264        F: Fn(ActionEnvelope<T>) -> Fut,
265        F: Send + Sync + 'static,
266        Fut: Future<Output = Result<(), Error>>,
267        Fut: Send + 'static,
268    {
269        let async_callback = AsyncCallback::new(callback);
270        let callback = Box::new(async_callback);
271        let event = ControlEvent::AttachCallback { callback };
272        if let Err(err) = self.control_tx.send(event) {
273            log::error!("Can't attach the callback from {}: {}", self.path(), err);
274        }
275    }
276
277    /// Removes the callback
278    pub fn detach_callback(&self) {
279        let event = ControlEvent::DetachCallback;
280        if let Err(err) = self.control_tx.send(event) {
281            log::error!("Can't detach the callback from {}: {}", self.path(), err);
282        }
283    }
284}
285
286/// The callback that called on flow's incoming actions.
287#[async_trait]
288pub trait ActionCallback<T: Flow>: Send + 'static {
289    /*
290    /// When at least one connection exists.
291    async fn awake(&mut self) {}
292
293    /// When all clients disconnected.
294    async fn suspend(&mut self) {}
295
296    /// A method to handle an action.
297    async fn handle_activity(
298        &mut self,
299        origin: ProviderReqId,
300        activity: Activity<T>,
301    ) -> Result<(), Error>;
302    */
303
304    // TODO: Make it sync
305    /// A method to handle an action.
306    async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error>;
307}
308
309/// Boxed callback.
310pub type BoxedCallback<T> = Box<dyn ActionCallback<T>>;
311
/// A callback backed by a plain (blocking) function.
///
/// The function is kept behind an `Arc`, so a handle to it can be cloned
/// cheaply without requiring the function itself to be `Clone`.
struct SyncCallback<F> {
    func: Arc<F>,
}

impl<F> SyncCallback<F> {
    /// Wraps `func` into a reference-counted callback.
    fn new(func: F) -> Self {
        let func = Arc::new(func);
        SyncCallback { func }
    }
}
324
325#[async_trait]
326impl<T, F> ActionCallback<T> for SyncCallback<F>
327where
328    T: Flow,
329    F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
330    F: Send + Sync + 'static,
331{
332    async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
333        let func = self.func.clone();
334        tokio::task::spawn_blocking(move || func(envelope))
335            .await
336            .map_err(Error::from)
337            .and_then(std::convert::identity)
338    }
339}
340
341use std::marker::PhantomData;
342
/// A callback backed by a function that returns a future.
///
/// `Fut` is the type of that future; it is not stored, only recorded with
/// a `PhantomData` marker.
struct AsyncCallback<F, Fut> {
    func: F,
    fut: PhantomData<Fut>,
}

impl<F, Fut> AsyncCallback<F, Fut> {
    /// Wraps `func` into a callback.
    fn new(func: F) -> Self {
        let fut = PhantomData;
        AsyncCallback { func, fut }
    }
}
356
357#[async_trait]
358impl<T, F, Fut> ActionCallback<T> for AsyncCallback<F, Fut>
359where
360    T: Flow,
361    F: Fn(ActionEnvelope<T>) -> Fut,
362    F: Send + Sync + 'static,
363    Fut: Future<Output = Result<(), Error>>,
364    Fut: Send + 'static,
365{
366    async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
367        (self.func)(envelope).await
368    }
369}