pipebase 0.2.0

A tokio based runtime library for data integration app
Documentation
use std::time::{Duration, Instant};

use serde::Deserialize;
use tokio::time::Interval;

use super::Collect;
use crate::common::{ConfigInto, FromConfig, FromPath, Period};
use async_trait::async_trait;

#[derive(Deserialize)]
pub struct InMemoryWindowCollectorConfig {
    size: Period,
    slice: Period,
}

impl FromPath for InMemoryWindowCollectorConfig {}

pub struct InstantContainer<T> {
    t: T,
    instant: Instant,
}

impl<T> InstantContainer<T> {
    pub fn new(t: T) -> Self {
        InstantContainer {
            t,
            instant: Instant::now(),
        }
    }

    pub fn elapsed(&self) -> Duration {
        self.instant.elapsed()
    }

    pub fn get(&self) -> &T {
        &self.t
    }
}

pub struct InMemoryWindowCollector<T> {
    size: Duration,
    slice: Duration,
    buffer: Vec<InstantContainer<T>>,
}

impl<T> ConfigInto<InMemoryWindowCollector<T>> for InMemoryWindowCollectorConfig {}

#[async_trait]
impl<T> FromConfig<InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T> {
    async fn from_config(config: InMemoryWindowCollectorConfig) -> anyhow::Result<Self> {
        Ok(InMemoryWindowCollector {
            size: config.size.into(),
            slice: config.slice.into(),
            buffer: Vec::new(),
        })
    }
}

#[async_trait]
impl<T> Collect<T, Vec<T>, InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T>
where
    T: Clone + Send + 'static,
{
    async fn collect(&mut self, t: T) -> anyhow::Result<()> {
        self.window_collect(t);
        Ok(())
    }

    async fn flush(&mut self) -> anyhow::Result<Option<Vec<T>>> {
        let items = self.flush_window();
        if items.is_empty() {
            return Ok(None);
        }
        Ok(Some(items))
    }

    fn get_flush_interval(&self) -> Interval {
        tokio::time::interval(self.slice.to_owned())
    }
}

impl<T> InMemoryWindowCollector<T>
where
    T: Clone,
{
    fn window_collect(&mut self, t: T) {
        self.buffer.push(InstantContainer::new(t))
    }

    fn flush_window(&mut self) -> Vec<T> {
        let mut cursor: usize = 0;
        for item in &self.buffer {
            let elapsed = item.elapsed();
            // no longer in window
            if elapsed > self.size {
                cursor += 1;
                continue;
            }
            break;
        }
        self.buffer.drain(0..cursor);
        self.buffer.iter().map(|item| item.get().clone()).collect()
    }
}

#[cfg(test)]
mod tests {

    use crate::prelude::*;
    use std::collections::HashMap;

    #[tokio::test]
    async fn test_window() {
        let (tx0, rx0) = channel!(u128, 1024);
        let (tx1, mut rx1) = channel!(Vec<u128>, 1024);
        let channels0 = pipe_channels!([tx0]);
        let channels1 = pipe_channels!(rx0, [tx1]);
        let config0 = config!(TimerConfig, "resources/catalogs/timer.yml");
        let config1 = config!(
            InMemoryWindowCollectorConfig,
            "resources/catalogs/window.yml"
        );
        let timer = poller!("timer");
        let window = collector!("window");
        let timer = run_pipe!(timer, config0, channels0);
        let window = run_pipe!(window, config1, channels1);
        join_pipes!([timer, window]);
        let mut counts: HashMap<u128, usize> = HashMap::new();
        while let Some(ticks) = rx1.recv().await {
            for tick in ticks {
                *counts.entry(tick).or_insert(0) += 1;
            }
        }
        for i in 0..9 {
            let count = counts.get(&(i as u128)).unwrap();
            assert!(*count > 1 && *count <= 4)
        }
    }
}