pipebase/collect/
window.rs1use std::time::{Duration, Instant};
2
3use serde::Deserialize;
4use tokio::time::Interval;
5
6use super::Collect;
7use crate::common::{ConfigInto, FromConfig, FromPath, Period};
8use async_trait::async_trait;
9
10#[derive(Deserialize)]
11pub struct InMemoryWindowCollectorConfig {
12 size: Period,
13 slice: Period,
14}
15
16impl FromPath for InMemoryWindowCollectorConfig {}
17
18pub struct InstantContainer<T> {
19 t: T,
20 instant: Instant,
21}
22
23impl<T> InstantContainer<T> {
24 pub fn new(t: T) -> Self {
25 InstantContainer {
26 t,
27 instant: Instant::now(),
28 }
29 }
30
31 pub fn elapsed(&self) -> Duration {
32 self.instant.elapsed()
33 }
34
35 pub fn get(&self) -> &T {
36 &self.t
37 }
38}
39
40pub struct InMemoryWindowCollector<T> {
41 size: Duration,
42 slice: Duration,
43 buffer: Vec<InstantContainer<T>>,
44}
45
46impl<T> ConfigInto<InMemoryWindowCollector<T>> for InMemoryWindowCollectorConfig {}
47
48#[async_trait]
49impl<T> FromConfig<InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T> {
50 async fn from_config(config: InMemoryWindowCollectorConfig) -> anyhow::Result<Self> {
51 Ok(InMemoryWindowCollector {
52 size: config.size.into(),
53 slice: config.slice.into(),
54 buffer: Vec::new(),
55 })
56 }
57}
58
59#[async_trait]
60impl<T> Collect<T, Vec<T>, InMemoryWindowCollectorConfig> for InMemoryWindowCollector<T>
61where
62 T: Clone + Send + 'static,
63{
64 async fn collect(&mut self, t: T) -> anyhow::Result<()> {
65 self.window_collect(t);
66 Ok(())
67 }
68
69 async fn flush(&mut self) -> anyhow::Result<Option<Vec<T>>> {
70 let items = self.flush_window();
71 if items.is_empty() {
72 return Ok(None);
73 }
74 Ok(Some(items))
75 }
76
77 fn get_flush_interval(&self) -> Interval {
78 tokio::time::interval(self.slice.to_owned())
79 }
80}
81
82impl<T> InMemoryWindowCollector<T>
83where
84 T: Clone,
85{
86 fn window_collect(&mut self, t: T) {
87 self.buffer.push(InstantContainer::new(t))
88 }
89
90 fn flush_window(&mut self) -> Vec<T> {
91 let mut cursor: usize = 0;
92 for item in &self.buffer {
93 let elapsed = item.elapsed();
94 if elapsed > self.size {
96 cursor += 1;
97 continue;
98 }
99 break;
100 }
101 self.buffer.drain(0..cursor);
102 self.buffer.iter().map(|item| item.get().clone()).collect()
103 }
104}
105
106#[cfg(test)]
107mod tests {
108
109 use crate::prelude::*;
110 use std::collections::HashMap;
111
112 #[tokio::test]
113 async fn test_window() {
114 let (tx0, rx0) = channel!(u128, 1024);
115 let (tx1, mut rx1) = channel!(Vec<u128>, 1024);
116 let channels0 = pipe_channels!([tx0]);
117 let channels1 = pipe_channels!(rx0, [tx1]);
118 let config0 = config!(TimerConfig, "resources/catalogs/timer.yml");
119 let config1 = config!(
120 InMemoryWindowCollectorConfig,
121 "resources/catalogs/window.yml"
122 );
123 let timer = poller!("timer");
124 let window = collector!("window");
125 let timer = run_pipe!(timer, config0, channels0);
126 let window = run_pipe!(window, config1, channels1);
127 join_pipes!([timer, window]);
128 let mut counts: HashMap<u128, usize> = HashMap::new();
129 while let Some(ticks) = rx1.recv().await {
130 for tick in ticks {
131 *counts.entry(tick).or_insert(0) += 1;
132 }
133 }
134 for i in 0..9 {
135 let count = counts.get(&(i as u128)).unwrap();
136 assert!(*count > 1 && *count <= 4)
137 }
138 }
139}