rs2_stream/
advanced_analytics.rs

1//! Advanced analytics for RS2 streams
2//!
3//! Provides time-based windowed aggregations and advanced stream joins for building sophisticated real-time analytics.
4
5use crate::*;
6use async_stream::stream;
7use futures_util::stream::StreamExt;
8use futures_util::pin_mut;
9use futures_core::Stream;
10use std::collections::HashMap;
11use std::time::{Duration, SystemTime};
12use std::collections::HashSet;
13
14// ================================
15// Time-based Windowed Aggregations
16// ================================
17
/// Configuration for time-based windowing.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly (e.g. in tests or when
/// detecting configuration changes).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeWindowConfig {
    /// Length of each window.
    pub window_size: Duration,
    /// Presumably the distance between the starts of consecutive windows.
    // NOTE(review): `window_by_time` in this file does not read this field — confirm intended use.
    pub slide_interval: Duration,
    /// How far the highest observed event time must pass a window's end before that window is
    /// treated as complete.
    pub watermark_delay: Duration,
    /// Presumably how long after completion late events are still accepted.
    // NOTE(review): `window_by_time` in this file does not read this field — confirm intended use.
    pub allowed_lateness: Duration,
}
26
27impl Default for TimeWindowConfig {
28    fn default() -> Self {
29        Self {
30            window_size: Duration::from_secs(60),
31            slide_interval: Duration::from_secs(60),
32            watermark_delay: Duration::from_secs(10),
33            allowed_lateness: Duration::from_secs(5),
34        }
35    }
36}
37
/// A time-based window of events.
///
/// `Clone`, `PartialEq` and `Eq` are available whenever the event type `T` implements them, so
/// emitted windows can be duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeWindow<T> {
    /// Start of the interval covered by this window.
    pub start_time: SystemTime,
    /// End of the interval; the window counts as complete once the watermark reaches this time.
    pub end_time: SystemTime,
    /// Events collected so far, in the order they were added.
    pub events: Vec<T>,
}
45
46impl<T> TimeWindow<T> {
47    pub fn new(start_time: SystemTime, end_time: SystemTime) -> Self {
48        Self {
49            start_time,
50            end_time,
51            events: Vec::new(),
52        }
53    }
54
55    pub fn add_event(&mut self, event: T) {
56        self.events.push(event);
57    }
58
59    pub fn is_complete(&self, watermark: SystemTime) -> bool {
60        watermark >= self.end_time
61    }
62}
63
64/// Create time-based windows from a stream of timestamped events
65pub fn window_by_time<T, F>(
66    stream: RS2Stream<T>,
67    config: TimeWindowConfig,
68    timestamp_fn: F,
69) -> RS2Stream<TimeWindow<T>>
70where
71    T: Clone + Send + 'static,
72    F: Fn(&T) -> SystemTime + Send + 'static,
73{
74    stream! {
75        let mut windows: HashMap<u64, TimeWindow<T>> = HashMap::new();
76        let mut watermark = SystemTime::UNIX_EPOCH;
77        pin_mut!(stream);
78
79        while let Some(event) = stream.next().await {
80            let event_time = timestamp_fn(&event);
81            if event_time > watermark {
82                watermark = event_time;
83            }
84
85            // Calculate window boundaries
86            let since_epoch = event_time.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default();
87            let window_size_secs = config.window_size.as_secs();
88            let window_start_secs = (since_epoch.as_secs() / window_size_secs) * window_size_secs;
89            let window_start = SystemTime::UNIX_EPOCH + Duration::from_secs(window_start_secs);
90            let window_end = window_start + config.window_size;
91            let window_id = window_start_secs;
92
93            // Add event to appropriate window
94            let window = windows.entry(window_id).or_insert_with(|| {
95                TimeWindow::new(window_start, window_end)
96            });
97            window.add_event(event);
98
99            // Emit completed windows
100            let mut to_remove = Vec::new();
101            for (id, window) in &windows {
102                if window.is_complete(watermark - config.watermark_delay) {
103                    to_remove.push(*id);
104                }
105            }
106            for id in to_remove {
107                if let Some(window) = windows.remove(&id) {
108                    yield window;
109                }
110            }
111        }
112
113        // Emit remaining windows
114        for (_, window) in windows {
115            yield window;
116        }
117    }
118    .boxed()
119}
120
121// ================================
122// Stream Joins with Time Windows
123// ================================
124
/// Configuration for time-windowed joins.
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeJoinConfig {
    /// Maximum timestamp distance between two joined events; also how far behind the watermark
    /// buffered events are kept before being evicted.
    pub window_size: Duration,
    /// Presumably a delay applied to the watermark.
    // NOTE(review): `join_with_time_window` in this file does not read this field — confirm intended use.
    pub watermark_delay: Duration,
}
131
132impl Default for TimeJoinConfig {
133    fn default() -> Self {
134        Self {
135            window_size: Duration::from_secs(60),
136            watermark_delay: Duration::from_secs(10),
137        }
138    }
139}
140
141/// Join two streams with time-based windowing
142/// If key_selector is provided, only join on matching keys; otherwise, cross join within the window.
143pub fn join_with_time_window<T1, T2, F, G1, G2, K, FK1, FK2>(
144    stream1: RS2Stream<T1>,
145    stream2: RS2Stream<T2>,
146    config: TimeJoinConfig,
147    timestamp_fn1: G1,
148    timestamp_fn2: G2,
149    join_fn: F,
150    key_selector: Option<(FK1, FK2)>,
151) -> RS2Stream<(T1, T2)>
152where
153    T1: Clone + Send + Sync + 'static,
154    T2: Clone + Send + Sync + 'static,
155    F: Fn(T1, T2) -> (T1, T2) + Send + 'static,
156    G1: Fn(&T1) -> SystemTime + Send + 'static,
157    G2: Fn(&T2) -> SystemTime + Send + 'static,
158    K: Eq + std::hash::Hash,
159    FK1: Fn(&T1) -> K + Send + Sync + 'static,
160    FK2: Fn(&T2) -> K + Send + Sync + 'static,
161{
162    enum Either<L, R> { Left(L), Right(R) }
163    stream! {
164        let mut buffer1: Vec<(T1, SystemTime)> = Vec::new();
165        let mut buffer2: Vec<(T2, SystemTime)> = Vec::new();
166        let mut watermark = SystemTime::UNIX_EPOCH;
167        let mut yielded: HashSet<(u128, u128)> = HashSet::new();
168        let s1 = stream1.map(|e| Either::Left(e));
169        let s2 = stream2.map(|e| Either::Right(e));
170        let merged = merge(s1, s2);
171        pin_mut!(merged);
172        while let Some(either) = merged.next().await {
173            match either {
174                Either::Left(e1) => {
175                    let t1 = timestamp_fn1(&e1);
176                    if t1 > watermark { watermark = t1; }
177                    buffer1.push((e1, t1));
178                }
179                Either::Right(e2) => {
180                    let t2 = timestamp_fn2(&e2);
181                    if t2 > watermark { watermark = t2; }
182                    buffer2.push((e2, t2));
183                }
184            }
185            // Clean old events
186            let min_time = watermark - config.window_size;
187            buffer1.retain(|(_, t)| *t >= min_time);
188            buffer2.retain(|(_, t)| *t >= min_time);
189            // Perform joins
190            for (e1, t1) in &buffer1 {
191                for (e2, t2) in &buffer2 {
192                    let diff = if t1 > t2 {
193                        t1.duration_since(*t2).unwrap_or_default()
194                    } else {
195                        t2.duration_since(*t1).unwrap_or_default()
196                    };
197                    if diff <= config.window_size {
198                        let key_match = if let Some((ref fk1, ref fk2)) = key_selector {
199                            fk1(e1) == fk2(e2)
200                        } else {
201                            true
202                        };
203                        if key_match {
204                            // Deduplicate by timestamps
205                            let t1n = t1.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
206                            let t2n = t2.duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_nanos();
207                            let key = (t1n, t2n);
208                            if !yielded.contains(&key) {
209                                yielded.insert(key);
210                                yield join_fn(e1.clone(), e2.clone());
211                            }
212                        }
213                    }
214                }
215            }
216        }
217    }
218    .boxed()
219}
220
221// ================================
222// Extension Traits
223// ================================
224
225/// Extension trait for advanced analytics
226pub trait AdvancedAnalyticsExt: Stream + Send + Sized + 'static {
227    /// Apply time-based windowing to the stream
228    fn window_by_time_rs2<F>(
229        self,
230        config: TimeWindowConfig,
231        timestamp_fn: F,
232    ) -> RS2Stream<TimeWindow<<Self as Stream>::Item>>
233    where
234        <Self as Stream>::Item: Clone + Send + 'static,
235        F: Fn(&<Self as Stream>::Item) -> SystemTime + Send + 'static,
236    {
237        window_by_time(self.boxed(), config, timestamp_fn)
238    }
239    /// Join with another stream using time windows
240    fn join_with_time_window_rs2<T2, F, G1, G2, K, FK1, FK2>(
241        self,
242        other: RS2Stream<T2>,
243        config: TimeJoinConfig,
244        timestamp_fn1: G1,
245        timestamp_fn2: G2,
246        join_fn: F,
247        key_selector: Option<(FK1, FK2)>,
248    ) -> RS2Stream<(Self::Item, T2)>
249    where
250        Self::Item: Clone + Send + Sync + 'static,
251        T2: Clone + Send + Sync + 'static,
252        F: Fn(Self::Item, T2) -> (Self::Item, T2) + Send + 'static,
253        G1: Fn(&Self::Item) -> SystemTime + Send + 'static,
254        G2: Fn(&T2) -> SystemTime + Send + 'static,
255        K: Eq + std::hash::Hash,
256        FK1: Fn(&Self::Item) -> K + Send + Sync + 'static,
257        FK2: Fn(&T2) -> K + Send + Sync + 'static,
258    {
259        join_with_time_window(self.boxed(), other, config, timestamp_fn1, timestamp_fn2, join_fn, key_selector)
260    }
261}
262
263impl<S> AdvancedAnalyticsExt for S where S: Stream + Send + Sized + 'static {}