1use crate::*;
6use async_stream::stream;
7use futures_core::Stream;
8use futures_util::pin_mut;
9use futures_util::stream::StreamExt;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::time::{Duration, SystemTime};
13use crate::resource_manager::get_global_resource_manager;
14
/// Settings for time-based windowing of a stream.
///
/// Of these fields, `window_by_time` in this file reads only `window_size`
/// and `watermark_delay`; `slide_interval` and `allowed_lateness` are carried
/// in the configuration but not consulted there.
#[derive(Debug, Clone)]
pub struct TimeWindowConfig {
    /// Length of each window.
    pub window_size: Duration,
    /// Interval between window starts (not read by `window_by_time`).
    pub slide_interval: Duration,
    /// How far the watermark trails the largest event time seen so far.
    pub watermark_delay: Duration,
    /// Lateness tolerance (not read by `window_by_time`).
    pub allowed_lateness: Duration,
}

impl Default for TimeWindowConfig {
    /// One-minute windows sliding by one minute, a ten-second watermark delay
    /// and five seconds of allowed lateness.
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);
        TimeWindowConfig {
            window_size: one_minute,
            slide_interval: one_minute,
            watermark_delay: Duration::from_secs(10),
            allowed_lateness: Duration::from_secs(5),
        }
    }
}
38
/// A batch of events collected between `start_time` and `end_time`.
#[derive(Debug)]
pub struct TimeWindow<T> {
    /// Start of the window.
    pub start_time: SystemTime,
    /// End of the window; compared against the watermark in `is_complete`.
    pub end_time: SystemTime,
    /// Events added so far, in insertion order.
    pub events: Vec<T>,
}

impl<T> TimeWindow<T> {
    /// Creates a window with the given bounds and no events.
    pub fn new(start_time: SystemTime, end_time: SystemTime) -> Self {
        let events = Vec::new();
        TimeWindow {
            start_time,
            end_time,
            events,
        }
    }

    /// Appends `event` to the end of this window's event list.
    pub fn add_event(&mut self, event: T) {
        self.events.push(event)
    }

    /// Returns `true` once `watermark` has reached or passed `end_time`.
    pub fn is_complete(&self, watermark: SystemTime) -> bool {
        self.end_time <= watermark
    }
}
64
65pub fn window_by_time<T, F>(
67 stream: RS2Stream<T>,
68 config: TimeWindowConfig,
69 timestamp_fn: F,
70) -> RS2Stream<TimeWindow<T>>
71where
72 T: Clone + Send + 'static,
73 F: Fn(&T) -> SystemTime + Send + 'static,
74{
75 stream! {
76 let mut windows: HashMap<u64, TimeWindow<T>> = HashMap::new();
77 let mut watermark = SystemTime::UNIX_EPOCH;
78 let resource_manager = get_global_resource_manager();
79 pin_mut!(stream);
80
81 while let Some(event) = stream.next().await {
82 let event_time = timestamp_fn(&event);
83 if event_time > watermark {
84 watermark = event_time;
85 }
86
87 let since_epoch = event_time.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
89 let window_size_secs = config.window_size.as_secs();
90 let window_start_secs = (since_epoch.as_secs() / window_size_secs) * window_size_secs;
91 let window_start = SystemTime::UNIX_EPOCH + Duration::from_secs(window_start_secs);
92 let window_end = window_start + config.window_size;
93 let window_id = window_start_secs;
94
95 let is_new_window = !windows.contains_key(&window_id);
97 let window = windows.entry(window_id).or_insert_with(|| {
98 TimeWindow::new(window_start, window_end)
99 });
100 if is_new_window {
101 resource_manager.track_memory_allocation(1).await.ok();
102 }
103 window.add_event(event);
104 resource_manager.track_memory_allocation(1).await.ok();
105
106 let mut to_remove = Vec::new();
108 for (id, window) in &windows {
109 if window.is_complete(watermark - config.watermark_delay) {
110 to_remove.push(*id);
111 }
112 }
113 for id in to_remove {
114 if let Some(window) = windows.remove(&id) {
115 resource_manager.track_memory_deallocation(window.events.len() as u64).await;
116 yield window;
117 }
118 }
119 }
120
121 for (_, window) in windows {
123 resource_manager.track_memory_deallocation(window.events.len() as u64).await;
124 yield window;
125 }
126 }
127 .boxed()
128}
129
/// Settings for joining two streams on event time.
///
/// `join_with_time_window` in this file reads only `window_size`;
/// `watermark_delay` is carried in the configuration but not consulted there.
#[derive(Debug, Clone)]
pub struct TimeJoinConfig {
    /// Largest timestamp gap at which two events are still paired, and how
    /// far behind the watermark events stay buffered.
    pub window_size: Duration,
    /// Watermark delay (not read by `join_with_time_window`).
    pub watermark_delay: Duration,
}

impl Default for TimeJoinConfig {
    /// A sixty-second join window with a ten-second watermark delay.
    fn default() -> Self {
        let window_size = Duration::from_secs(60);
        let watermark_delay = Duration::from_secs(10);
        TimeJoinConfig {
            window_size,
            watermark_delay,
        }
    }
}
149
150pub fn join_with_time_window<T1, T2, F, G1, G2, K, FK1, FK2>(
153 stream1: RS2Stream<T1>,
154 stream2: RS2Stream<T2>,
155 config: TimeJoinConfig,
156 timestamp_fn1: G1,
157 timestamp_fn2: G2,
158 join_fn: F,
159 key_selector: Option<(FK1, FK2)>,
160) -> RS2Stream<(T1, T2)>
161where
162 T1: Clone + Send + Sync + 'static,
163 T2: Clone + Send + Sync + 'static,
164 F: Fn(T1, T2) -> (T1, T2) + Send + 'static,
165 G1: Fn(&T1) -> SystemTime + Send + 'static,
166 G2: Fn(&T2) -> SystemTime + Send + 'static,
167 K: Eq + std::hash::Hash,
168 FK1: Fn(&T1) -> K + Send + Sync + 'static,
169 FK2: Fn(&T2) -> K + Send + Sync + 'static,
170{
171 enum Either<L, R> {
172 Left(L),
173 Right(R),
174 }
175 stream! {
176 let mut buffer1: Vec<(T1, SystemTime)> = Vec::new();
177 let mut buffer2: Vec<(T2, SystemTime)> = Vec::new();
178 let mut watermark = SystemTime::UNIX_EPOCH;
179 let mut yielded: HashSet<(u128, u128)> = HashSet::new();
180 let s1 = stream1.map(|e| Either::Left(e));
181 let s2 = stream2.map(|e| Either::Right(e));
182 let merged = merge(s1, s2);
183 pin_mut!(merged);
184 while let Some(either) = merged.next().await {
185 match either {
186 Either::Left(e1) => {
187 let t1 = timestamp_fn1(&e1);
188 if t1 > watermark { watermark = t1; }
189 buffer1.push((e1, t1));
190 }
191 Either::Right(e2) => {
192 let t2 = timestamp_fn2(&e2);
193 if t2 > watermark { watermark = t2; }
194 buffer2.push((e2, t2));
195 }
196 }
197 let min_time = watermark - config.window_size;
199 buffer1.retain(|(_, t)| *t >= min_time);
200 buffer2.retain(|(_, t)| *t >= min_time);
201 for (e1, t1) in &buffer1 {
203 for (e2, t2) in &buffer2 {
204 let diff = if t1 > t2 {
205 t1.duration_since(*t2).unwrap_or_default()
206 } else {
207 t2.duration_since(*t1).unwrap_or_default()
208 };
209 if diff <= config.window_size {
210 let key_match = if let Some((ref fk1, ref fk2)) = key_selector {
211 fk1(e1) == fk2(e2)
212 } else {
213 true
214 };
215 if key_match {
216 let t1n = t1.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
218 let t2n = t2.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
219 let key = (t1n, t2n);
220 if !yielded.contains(&key) {
221 yielded.insert(key);
222 yield join_fn(e1.clone(), e2.clone());
223 }
224 }
225 }
226 }
227 }
228 }
229 }
230 .boxed()
231}
232
233pub trait AdvancedAnalyticsExt: Stream + Send + Sized + 'static {
239 fn window_by_time_rs2<F>(
241 self,
242 config: TimeWindowConfig,
243 timestamp_fn: F,
244 ) -> RS2Stream<TimeWindow<<Self as Stream>::Item>>
245 where
246 <Self as Stream>::Item: Clone + Send + 'static,
247 F: Fn(&<Self as Stream>::Item) -> SystemTime + Send + 'static,
248 {
249 window_by_time(self.boxed(), config, timestamp_fn)
250 }
251 fn join_with_time_window_rs2<T2, F, G1, G2, K, FK1, FK2>(
253 self,
254 other: RS2Stream<T2>,
255 config: TimeJoinConfig,
256 timestamp_fn1: G1,
257 timestamp_fn2: G2,
258 join_fn: F,
259 key_selector: Option<(FK1, FK2)>,
260 ) -> RS2Stream<(Self::Item, T2)>
261 where
262 Self::Item: Clone + Send + Sync + 'static,
263 T2: Clone + Send + Sync + 'static,
264 F: Fn(Self::Item, T2) -> (Self::Item, T2) + Send + 'static,
265 G1: Fn(&Self::Item) -> SystemTime + Send + 'static,
266 G2: Fn(&T2) -> SystemTime + Send + 'static,
267 K: Eq + std::hash::Hash,
268 FK1: Fn(&Self::Item) -> K + Send + Sync + 'static,
269 FK2: Fn(&T2) -> K + Send + Sync + 'static,
270 {
271 join_with_time_window(
272 self.boxed(),
273 other,
274 config,
275 timestamp_fn1,
276 timestamp_fn2,
277 join_fn,
278 key_selector,
279 )
280 }
281}
282
283impl<S> AdvancedAnalyticsExt for S where S: Stream + Send + Sized + 'static {}