pipebase/collect/
window.rs

1use std::time::{Duration, Instant};
2
3use serde::Deserialize;
4use tokio::time::Interval;
5
6use super::Collect;
7use crate::common::{ConfigInto, FromConfig, FromPath, Period};
8use async_trait::async_trait;
9
10#[derive(Deserialize)]
11pub struct InMemoryWindowCollectorConfig {
12    size: Period,
13    slice: Period,
14}
15
16impl FromPath for InMemoryWindowCollectorConfig {}
17
/// A value paired with the moment at which it was wrapped.
///
/// The timestamp is read once, in [`InstantContainer::new`], and is never
/// refreshed afterwards.
pub struct InstantContainer<T> {
    /// The wrapped value.
    t: T,
    /// Instant captured when the container was created.
    instant: Instant,
}

impl<T> InstantContainer<T> {
    /// Wraps `t` and stamps it with the current instant.
    pub fn new(t: T) -> Self {
        let instant = Instant::now();
        Self { t, instant }
    }

    /// Returns the time that has passed since the container was created.
    pub fn elapsed(&self) -> Duration {
        Instant::elapsed(&self.instant)
    }

    /// Borrows the wrapped value.
    pub fn get(&self) -> &T {
        let Self { t, .. } = self;
        t
    }
}
39
40pub struct InMemoryWindowCollector<T> {
41    size: Duration,
42    slice: Duration,
43    buffer: Vec<InstantContainer<T>>,
44}
45
46impl<T> ConfigInto<InMemoryWindowCollector<T>> for InMemoryWindowCollectorConfig {}
47
48#[async_trait]
49impl<T> FromConfig<InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T> {
50    async fn from_config(config: InMemoryWindowCollectorConfig) -> anyhow::Result<Self> {
51        Ok(InMemoryWindowCollector {
52            size: config.size.into(),
53            slice: config.slice.into(),
54            buffer: Vec::new(),
55        })
56    }
57}
58
59#[async_trait]
60impl<T> Collect<T, Vec<T>, InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T>
61where
62    T: Clone + Send + 'static,
63{
64    async fn collect(&mut self, t: T) -> anyhow::Result<()> {
65        self.window_collect(t);
66        Ok(())
67    }
68
69    async fn flush(&mut self) -> anyhow::Result<Option<Vec<T>>> {
70        let items = self.flush_window();
71        if items.is_empty() {
72            return Ok(None);
73        }
74        Ok(Some(items))
75    }
76
77    fn get_flush_interval(&self) -> Interval {
78        tokio::time::interval(self.slice.to_owned())
79    }
80}
81
82impl<T> InMemoryWindowCollector<T>
83where
84    T: Clone,
85{
86    fn window_collect(&mut self, t: T) {
87        self.buffer.push(InstantContainer::new(t))
88    }
89
90    fn flush_window(&mut self) -> Vec<T> {
91        let mut cursor: usize = 0;
92        for item in &self.buffer {
93            let elapsed = item.elapsed();
94            // no longer in window
95            if elapsed > self.size {
96                cursor += 1;
97                continue;
98            }
99            break;
100        }
101        self.buffer.drain(0..cursor);
102        self.buffer.iter().map(|item| item.get().clone()).collect()
103    }
104}
105
#[cfg(test)]
mod tests {

    use crate::prelude::*;
    use std::collections::HashMap;

    // Wires a "timer" poller into a "window" collector and checks how often
    // each tick value shows up across the emitted windows.
    // NOTE(review): the expected tick range and counts depend on the contents
    // of timer.yml / window.yml, which are not visible here — confirm.
    #[tokio::test]
    async fn test_window() {
        let (tx0, rx0) = channel!(u128, 1024);
        let (tx1, mut rx1) = channel!(Vec<u128>, 1024);
        let channels0 = pipe_channels!([tx0]);
        let channels1 = pipe_channels!(rx0, [tx1]);
        let config0 = config!(TimerConfig, "resources/catalogs/timer.yml");
        let config1 = config!(
            InMemoryWindowCollectorConfig,
            "resources/catalogs/window.yml"
        );
        let timer = poller!("timer");
        let window = collector!("window");
        let timer = run_pipe!(timer, config0, channels0);
        let window = run_pipe!(window, config1, channels1);
        join_pipes!([timer, window]);
        // Tally the number of received windows that contained each tick.
        let mut seen: HashMap<u128, usize> = HashMap::new();
        while let Some(batch) = rx1.recv().await {
            for tick in batch {
                *seen.entry(tick).or_default() += 1;
            }
        }
        // Ticks 0 through 8 must each appear in 2, 3 or 4 windows.
        for tick in 0..9u128 {
            let hits = *seen.get(&tick).unwrap();
            assert!((2..=4).contains(&hits))
        }
    }
}