1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//! This module contains a generic `Provider`'s methods.
use crate::state::{ProviderMode, RegisterProvider, RILL_STATE};
use anyhow::Error;
use futures::channel::mpsc;
use meio::prelude::Action;
use rill_protocol::provider::{Description, Path, RillData};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::watch;

#[derive(Debug)]
pub(crate) enum DataEnvelope {
    DataEvent {
        timestamp: SystemTime,
        data: RillData,
    },
    EndStream {
        description: Arc<Description>,
    },
}

impl Action for DataEnvelope {}

pub(crate) type DataSender = mpsc::UnboundedSender<DataEnvelope>;
pub(crate) type DataReceiver = mpsc::UnboundedReceiver<DataEnvelope>;

/// The generic provider that forwards metrics to worker and keeps a flag
/// for checking the activitiy status of the `Provider`.
#[derive(Debug)]
pub struct Provider {
    /// The receiver that used to activate/deactivate streams.
    active: watch::Receiver<bool>,
    description: Arc<Description>,
    sender: DataSender,
}

impl Provider {
    pub(crate) fn new(description: Description, active: bool) -> Self {
        log::trace!("Creating Provider with path: {:?}", description.path);
        let (tx, rx) = mpsc::unbounded();
        let (active_tx, active_rx) = watch::channel(active);
        let description = Arc::new(description);
        let this = Provider {
            active: active_rx,
            description: description.clone(),
            sender: tx,
        };
        let mode = {
            if active {
                ProviderMode::Active
            } else {
                ProviderMode::Reactive {
                    activator: active_tx,
                }
            }
        };
        let event = RegisterProvider {
            description,
            mode,
            rx,
        };
        let state = RILL_STATE.get().expect("rill is not installed!");
        state.send(event);
        this
    }

    /// Returns a reference to a `Path` of the `Provider`.
    pub fn path(&self) -> &Path {
        &self.description.path
    }

    pub(crate) fn send(&self, data: RillData, timestamp: Option<SystemTime>) {
        if *self.active.borrow() {
            let timestamp = timestamp.unwrap_or_else(SystemTime::now);
            let envelope = DataEnvelope::DataEvent { timestamp, data };
            if let Err(err) = self.sender.unbounded_send(envelope) {
                log::error!("Can't transfer data to sender: {}", err);
            }
        }
    }
}

impl Provider {
    /// Returns `true` is the `Provider` has to send data.
    pub fn is_active(&self) -> bool {
        *self.active.borrow()
    }

    /// Use this method to detect when stream had activated.
    ///
    /// It's useful if you want to spawn async coroutine that
    /// can read a batch of data, but will wait when some streams
    /// will be activated to avoid resources wasting.
    ///
    /// When the generating coroutine active you can use `is_active`
    /// method to detect when to change it to awaiting state again.
    pub async fn when_activated(&mut self) -> Result<(), Error> {
        loop {
            if self.is_active() {
                break;
            }
            self.active.changed().await?;
        }
        Ok(())
    }
}

impl Drop for Provider {
    fn drop(&mut self) {
        let end_stream = DataEnvelope::EndStream {
            description: self.description.clone(),
        };
        if let Err(_err) = self.sender.unbounded_send(end_stream) {
            log::error!(
                "Can't send `EndStream` to the worker actor from: {}",
                self.description.path
            );
        }
    }
}