fuse_rule/
window.rs

1use arrow::record_batch::RecordBatch;
2use std::collections::VecDeque;
3use std::time::{Duration, Instant};
4
5pub struct WindowBuffer {
6    window_duration: Duration,
7    batches: VecDeque<(Instant, RecordBatch)>,
8}
9
10impl WindowBuffer {
11    pub fn new(seconds: u64) -> Self {
12        Self {
13            window_duration: Duration::from_secs(seconds),
14            batches: VecDeque::new(),
15        }
16    }
17
18    pub fn add_batch(&mut self, batch: RecordBatch) {
19        self.batches.push_back((Instant::now(), batch));
20        self.prune();
21    }
22
23    pub fn prune(&mut self) {
24        let now = Instant::now();
25        while let Some((timestamp, _)) = self.batches.front() {
26            if now.duration_since(*timestamp) > self.window_duration {
27                self.batches.pop_front();
28            } else {
29                break;
30            }
31        }
32    }
33
34    pub fn get_batches(&self) -> Vec<RecordBatch> {
35        self.batches.iter().map(|(_, b)| b.clone()).collect()
36    }
37
38    pub fn is_empty(&self) -> bool {
39        self.batches.is_empty()
40    }
41}