// rs2_stream/advanced_analytics.rs

//! Advanced analytics for RS2 streams.
//!
//! Provides time-based windowed aggregations and advanced stream joins for building
//! sophisticated real-time analytics.

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::{Duration, SystemTime};

use async_stream::stream;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::stream::StreamExt;

use crate::resource_manager::get_global_resource_manager;
use crate::*;

// ================================
// Time-based Windowed Aggregations
// ================================

/// Configuration for time-based windowing.
///
/// Every setting is a plain [`Duration`]; see the [`Default`] impl for the
/// values used when nothing is specified.
#[derive(Debug, Clone)]
pub struct TimeWindowConfig {
    /// Length of each window.
    pub window_size: Duration,
    /// Requested hop between the start times of consecutive windows
    /// (equal to `window_size` for tumbling windows, as in the default).
    pub slide_interval: Duration,
    /// How far the watermark is held back behind the newest observed event
    /// time before a window is treated as complete.
    pub watermark_delay: Duration,
    /// Requested tolerance for events that arrive after their window closed.
    /// NOTE(review): confirm which operators actually honor this field.
    pub allowed_lateness: Duration,
}

impl Default for TimeWindowConfig {
    /// One-minute tumbling windows, a 10 s watermark delay and 5 s allowed lateness.
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);
        Self {
            window_size: one_minute,
            slide_interval: one_minute,
            watermark_delay: Duration::from_secs(10),
            allowed_lateness: Duration::from_secs(5),
        }
    }
}

/// A time-based window: the span from `start_time` to `end_time` together
/// with the events collected for it.
#[derive(Debug)]
pub struct TimeWindow<T> {
    /// Start of the window's time span.
    pub start_time: SystemTime,
    /// End of the window's time span; the window counts as complete once the
    /// watermark reaches this instant.
    pub end_time: SystemTime,
    /// Events added to this window, in insertion order.
    pub events: Vec<T>,
}

impl<T> TimeWindow<T> {
    /// Creates an empty window spanning `start_time` to `end_time`.
    pub fn new(start_time: SystemTime, end_time: SystemTime) -> Self {
        let events = Vec::new();
        Self {
            start_time,
            end_time,
            events,
        }
    }

    /// Appends `event` to the end of this window's event list.
    pub fn add_event(&mut self, event: T) {
        let events = &mut self.events;
        events.push(event);
    }

    /// Returns `true` once `watermark` has reached or passed `end_time`.
    pub fn is_complete(&self, watermark: SystemTime) -> bool {
        self.end_time <= watermark
    }
}

65/// Create time-based windows from a stream of timestamped events
66pub fn window_by_time<T, F>(
67    stream: RS2Stream<T>,
68    config: TimeWindowConfig,
69    timestamp_fn: F,
70) -> RS2Stream<TimeWindow<T>>
71where
72    T: Clone + Send + 'static,
73    F: Fn(&T) -> SystemTime + Send + 'static,
74{
75    stream! {
76        let mut windows: HashMap<u64, TimeWindow<T>> = HashMap::new();
77        let mut watermark = SystemTime::UNIX_EPOCH;
78        let resource_manager = get_global_resource_manager();
79        pin_mut!(stream);
80
81        while let Some(event) = stream.next().await {
82            let event_time = timestamp_fn(&event);
83            if event_time > watermark {
84                watermark = event_time;
85            }
86
87            // Calculate window boundaries
88            let since_epoch = event_time.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
89            let window_size_secs = config.window_size.as_secs();
90            let window_start_secs = (since_epoch.as_secs() / window_size_secs) * window_size_secs;
91            let window_start = SystemTime::UNIX_EPOCH + Duration::from_secs(window_start_secs);
92            let window_end = window_start + config.window_size;
93            let window_id = window_start_secs;
94
95            // Add event to appropriate window
96            let is_new_window = !windows.contains_key(&window_id);
97            let window = windows.entry(window_id).or_insert_with(|| {
98                TimeWindow::new(window_start, window_end)
99            });
100            if is_new_window {
101                resource_manager.track_memory_allocation(1).await.ok();
102            }
103            window.add_event(event);
104            resource_manager.track_memory_allocation(1).await.ok();
105
106            // Emit completed windows
107            let mut to_remove = Vec::new();
108            for (id, window) in &windows {
109                if window.is_complete(watermark - config.watermark_delay) {
110                    to_remove.push(*id);
111                }
112            }
113            for id in to_remove {
114                if let Some(window) = windows.remove(&id) {
115                    resource_manager.track_memory_deallocation(window.events.len() as u64).await;
116                    yield window;
117                }
118            }
119        }
120
121        // Emit remaining windows
122        for (_, window) in windows {
123            resource_manager.track_memory_deallocation(window.events.len() as u64).await;
124            yield window;
125        }
126    }
127    .boxed()
128}
129
// ================================
// Stream Joins with Time Windows
// ================================

/// Configuration for time-windowed joins.
#[derive(Debug, Clone)]
pub struct TimeJoinConfig {
    /// Maximum distance between two events' timestamps for them to be joined.
    pub window_size: Duration,
    /// Watermark delay setting for the join.
    /// NOTE(review): not read by `join_with_time_window` as written.
    pub watermark_delay: Duration,
}

impl Default for TimeJoinConfig {
    /// A 60 s join window with a 10 s watermark delay.
    fn default() -> Self {
        let (window_size, watermark_delay) = (Duration::from_secs(60), Duration::from_secs(10));
        Self {
            window_size,
            watermark_delay,
        }
    }
}

150/// Join two streams with time-based windowing
151/// If key_selector is provided, only join on matching keys; otherwise, cross join within the window.
152pub fn join_with_time_window<T1, T2, F, G1, G2, K, FK1, FK2>(
153    stream1: RS2Stream<T1>,
154    stream2: RS2Stream<T2>,
155    config: TimeJoinConfig,
156    timestamp_fn1: G1,
157    timestamp_fn2: G2,
158    join_fn: F,
159    key_selector: Option<(FK1, FK2)>,
160) -> RS2Stream<(T1, T2)>
161where
162    T1: Clone + Send + Sync + 'static,
163    T2: Clone + Send + Sync + 'static,
164    F: Fn(T1, T2) -> (T1, T2) + Send + 'static,
165    G1: Fn(&T1) -> SystemTime + Send + 'static,
166    G2: Fn(&T2) -> SystemTime + Send + 'static,
167    K: Eq + std::hash::Hash,
168    FK1: Fn(&T1) -> K + Send + Sync + 'static,
169    FK2: Fn(&T2) -> K + Send + Sync + 'static,
170{
171    enum Either<L, R> {
172        Left(L),
173        Right(R),
174    }
175    stream! {
176        let mut buffer1: Vec<(T1, SystemTime)> = Vec::new();
177        let mut buffer2: Vec<(T2, SystemTime)> = Vec::new();
178        let mut watermark = SystemTime::UNIX_EPOCH;
179        let mut yielded: HashSet<(u128, u128)> = HashSet::new();
180        let s1 = stream1.map(|e| Either::Left(e));
181        let s2 = stream2.map(|e| Either::Right(e));
182        let merged = merge(s1, s2);
183        pin_mut!(merged);
184        while let Some(either) = merged.next().await {
185            match either {
186                Either::Left(e1) => {
187                    let t1 = timestamp_fn1(&e1);
188                    if t1 > watermark { watermark = t1; }
189                    buffer1.push((e1, t1));
190                }
191                Either::Right(e2) => {
192                    let t2 = timestamp_fn2(&e2);
193                    if t2 > watermark { watermark = t2; }
194                    buffer2.push((e2, t2));
195                }
196            }
197            // Clean old events
198            let min_time = watermark - config.window_size;
199            buffer1.retain(|(_, t)| *t >= min_time);
200            buffer2.retain(|(_, t)| *t >= min_time);
201            // Perform joins
202            for (e1, t1) in &buffer1 {
203                for (e2, t2) in &buffer2 {
204                    let diff = if t1 > t2 {
205                        t1.duration_since(*t2).unwrap_or_default()
206                    } else {
207                        t2.duration_since(*t1).unwrap_or_default()
208                    };
209                    if diff <= config.window_size {
210                        let key_match = if let Some((ref fk1, ref fk2)) = key_selector {
211                            fk1(e1) == fk2(e2)
212                        } else {
213                            true
214                        };
215                        if key_match {
216                            // Deduplicate by timestamps
217                            let t1n = t1.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
218                            let t2n = t2.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
219                            let key = (t1n, t2n);
220                            if !yielded.contains(&key) {
221                                yielded.insert(key);
222                                yield join_fn(e1.clone(), e2.clone());
223                            }
224                        }
225                    }
226                }
227            }
228        }
229    }
230    .boxed()
231}
232
// ================================
// Extension Traits
// ================================

237/// Extension trait for advanced analytics
238pub trait AdvancedAnalyticsExt: Stream + Send + Sized + 'static {
239    /// Apply time-based windowing to the stream
240    fn window_by_time_rs2<F>(
241        self,
242        config: TimeWindowConfig,
243        timestamp_fn: F,
244    ) -> RS2Stream<TimeWindow<<Self as Stream>::Item>>
245    where
246        <Self as Stream>::Item: Clone + Send + 'static,
247        F: Fn(&<Self as Stream>::Item) -> SystemTime + Send + 'static,
248    {
249        window_by_time(self.boxed(), config, timestamp_fn)
250    }
251    /// Join with another stream using time windows
252    fn join_with_time_window_rs2<T2, F, G1, G2, K, FK1, FK2>(
253        self,
254        other: RS2Stream<T2>,
255        config: TimeJoinConfig,
256        timestamp_fn1: G1,
257        timestamp_fn2: G2,
258        join_fn: F,
259        key_selector: Option<(FK1, FK2)>,
260    ) -> RS2Stream<(Self::Item, T2)>
261    where
262        Self::Item: Clone + Send + Sync + 'static,
263        T2: Clone + Send + Sync + 'static,
264        F: Fn(Self::Item, T2) -> (Self::Item, T2) + Send + 'static,
265        G1: Fn(&Self::Item) -> SystemTime + Send + 'static,
266        G2: Fn(&T2) -> SystemTime + Send + 'static,
267        K: Eq + std::hash::Hash,
268        FK1: Fn(&Self::Item) -> K + Send + Sync + 'static,
269        FK2: Fn(&T2) -> K + Send + Sync + 'static,
270    {
271        join_with_time_window(
272            self.boxed(),
273            other,
274            config,
275            timestamp_fn1,
276            timestamp_fn2,
277            join_fn,
278            key_selector,
279        )
280    }
281}
282
283impl<S> AdvancedAnalyticsExt for S where S: Stream + Send + Sized + 'static {}