stream_window/
lib.rs

//! Stream windows built on top of async streams.
//! Implementations offered:
//! - tumbling window: waits until a window of the specified size is filled, emits it, and starts a new window
//! - sliding window: waits until a window of the specified size is filled, emits it, and slides down by one element
//! - periodic window: collects elements until a clock tick arrives, emits them, and starts a new window
6
7use async_stream::stream;
8use futures::StreamExt;
9use std::{collections::VecDeque, pin::Pin};
10use tokio::select;
11use tokio_stream::Stream;
12
13/// Tumbling window, waits till window of specified size is filled, emits, and starts a new window.
14pub fn to_tumbling_window<'a, T: Clone + 'a>(
15    mut stream: impl Stream<Item = T> + Unpin + 'a,
16    window_size: usize,
17) -> impl Stream<Item = Vec<T>> + 'a {
18    let mut buffer = Vec::with_capacity(window_size);
19    stream! {
20        while let Some(element) = stream.next().await {
21            buffer.push(element);
22            if buffer.len() == window_size {
23                yield buffer.split_off(0);
24            }
25        }
26    }
27}
28
29/// Sliding window, waits till window of specified size is filled, emits, and slides down by one element.
30pub fn to_sliding_window<'a, T: Clone + 'a>(
31    mut stream: impl Stream<Item = T> + Unpin + 'a,
32    window_size: usize,
33) -> impl Stream<Item = Vec<T>> + 'a {
34    let mut buffer = VecDeque::with_capacity(window_size);
35    stream! {
36        while let Some(element) = stream.next().await {
37            buffer.push_back(element);
38            if buffer.len() == window_size {
39                yield buffer.iter().cloned().collect();
40                // slide down
41                buffer.pop_front();
42            }
43        }
44    }
45}
46
47/// Periodic window, waits till window of specified size is filled, emits, and starts a new window on a clock tick.
48pub fn to_periodic_window<'a, T: 'a, CT>(
49    mut stream: impl Stream<Item = T> + Unpin + 'a,
50    mut clock_stream: impl Stream<Item = CT> + Unpin + 'a,
51    emit_last: bool,
52) -> impl Stream<Item = Vec<T>> + 'a {
53    let mut buffer = vec![];
54    stream! {
55        loop {
56            select! {
57                biased;
58
59                _ = clock_stream.next() => {
60                    yield buffer.split_off(0)
61                }
62
63                element = stream.next() => {
64                    let Some(element) = element else {
65                        if emit_last {
66                            yield buffer.split_off(0)
67                        }
68                        break;
69                    };
70
71                    buffer.push(element);
72                }
73            }
74        }
75    }
76}
77
78/// Wrappers for actual window implementations, as an extension trait.
79pub trait WindowExt: Stream {
80    fn tumbling_window<'a>(
81        self,
82        window_size: usize,
83    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
84    where
85        Self: Unpin + Sized + 'a,
86        Self::Item: Clone,
87    {
88        to_tumbling_window(self, window_size).boxed_local()
89    }
90
91    fn tumbling_window_unpin<'a>(
92        self,
93        window_size: usize,
94    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
95    where
96        Self: Sized + 'a,
97        Self::Item: Clone,
98    {
99        to_tumbling_window(Box::pin(self), window_size).boxed_local()
100    }
101
102    fn sliding_window<'a>(
103        self,
104        window_size: usize,
105    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
106    where
107        Self: Unpin + Sized + 'a,
108        Self::Item: Clone,
109    {
110        to_sliding_window(self, window_size).boxed_local()
111    }
112
113    fn sliding_window_unpin<'a>(
114        self,
115        window_size: usize,
116    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
117    where
118        Self: Sized + 'a,
119        Self::Item: Clone,
120    {
121        to_sliding_window(Box::pin(self), window_size).boxed_local()
122    }
123
124    fn periodic_window<'a, CT>(
125        self,
126        clock_stream: impl Stream<Item = CT> + Unpin + 'a,
127        emit_last: bool,
128    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
129    where
130        Self: Unpin + Sized + 'a,
131    {
132        to_periodic_window(self, clock_stream, emit_last).boxed_local()
133    }
134
135    fn periodic_window_unpin<'a, CT>(
136        self,
137        clock_stream: impl Stream<Item = CT> + 'a,
138        emit_last: bool,
139    ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
140    where
141        Self: Sized + 'a,
142    {
143        to_periodic_window(Box::pin(self), Box::pin(clock_stream), emit_last).boxed_local()
144    }
145}
146
147impl<S> WindowExt for S where S: Stream {}
148
149#[cfg(test)]
150mod tests;