1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
//! This module contains the generic `Tracer` and its methods.
use crate::state::RILL_LINK;
use futures::channel::mpsc;
use meio::prelude::Action;
use rill_protocol::provider::{Description, Path, RillEvent, Timestamp};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::watch;

/// An event type that can be carried by a `Tracer` (the constructor and `send`
/// of `Tracer<T>` require `T: TracerEvent`).
///
/// Implementors must be `Sized + Send + 'static`.
pub trait TracerEvent: Sized + Send + 'static {
    /// The state type associated with this event type; it must be
    /// `Default + Send + 'static`.
    type State: Default + Send + 'static;
    /// Consumes the event, with mutable access to `state` and the `timestamp`
    /// of the event, and optionally returns a reference to a `RillEvent`.
    ///
    /// The returned reference borrows from `state` (it is the only reference
    /// parameter of the method).
    ///
    /// NOTE(review): presumably this folds the event into `state` and returns
    /// the event that should be forwarded, if any — confirm against the
    /// implementors.
    fn aggregate(self, state: &mut Self::State, timestamp: Timestamp) -> Option<&RillEvent>;
    // TODO: Replace with `IntoIterator<RillEvent> for Self::State`?
    /// Builds a list of `RillEvent`s from a shared reference to `state`.
    ///
    /// NOTE(review): presumably the list is enough to reproduce the current
    /// state on the receiving side — confirm against the implementors.
    fn make_snapshot(state: &Self::State) -> Vec<RillEvent>;
}

/// The message type that travels through a tracer's data channel
/// (see the `DataSender` / `DataReceiver` aliases).
#[derive(Debug)]
pub enum DataEnvelope<T> {
    /// A piece of tracer data together with the `SystemTime` attached to it.
    Event { system_time: SystemTime, data: T },
}

// Implements `Action` (from `meio::prelude`) for the envelope without
// overriding any items.
// NOTE(review): presumably this lets a meio actor receive envelopes as
// actions — confirm against the meio documentation.
impl<T: TracerEvent> Action for DataEnvelope<T> {}

// TODO: Remove these aliases and use raw types receivers in recorders.
/// The sending half of an unbounded channel that carries `DataEnvelope<T>` values.
pub type DataSender<T> = mpsc::UnboundedSender<DataEnvelope<T>>;
/// The receiving half of an unbounded channel that carries `DataEnvelope<T>` values.
pub type DataReceiver<T> = mpsc::UnboundedReceiver<DataEnvelope<T>>;

/// The generic provider that forwards metrics to the worker and keeps a flag
/// for checking the activity status of the `Tracer`.
#[derive(Debug)]
pub struct Tracer<T> {
    /// The receiver that is used to activate/deactivate streams.
    /// Its current value is what `is_active` reports.
    active: watch::Receiver<bool>,
    /// The shared description of this tracer; `path` returns its `path` field.
    description: Arc<Description>,
    /// The sending half of the data channel; `send` pushes envelopes into it.
    sender: DataSender<T>,
}

impl<T: TracerEvent> Tracer<T> {
    /// Builds a `Tracer` for the given `description` and registers it
    /// through `RILL_LINK`.
    ///
    /// The receiving half of the newly created data channel is handed to
    /// `RILL_LINK.register_tracer` together with the shared description.
    /// A failed registration is only logged; the `Tracer` is returned anyway.
    pub(crate) fn new(description: Description) -> Self {
        // TODO: Remove this active watch channel?
        // The sending half is bound to `_active_tx` and never stored, so nothing
        // in this block changes the flag after it is created with `true`.
        let (_active_tx, active) = watch::channel(true);
        log::trace!("Creating Tracer with path: {:?}", description.path);
        let (sender, receiver) = mpsc::unbounded();
        let shared = Arc::new(description);
        let tracer = Self {
            active,
            description: Arc::clone(&shared),
            sender,
        };
        match RILL_LINK.register_tracer(shared, receiver) {
            Ok(_) => {}
            Err(err) => {
                log::error!(
                    "Can't register a Tracer. The worker can be terminated already: {}",
                    err
                );
            }
        }
        tracer
    }

    /// Returns a reference to the `Path` stored in the description of the `Tracer`.
    pub fn path(&self) -> &Path {
        let description: &Description = &self.description;
        &description.path
    }

    /// Wraps `data` into a `DataEnvelope::Event` and pushes it into the data channel.
    ///
    /// Nothing is sent while the tracer is not active. When `opt_system_time`
    /// is `None` the current `SystemTime` is attached instead. A failed send
    /// is only logged.
    pub(crate) fn send(&self, data: T, opt_system_time: Option<SystemTime>) {
        // An inactive tracer drops the data without touching the channel.
        if !self.is_active() {
            return;
        }
        let system_time = match opt_system_time {
            Some(explicit) => explicit,
            None => SystemTime::now(),
        };
        if let Err(err) = self
            .sender
            .unbounded_send(DataEnvelope::Event { system_time, data })
        {
            log::error!("Can't transfer data to sender: {}", err);
        }
    }
}

impl<T> Tracer<T> {
    /// Returns `true` if the `Tracer` has to send data.
    ///
    /// The value is read from the `active` watch receiver.
    pub fn is_active(&self) -> bool {
        let flag = self.active.borrow();
        *flag
    }

    /* TODO: Remove or replace with an alternative
    /// Use this method to detect when stream had activated.
    ///
    /// It's useful if you want to spawn async coroutine that
    /// can read a batch of data, but will wait when some streams
    /// will be activated to avoid resources wasting.
    ///
    /// When the generating coroutine active you can use `is_active`
    /// method to detect when to change it to awaiting state again.
    pub async fn when_activated(&mut self) -> Result<(), Error> {
        loop {
            if self.is_active() {
                break;
            }
            self.active.changed().await?;
        }
        Ok(())
    }
    */
}