1use crate::*;
6use async_stream::stream;
7use futures_util::stream::StreamExt;
8use futures_util::pin_mut;
9use futures_core::Stream;
10use std::collections::HashMap;
11use std::time::{Duration, SystemTime};
12use std::collections::HashSet;
13
/// Configuration for [`window_by_time`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeWindowConfig {
    /// Length of each window; must be non-zero.
    pub window_size: Duration,
    /// Intended distance between the starts of consecutive windows.
    ///
    /// NOTE: `window_by_time` does not currently read this field; its windows are always
    /// adjacent and non-overlapping, each `window_size` long.
    pub slide_interval: Duration,
    /// How far the watermark (the largest event timestamp seen so far) must run past a
    /// window's end before `window_by_time` emits that window.
    pub watermark_delay: Duration,
    /// Intended grace period for late events.
    ///
    /// NOTE: `window_by_time` does not currently read this field.
    pub allowed_lateness: Duration,
}
26
27impl Default for TimeWindowConfig {
28 fn default() -> Self {
29 Self {
30 window_size: Duration::from_secs(60),
31 slide_interval: Duration::from_secs(60),
32 watermark_delay: Duration::from_secs(10),
33 allowed_lateness: Duration::from_secs(5),
34 }
35 }
36}
37
/// A bucket of events sharing one time range, as produced by [`window_by_time`].
///
/// `window_by_time` places an event here when its timestamp lies in
/// `[start_time, end_time)`; timestamps before the Unix epoch are clamped into the window
/// that starts at the epoch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeWindow<T> {
    /// Start of the window's time range.
    pub start_time: SystemTime,
    /// End of the window's time range; the window counts as complete once the watermark
    /// reaches this instant.
    pub end_time: SystemTime,
    /// Events added to this window, in the order they were added.
    pub events: Vec<T>,
}
45
46impl<T> TimeWindow<T> {
47 pub fn new(start_time: SystemTime, end_time: SystemTime) -> Self {
48 Self {
49 start_time,
50 end_time,
51 events: Vec::new(),
52 }
53 }
54
55 pub fn add_event(&mut self, event: T) {
56 self.events.push(event);
57 }
58
59 pub fn is_complete(&self, watermark: SystemTime) -> bool {
60 watermark >= self.end_time
61 }
62}
63
64pub fn window_by_time<T, F>(
66 stream: RS2Stream<T>,
67 config: TimeWindowConfig,
68 timestamp_fn: F,
69) -> RS2Stream<TimeWindow<T>>
70where
71 T: Clone + Send + 'static,
72 F: Fn(&T) -> SystemTime + Send + 'static,
73{
74 stream! {
75 let mut windows: HashMap<u64, TimeWindow<T>> = HashMap::new();
76 let mut watermark = SystemTime::UNIX_EPOCH;
77 pin_mut!(stream);
78
79 while let Some(event) = stream.next().await {
80 let event_time = timestamp_fn(&event);
81 if event_time > watermark {
82 watermark = event_time;
83 }
84
85 let since_epoch = event_time.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
87 let window_size_secs = config.window_size.as_secs();
88 let window_start_secs = (since_epoch.as_secs() / window_size_secs) * window_size_secs;
89 let window_start = SystemTime::UNIX_EPOCH + Duration::from_secs(window_start_secs);
90 let window_end = window_start + config.window_size;
91 let window_id = window_start_secs;
92
93 let window = windows.entry(window_id).or_insert_with(|| {
95 TimeWindow::new(window_start, window_end)
96 });
97 window.add_event(event);
98
99 let mut to_remove = Vec::new();
101 for (id, window) in &windows {
102 if window.is_complete(watermark - config.watermark_delay) {
103 to_remove.push(*id);
104 }
105 }
106 for id in to_remove {
107 if let Some(window) = windows.remove(&id) {
108 yield window;
109 }
110 }
111 }
112
113 for (_, window) in windows {
115 yield window;
116 }
117 }
118 .boxed()
119}
120
/// Configuration for [`join_with_time_window`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeJoinConfig {
    /// Maximum distance between two events' timestamps for them to be joined. Buffered
    /// events older than the watermark minus this value are discarded.
    pub window_size: Duration,
    /// NOTE: `join_with_time_window` does not currently read this field.
    pub watermark_delay: Duration,
}
131
132impl Default for TimeJoinConfig {
133 fn default() -> Self {
134 Self {
135 window_size: Duration::from_secs(60),
136 watermark_delay: Duration::from_secs(10),
137 }
138 }
139}
140
141pub fn join_with_time_window<T1, T2, F, G1, G2, K, FK1, FK2>(
144 stream1: RS2Stream<T1>,
145 stream2: RS2Stream<T2>,
146 config: TimeJoinConfig,
147 timestamp_fn1: G1,
148 timestamp_fn2: G2,
149 join_fn: F,
150 key_selector: Option<(FK1, FK2)>,
151) -> RS2Stream<(T1, T2)>
152where
153 T1: Clone + Send + Sync + 'static,
154 T2: Clone + Send + Sync + 'static,
155 F: Fn(T1, T2) -> (T1, T2) + Send + 'static,
156 G1: Fn(&T1) -> SystemTime + Send + 'static,
157 G2: Fn(&T2) -> SystemTime + Send + 'static,
158 K: Eq + std::hash::Hash,
159 FK1: Fn(&T1) -> K + Send + Sync + 'static,
160 FK2: Fn(&T2) -> K + Send + Sync + 'static,
161{
162 enum Either<L, R> { Left(L), Right(R) }
163 stream! {
164 let mut buffer1: Vec<(T1, SystemTime)> = Vec::new();
165 let mut buffer2: Vec<(T2, SystemTime)> = Vec::new();
166 let mut watermark = SystemTime::UNIX_EPOCH;
167 let mut yielded: HashSet<(u128, u128)> = HashSet::new();
168 let s1 = stream1.map(|e| Either::Left(e));
169 let s2 = stream2.map(|e| Either::Right(e));
170 let merged = merge(s1, s2);
171 pin_mut!(merged);
172 while let Some(either) = merged.next().await {
173 match either {
174 Either::Left(e1) => {
175 let t1 = timestamp_fn1(&e1);
176 if t1 > watermark { watermark = t1; }
177 buffer1.push((e1, t1));
178 }
179 Either::Right(e2) => {
180 let t2 = timestamp_fn2(&e2);
181 if t2 > watermark { watermark = t2; }
182 buffer2.push((e2, t2));
183 }
184 }
185 let min_time = watermark - config.window_size;
187 buffer1.retain(|(_, t)| *t >= min_time);
188 buffer2.retain(|(_, t)| *t >= min_time);
189 for (e1, t1) in &buffer1 {
191 for (e2, t2) in &buffer2 {
192 let diff = if t1 > t2 {
193 t1.duration_since(*t2).unwrap_or_default()
194 } else {
195 t2.duration_since(*t1).unwrap_or_default()
196 };
197 if diff <= config.window_size {
198 let key_match = if let Some((ref fk1, ref fk2)) = key_selector {
199 fk1(e1) == fk2(e2)
200 } else {
201 true
202 };
203 if key_match {
204 let t1n = t1.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
206 let t2n = t2.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
207 let key = (t1n, t2n);
208 if !yielded.contains(&key) {
209 yielded.insert(key);
210 yield join_fn(e1.clone(), e2.clone());
211 }
212 }
213 }
214 }
215 }
216 }
217 }
218 .boxed()
219}
220
221pub trait AdvancedAnalyticsExt: Stream + Send + Sized + 'static {
227 fn window_by_time_rs2<F>(
229 self,
230 config: TimeWindowConfig,
231 timestamp_fn: F,
232 ) -> RS2Stream<TimeWindow<<Self as Stream>::Item>>
233 where
234 <Self as Stream>::Item: Clone + Send + 'static,
235 F: Fn(&<Self as Stream>::Item) -> SystemTime + Send + 'static,
236 {
237 window_by_time(self.boxed(), config, timestamp_fn)
238 }
239 fn join_with_time_window_rs2<T2, F, G1, G2, K, FK1, FK2>(
241 self,
242 other: RS2Stream<T2>,
243 config: TimeJoinConfig,
244 timestamp_fn1: G1,
245 timestamp_fn2: G2,
246 join_fn: F,
247 key_selector: Option<(FK1, FK2)>,
248 ) -> RS2Stream<(Self::Item, T2)>
249 where
250 Self::Item: Clone + Send + Sync + 'static,
251 T2: Clone + Send + Sync + 'static,
252 F: Fn(Self::Item, T2) -> (Self::Item, T2) + Send + 'static,
253 G1: Fn(&Self::Item) -> SystemTime + Send + 'static,
254 G2: Fn(&T2) -> SystemTime + Send + 'static,
255 K: Eq + std::hash::Hash,
256 FK1: Fn(&Self::Item) -> K + Send + Sync + 'static,
257 FK2: Fn(&T2) -> K + Send + Sync + 'static,
258 {
259 join_with_time_window(self.boxed(), other, config, timestamp_fn1, timestamp_fn2, join_fn, key_selector)
260 }
261}
262
263impl<S> AdvancedAnalyticsExt for S where S: Stream + Send + Sized + 'static {}