1use arrow::record_batch::RecordBatch;
2use std::collections::VecDeque;
3use std::time::{Duration, Instant};
4
5pub struct WindowBuffer {
6 window_duration: Duration,
7 batches: VecDeque<(Instant, RecordBatch)>,
8}
9
10impl WindowBuffer {
11 pub fn new(seconds: u64) -> Self {
12 Self {
13 window_duration: Duration::from_secs(seconds),
14 batches: VecDeque::new(),
15 }
16 }
17
18 pub fn add_batch(&mut self, batch: RecordBatch) {
19 self.batches.push_back((Instant::now(), batch));
20 self.prune();
21 }
22
23 pub fn prune(&mut self) {
24 let now = Instant::now();
25 while let Some((timestamp, _)) = self.batches.front() {
26 if now.duration_since(*timestamp) > self.window_duration {
27 self.batches.pop_front();
28 } else {
29 break;
30 }
31 }
32 }
33
34 pub fn get_batches(&self) -> Vec<RecordBatch> {
35 self.batches.iter().map(|(_, b)| b.clone()).collect()
36 }
37
38 pub fn is_empty(&self) -> bool {
39 self.batches.is_empty()
40 }
41}