1use async_stream::stream;
8use futures::StreamExt;
9use std::{collections::VecDeque, pin::Pin};
10use tokio::select;
11use tokio_stream::Stream;
12
13pub fn to_tumbling_window<'a, T: Clone + 'a>(
15 mut stream: impl Stream<Item = T> + Unpin + 'a,
16 window_size: usize,
17) -> impl Stream<Item = Vec<T>> + 'a {
18 let mut buffer = Vec::with_capacity(window_size);
19 stream! {
20 while let Some(element) = stream.next().await {
21 buffer.push(element);
22 if buffer.len() == window_size {
23 yield buffer.split_off(0);
24 }
25 }
26 }
27}
28
29pub fn to_sliding_window<'a, T: Clone + 'a>(
31 mut stream: impl Stream<Item = T> + Unpin + 'a,
32 window_size: usize,
33) -> impl Stream<Item = Vec<T>> + 'a {
34 let mut buffer = VecDeque::with_capacity(window_size);
35 stream! {
36 while let Some(element) = stream.next().await {
37 buffer.push_back(element);
38 if buffer.len() == window_size {
39 yield buffer.iter().cloned().collect();
40 buffer.pop_front();
42 }
43 }
44 }
45}
46
47pub fn to_periodic_window<'a, T: 'a, CT>(
49 mut stream: impl Stream<Item = T> + Unpin + 'a,
50 mut clock_stream: impl Stream<Item = CT> + Unpin + 'a,
51 emit_last: bool,
52) -> impl Stream<Item = Vec<T>> + 'a {
53 let mut buffer = vec![];
54 stream! {
55 loop {
56 select! {
57 biased;
58
59 _ = clock_stream.next() => {
60 yield buffer.split_off(0)
61 }
62
63 element = stream.next() => {
64 let Some(element) = element else {
65 if emit_last {
66 yield buffer.split_off(0)
67 }
68 break;
69 };
70
71 buffer.push(element);
72 }
73 }
74 }
75 }
76}
77
78pub trait WindowExt: Stream {
80 fn tumbling_window<'a>(
81 self,
82 window_size: usize,
83 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
84 where
85 Self: Unpin + Sized + 'a,
86 Self::Item: Clone,
87 {
88 to_tumbling_window(self, window_size).boxed_local()
89 }
90
91 fn tumbling_window_unpin<'a>(
92 self,
93 window_size: usize,
94 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
95 where
96 Self: Sized + 'a,
97 Self::Item: Clone,
98 {
99 to_tumbling_window(Box::pin(self), window_size).boxed_local()
100 }
101
102 fn sliding_window<'a>(
103 self,
104 window_size: usize,
105 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
106 where
107 Self: Unpin + Sized + 'a,
108 Self::Item: Clone,
109 {
110 to_sliding_window(self, window_size).boxed_local()
111 }
112
113 fn sliding_window_unpin<'a>(
114 self,
115 window_size: usize,
116 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
117 where
118 Self: Sized + 'a,
119 Self::Item: Clone,
120 {
121 to_sliding_window(Box::pin(self), window_size).boxed_local()
122 }
123
124 fn periodic_window<'a, CT>(
125 self,
126 clock_stream: impl Stream<Item = CT> + Unpin + 'a,
127 emit_last: bool,
128 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
129 where
130 Self: Unpin + Sized + 'a,
131 {
132 to_periodic_window(self, clock_stream, emit_last).boxed_local()
133 }
134
135 fn periodic_window_unpin<'a, CT>(
136 self,
137 clock_stream: impl Stream<Item = CT> + 'a,
138 emit_last: bool,
139 ) -> Pin<Box<dyn Stream<Item = Vec<Self::Item>> + 'a>>
140 where
141 Self: Sized + 'a,
142 {
143 to_periodic_window(Box::pin(self), Box::pin(clock_stream), emit_last).boxed_local()
144 }
145}
146
147impl<S> WindowExt for S where S: Stream {}
148
149#[cfg(test)]
150mod tests;