1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
//! This module contains a generic `Tracer`'s methods.
use crate::actors::connector;
//use crate::actors::pool::{self, RillPoolTask};
use anyhow::Error;
use async_trait::async_trait;
use meio::Action;
use rill_protocol::flow::core::{self, ActionEnvelope, TimedEvent};
use rill_protocol::io::provider::{Description, Path, ProviderProtocol, Timestamp};
use rill_protocol::io::transport::Direction;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;

/// An event of flow `T` paired with an optional transport direction.
///
/// This is the item type carried by the data channel
/// (`DataSender` / `DataReceiver`).
#[derive(Debug)]
pub(crate) struct EventEnvelope<T: core::Flow> {
    /// Optional direction; `Tracer::send` forwards whatever the caller passed.
    pub direction: Option<Direction<ProviderProtocol>>,
    /// The flow event itself.
    pub event: T::Event,
}

/// Control messages a `Tracer` sends over its control channel.
pub(crate) enum ControlEvent<T> {
    /// Sent by `Tracer::flush`.
    Flush,
    /// Sent by `Tracer::sync_callback`; carries the boxed callback to attach.
    AttachCallback { callback: BoxedCallback<T> },
    // AttachCallbackSender { sender: ActionSender<T> },
    /// Sent by `Tracer::detach_callback`.
    DetachCallback,
}

// Marker impl: allows `EventEnvelope` to be used as a `meio` `Action`.
impl<T: core::Flow> Action for EventEnvelope<T> {}

// TODO: Remove these aliases and use the raw receiver types in recorders.
/// Sending half of the unbounded data channel carrying `EventEnvelope`s.
pub(crate) type DataSender<T> = mpsc::UnboundedSender<EventEnvelope<T>>;
/// Receiving half of the unbounded data channel carrying `EventEnvelope`s.
pub(crate) type DataReceiver<T> = mpsc::UnboundedReceiver<EventEnvelope<T>>;

/// Sending half of the unbounded control channel carrying `ControlEvent`s.
pub(crate) type ControlSender<T> = mpsc::UnboundedSender<ControlEvent<T>>;
/// Receiving half of the unbounded control channel carrying `ControlEvent`s.
pub(crate) type ControlReceiver<T> = mpsc::UnboundedReceiver<ControlEvent<T>>;

/// A sender for actions wrapped with an envelope.
pub type ActionSender<T> = mpsc::UnboundedSender<ActionEnvelope<T>>;
/// A receiver for actions wrapped with an envelope.
pub type ActionReceiver<T> = mpsc::UnboundedReceiver<ActionEnvelope<T>>;

/// Creates a new control channel.
pub fn channel<T: core::Flow>() -> (ActionSender<T>, ActionReceiver<T>) {
    mpsc::unbounded_channel()
}

/// The operator-side counterpart of a `Tracer`.
///
/// It is built in `Tracer::new_inner` and handed to
/// `connector::DISTRIBUTOR.register_tracer` together with the description.
pub(crate) struct TracerOperator<T: core::Flow> {
    /// Mode-specific data (push receiver or pull state handle).
    pub mode: TracerMode<T>,
    /// Receiving half of the control channel.
    // NOTE(review): wrapped in `Option`, presumably so the consumer can `take()` it — confirm.
    pub control_rx: Option<ControlReceiver<T>>,
}

/// Mode-specific data handed to the operator side of a `Tracer`.
pub(crate) enum TracerMode<T: core::Flow> {
    /// Real-time mode
    Push {
        /// The initial state passed to `Tracer::new_push`.
        state: T,
        /// Receiving half of the data channel the `Tracer` sends events into.
        receiver: Option<DataReceiver<T>>,
    },
    /// Pulling for intensive streams with high-load activities
    Pull {
        // TODO: Replace with `Arc` since data channel used
        // to detect the Tracer's termination
        /// Weak handle to the state; the `Tracer` side holds the owning `Arc`.
        state: Weak<Mutex<T>>,
        /// The interval passed to `Tracer::new_pull`.
        interval: Duration,
    },
}

/// The mode-specific half that stays inside the `Tracer` handle.
#[derive(Debug)]
enum InnerMode<T: core::Flow> {
    /// Events are forwarded through the data channel.
    Push {
        // TODO: Add an optional buffer + flushing:
        // TODO: Also it's possible to add a special `AccumulatedDelta` subtype to `Flow`.
        // buffer: `Option<Vec<T>, usize>`,
        // if the `buffer` exists it does `autoflush`
        // or can be flushed manually by `tracer.flush()` call.
        /// Sending half of the data channel.
        sender: DataSender<T>,
    },
    /// Events are applied directly to the mutex-protected state.
    Pull {
        /// The state; cloning the mode clones the `Arc`, so clones share it.
        state: Arc<Mutex<T>>,
    },
}

// TODO: Or require `Clone` for the `Flow` to derive this
/// Manual `Clone`: only the channel sender or the `Arc` handle is cloned,
/// so `T` itself does not need to be `Clone`.
impl<T: core::Flow> Clone for InnerMode<T> {
    fn clone(&self) -> Self {
        match *self {
            Self::Push { ref sender } => {
                let sender = sender.clone();
                Self::Push { sender }
            }
            Self::Pull { ref state } => {
                // Bumps the reference count; the state itself is shared.
                let state = Arc::clone(state);
                Self::Pull { state }
            }
        }
    }
}

/// The generic provider handle: it forwards events of a flow `T` (and
/// control messages) to the worker side registered for its description.
///
/// The handle is cloneable; clones share the same description, control
/// channel and data channel / state.
#[derive(Debug)]
pub struct Tracer<T: core::Flow> {
    /// Shared description; `PartialEq` compares tracers by this `Arc`'s identity.
    description: Arc<Description>,
    /// Sending half of the control channel (flush, attach/detach callback).
    control_tx: ControlSender<T>,
    /// Push- or pull-specific sending half.
    mode: InnerMode<T>,
}

impl<T: core::Flow> Clone for Tracer<T> {
    fn clone(&self) -> Self {
        Self {
            description: self.description.clone(),
            control_tx: self.control_tx.clone(),
            mode: self.mode.clone(),
        }
    }
}

// TODO: Not sure this is suitable for on-demand spawned recorders.
/// Two tracers are equal only when they share the very same description
/// allocation, which means they both have the same recorder/channel.
impl<T: core::Flow> PartialEq for Tracer<T> {
    fn eq(&self, other: &Self) -> bool {
        // Identity comparison of the `Arc`s, not a field-by-field comparison.
        let (mine, theirs) = (&self.description, &other.description);
        Arc::ptr_eq(mine, theirs)
    }
}

// Marker impl: the pointer-identity comparison used by `PartialEq` is a full equivalence relation.
impl<T: core::Flow> Eq for Tracer<T> {}

impl<T: core::Flow> Tracer<T> {
    /// Creates a new `Tracer`.
    ///
    /// A `Pull` mode tracer is built when `pull_interval` is `Some`,
    /// a `Push` mode tracer otherwise.
    pub fn new(state: T, path: Path, pull_interval: Option<Duration>) -> Self {
        match pull_interval {
            Some(interval) => Self::new_pull(state, path, interval),
            None => Self::new_push(state, path),
        }
    }

    /// Creates a `Push` mode `Tracer`.
    ///
    /// A fresh unbounded data channel is created: the sender stays in the
    /// returned tracer, while `state` and the receiver go to the operator side.
    pub fn new_push(state: T, path: Path) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        Self::new_inner(
            path,
            InnerMode::Push { sender },
            TracerMode::Push {
                state,
                receiver: Some(receiver),
            },
        )
    }

    /// Creates a `Pull` mode `Tracer`.
    ///
    /// `state` is placed behind an `Arc<Mutex<_>>` owned by the tracer; the
    /// operator side receives only a `Weak` handle plus the pull `interval`.
    pub fn new_pull(state: T, path: Path, interval: Duration) -> Self {
        let shared = Arc::new(Mutex::new(state));
        let observer = Arc::downgrade(&shared);
        Self::new_inner(
            path,
            InnerMode::Pull { state: shared },
            TracerMode::Pull {
                state: observer,
                interval,
            },
        )
    }

    /// Shared constructor tail for both modes.
    ///
    /// Builds the `Description` (its `info` is `"<path> - <stream_type>"`),
    /// creates the control channel and registers the operator half with
    /// `connector::DISTRIBUTOR`. A registration failure is only logged; the
    /// `Tracer` is returned regardless.
    fn new_inner(path: Path, inner_mode: InnerMode<T>, mode: TracerMode<T>) -> Self {
        let stream_type = T::stream_type();
        let info = format!("{} - {}", path, stream_type);
        let description = Arc::new(Description {
            path,
            info,
            stream_type,
        });
        log::trace!("Creating Tracer with path: {}", description.path);

        let (control_tx, control_rx) = mpsc::unbounded_channel();
        let tracer = Tracer {
            description: Arc::clone(&description),
            control_tx,
            mode: inner_mode,
        };
        let operator = TracerOperator {
            mode,
            control_rx: Some(control_rx),
        };
        if let Err(err) = connector::DISTRIBUTOR.register_tracer(description, operator) {
            log::error!(
                "Can't register a Tracer. The worker can be terminated already: {}",
                err
            );
        }
        tracer
    }

    /// Returns a reference to a `Path` of the `Tracer`.
    pub fn path(&self) -> &Path {
        &self.description.path
    }

    /// Returns the `Description` this `Tracer` was created with.
    pub fn description(&self) -> &Description {
        // Explicitly look through the `Arc`.
        &*self.description
    }

    /// Sends an event on behalf of this `Tracer`.
    ///
    /// * `Push` mode: `event` and `direction` are wrapped into an
    ///   `EventEnvelope` and pushed into the data channel.
    /// * `Pull` mode: `event` is applied to the shared state under the mutex;
    ///   `direction` is always ignored.
    ///
    /// Failures (the channel rejecting the envelope, or the mutex refusing
    /// the lock) are logged and the event is dropped; nothing is reported
    /// back to the caller.
    pub fn send(&self, event: T::Event, direction: Option<Direction<ProviderProtocol>>) {
        match &self.mode {
            InnerMode::Push { sender } => {
                let outcome = sender.send(EventEnvelope { direction, event });
                if let Err(err) = outcome {
                    log::error!("Can't transfer data to sender of {}: {}", self.path(), err);
                }
            }
            InnerMode::Pull { state } => {
                let mut guard = match state.lock() {
                    Ok(guard) => guard,
                    Err(err) => {
                        log::error!(
                            "Can't lock the mutex to apply the changes of {}: {}",
                            self.path(),
                            err
                        );
                        return;
                    }
                };
                T::apply(&mut guard, event);
            }
        }
    }

    /// Asks the recorder to resend a state in the `Pull` mode.
    ///
    /// Sends `ControlEvent::Flush` over the control channel; a send failure
    /// is only logged.
    pub fn flush(&self) {
        let outcome = self.control_tx.send(ControlEvent::Flush);
        if let Err(err) = outcome {
            log::error!("Can't send a flush event to {}: {}", self.path(), err);
        }
    }

    /*
    /// Registers a callback to the flow.
    pub fn callback<F>(&mut self, func: F)
    where
        F: Fn(ActionEnvelope<T>) + Send + 'static,
    {
        let callback = Callback {
            tracer: self.clone(),
            callback: func,
        };
        if let Err(err) = pool::DISTRIBUTOR.spawn_task(callback) {
            log::error!(
                "Can't spawn a Callback. The worker can be terminated already: {}",
                err
            );
        }
    }
    */

    /// Assign a callback
    pub fn sync_callback<F>(&self, callback: F)
    where
        F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
        F: Send + Sync + 'static,
    {
        let sync_callback = SyncCallback::new(callback);
        let callback = Box::new(sync_callback);
        let event = ControlEvent::AttachCallback { callback };
        if let Err(err) = self.control_tx.send(event) {
            log::error!("Can't attach the callback from {}: {}", self.path(), err);
        }
    }

    /// Removes the callback.
    ///
    /// Sends `ControlEvent::DetachCallback` over the control channel; a send
    /// failure is only logged.
    pub fn detach_callback(&self) {
        let outcome = self.control_tx.send(ControlEvent::DetachCallback);
        if let Err(err) = outcome {
            log::error!("Can't detach the callback from {}: {}", self.path(), err);
        }
    }
}

/// The callback that is called on a flow's incoming actions.
///
/// Implementors must be `Send + Sync + 'static`. A boxed implementor
/// (`BoxedCallback`) is what `ControlEvent::AttachCallback` carries.
#[async_trait]
pub trait ActionCallback<T: core::Flow>: Send + Sync + 'static {
    /*
    /// When at least one connection exists.
    async fn awake(&mut self) {}

    /// When all clients disconnected.
    async fn suspend(&mut self) {}

    /// A method to handle an action.
    async fn handle_activity(
        &mut self,
        origin: ProviderReqId,
        activity: Activity<T>,
    ) -> Result<(), Error>;
    */

    /// A method to handle an action delivered inside an `ActionEnvelope`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the implementation, if any.
    async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error>;
}

/// Boxed callback: a type-erased `ActionCallback` for the flow `T`.
pub type BoxedCallback<T> = Box<dyn ActionCallback<T>>;

/// Sync callback: adapts a plain (non-async) function to `ActionCallback`.
struct SyncCallback<F> {
    /// The wrapped function; kept in an `Arc` so that each call can move a
    /// clone of the handle into a blocking task.
    func: Arc<F>,
}

impl<F> SyncCallback<F> {
    /// Wraps `func` into a new `SyncCallback`, placing it behind an `Arc`.
    fn new(func: F) -> Self {
        let func = Arc::new(func);
        Self { func }
    }
}

#[async_trait]
impl<T, F> ActionCallback<T> for SyncCallback<F>
where
    T: core::Flow,
    F: Fn(ActionEnvelope<T>) -> Result<(), Error>,
    F: Send + Sync + 'static,
{
    async fn handle_activity(&mut self, envelope: ActionEnvelope<T>) -> Result<(), Error> {
        let func = self.func.clone();
        tokio::task::spawn_blocking(move || func(envelope))
            .await
            .map_err(Error::from)
            .and_then(std::convert::identity)
    }
}

/*
struct Callback<T: core::Flow, F> {
    tracer: Tracer<T>,
    callback: F,
}

#[async_trait]
impl<T, F> RillPoolTask for Callback<T, F>
where
    T: core::Flow,
    F: Fn(ActionEnvelope<T>) + Send + 'static,
{
    async fn routine(mut self) -> Result<(), Error> {
        let mut stream = self.tracer.subscribe()?;
        loop {
            let envelope = stream.recv().await?;
            (self.callback)(envelope)
        }
    }
}
*/

/// Wraps `event` into a `TimedEvent` stamped with the current time.
///
/// Returns `None` when the current timestamp cannot be produced
/// (i.e. `time_to_ts(None)` fails).
pub fn timed<T>(event: T) -> Option<TimedEvent<T>> {
    match time_to_ts(None) {
        Ok(timestamp) => Some(TimedEvent { timestamp, event }),
        Err(_) => None,
    }
}

/// Generates a `Timestamp` for the current moment or converts the given
/// `SystemTime` to it.
///
/// # Errors
///
/// Fails when the chosen time lies before `SystemTime::UNIX_EPOCH`.
// TODO: How to avoid errors here?
pub fn time_to_ts(opt_system_time: Option<SystemTime>) -> Result<Timestamp, Error> {
    let moment = match opt_system_time {
        Some(given) => given,
        None => SystemTime::now(),
    };
    let since_epoch = moment.duration_since(SystemTime::UNIX_EPOCH)?;
    Ok(Timestamp::from(since_epoch))
}