Skip to main content

oxigdal_streaming/transformations/
join.rs

1//! Join operations for streaming data.
2
3use crate::core::stream::StreamElement;
4use crate::error::{Result, StreamingError};
5use chrono::Utc;
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11/// Type of join operation.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13pub enum JoinType {
14    /// Inner join
15    Inner,
16    /// Left outer join
17    LeftOuter,
18    /// Right outer join
19    RightOuter,
20    /// Full outer join
21    FullOuter,
22}
23
24/// Configuration for join operations.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct JoinConfig {
27    /// Type of join
28    pub join_type: JoinType,
29
30    /// Maximum buffer size per key
31    pub max_buffer_size: usize,
32
33    /// Time-to-live for buffered elements (in seconds)
34    pub ttl_seconds: i64,
35
36    /// Enable cleanup of expired elements
37    pub enable_cleanup: bool,
38}
39
40impl Default for JoinConfig {
41    fn default() -> Self {
42        Self {
43            join_type: JoinType::Inner,
44            max_buffer_size: 1000,
45            ttl_seconds: 300,
46            enable_cleanup: true,
47        }
48    }
49}
50
51/// Join operator for two streams.
52pub struct JoinOperator {
53    config: JoinConfig,
54    left_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
55    right_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
56}
57
58impl JoinOperator {
59    /// Create a new join operator.
60    pub fn new(config: JoinConfig) -> Self {
61        Self {
62            config,
63            left_buffer: Arc::new(RwLock::new(HashMap::new())),
64            right_buffer: Arc::new(RwLock::new(HashMap::new())),
65        }
66    }
67
68    /// Process a left element.
69    pub async fn process_left(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
70        let key = element
71            .key
72            .clone()
73            .ok_or_else(|| StreamingError::JoinError("Left element must have a key".to_string()))?;
74
75        let mut results = Vec::new();
76        let right_buffer = self.right_buffer.read().await;
77
78        if let Some(right_elements) = right_buffer.get(&key) {
79            for right_elem in right_elements {
80                let joined = self.join_elements(&element, right_elem)?;
81                results.push(joined);
82            }
83        }
84
85        drop(right_buffer);
86
87        if self.config.join_type == JoinType::LeftOuter && results.is_empty() {
88            results.push(element.clone());
89        }
90
91        let mut left_buffer = self.left_buffer.write().await;
92        let buffer = left_buffer.entry(key).or_insert_with(VecDeque::new);
93
94        if buffer.len() >= self.config.max_buffer_size {
95            buffer.pop_front();
96        }
97
98        buffer.push_back(element);
99
100        if self.config.enable_cleanup {
101            self.cleanup_expired_left(&mut left_buffer);
102        }
103
104        Ok(results)
105    }
106
107    /// Process a right element.
108    pub async fn process_right(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
109        let key = element.key.clone().ok_or_else(|| {
110            StreamingError::JoinError("Right element must have a key".to_string())
111        })?;
112
113        let mut results = Vec::new();
114        let left_buffer = self.left_buffer.read().await;
115
116        if let Some(left_elements) = left_buffer.get(&key) {
117            for left_elem in left_elements {
118                let joined = self.join_elements(left_elem, &element)?;
119                results.push(joined);
120            }
121        }
122
123        drop(left_buffer);
124
125        if self.config.join_type == JoinType::RightOuter && results.is_empty() {
126            results.push(element.clone());
127        }
128
129        let mut right_buffer = self.right_buffer.write().await;
130        let buffer = right_buffer.entry(key).or_insert_with(VecDeque::new);
131
132        if buffer.len() >= self.config.max_buffer_size {
133            buffer.pop_front();
134        }
135
136        buffer.push_back(element);
137
138        if self.config.enable_cleanup {
139            self.cleanup_expired_right(&mut right_buffer);
140        }
141
142        Ok(results)
143    }
144
145    /// Join two elements.
146    fn join_elements(&self, left: &StreamElement, right: &StreamElement) -> Result<StreamElement> {
147        let mut joined_data = Vec::new();
148        joined_data.extend_from_slice(&left.data);
149        joined_data.extend_from_slice(&right.data);
150
151        Ok(StreamElement {
152            data: joined_data,
153            event_time: left.event_time.max(right.event_time),
154            processing_time: Utc::now(),
155            key: left.key.clone(),
156            metadata: left.metadata.clone(),
157        })
158    }
159
160    /// Cleanup expired elements from left buffer.
161    fn cleanup_expired_left(&self, buffer: &mut HashMap<Vec<u8>, VecDeque<StreamElement>>) {
162        let now = Utc::now();
163        let ttl_seconds = self.config.ttl_seconds;
164
165        for queue in buffer.values_mut() {
166            queue.retain(|elem| {
167                let age = now.signed_duration_since(elem.event_time);
168                age.num_seconds() < ttl_seconds
169            });
170        }
171    }
172
173    /// Cleanup expired elements from right buffer.
174    fn cleanup_expired_right(&self, buffer: &mut HashMap<Vec<u8>, VecDeque<StreamElement>>) {
175        let now = Utc::now();
176        let ttl_seconds = self.config.ttl_seconds;
177
178        for queue in buffer.values_mut() {
179            queue.retain(|elem| {
180                let age = now.signed_duration_since(elem.event_time);
181                age.num_seconds() < ttl_seconds
182            });
183        }
184    }
185
186    /// Clear all buffers.
187    pub async fn clear(&self) {
188        self.left_buffer.write().await.clear();
189        self.right_buffer.write().await.clear();
190    }
191}
192
193/// CoGroup operator for two streams.
194pub struct CoGroupOperator {
195    left_buffer: Arc<RwLock<HashMap<Vec<u8>, Vec<StreamElement>>>>,
196    right_buffer: Arc<RwLock<HashMap<Vec<u8>, Vec<StreamElement>>>>,
197}
198
199impl CoGroupOperator {
200    /// Create a new cogroup operator.
201    pub fn new() -> Self {
202        Self {
203            left_buffer: Arc::new(RwLock::new(HashMap::new())),
204            right_buffer: Arc::new(RwLock::new(HashMap::new())),
205        }
206    }
207
208    /// Add a left element.
209    pub async fn add_left(&self, element: StreamElement) -> Result<()> {
210        let key = element
211            .key
212            .clone()
213            .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
214
215        let mut buffer = self.left_buffer.write().await;
216        buffer.entry(key).or_insert_with(Vec::new).push(element);
217
218        Ok(())
219    }
220
221    /// Add a right element.
222    pub async fn add_right(&self, element: StreamElement) -> Result<()> {
223        let key = element
224            .key
225            .clone()
226            .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
227
228        let mut buffer = self.right_buffer.write().await;
229        buffer.entry(key).or_insert_with(Vec::new).push(element);
230
231        Ok(())
232    }
233
234    /// Get cogroup results for a key.
235    pub async fn get_results(&self, key: &[u8]) -> (Vec<StreamElement>, Vec<StreamElement>) {
236        let left_buffer = self.left_buffer.read().await;
237        let right_buffer = self.right_buffer.read().await;
238
239        let left = left_buffer.get(key).cloned().unwrap_or_else(Vec::new);
240        let right = right_buffer.get(key).cloned().unwrap_or_else(Vec::new);
241
242        (left, right)
243    }
244
245    /// Clear buffers.
246    pub async fn clear(&self) {
247        self.left_buffer.write().await.clear();
248        self.right_buffer.write().await.clear();
249    }
250}
251
252impl Default for CoGroupOperator {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
258/// Interval join operator.
259pub struct IntervalJoin {
260    lower_bound_seconds: i64,
261    upper_bound_seconds: i64,
262    left_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
263    right_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
264}
265
266impl IntervalJoin {
267    /// Create a new interval join.
268    pub fn new(lower_bound_seconds: i64, upper_bound_seconds: i64) -> Self {
269        Self {
270            lower_bound_seconds,
271            upper_bound_seconds,
272            left_buffer: Arc::new(RwLock::new(HashMap::new())),
273            right_buffer: Arc::new(RwLock::new(HashMap::new())),
274        }
275    }
276
277    /// Process a left element.
278    pub async fn process_left(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
279        let key = element
280            .key
281            .clone()
282            .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
283
284        let mut results = Vec::new();
285        let right_buffer = self.right_buffer.read().await;
286
287        if let Some(right_elements) = right_buffer.get(&key) {
288            for right_elem in right_elements {
289                if self.in_interval(&element, right_elem) {
290                    let mut joined_data = Vec::new();
291                    joined_data.extend_from_slice(&element.data);
292                    joined_data.extend_from_slice(&right_elem.data);
293
294                    results.push(StreamElement {
295                        data: joined_data,
296                        event_time: element.event_time.max(right_elem.event_time),
297                        processing_time: Utc::now(),
298                        key: Some(key.clone()),
299                        metadata: element.metadata.clone(),
300                    });
301                }
302            }
303        }
304
305        drop(right_buffer);
306
307        let mut left_buffer = self.left_buffer.write().await;
308        left_buffer
309            .entry(key)
310            .or_insert_with(VecDeque::new)
311            .push_back(element);
312
313        Ok(results)
314    }
315
316    /// Process a right element.
317    pub async fn process_right(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
318        let key = element
319            .key
320            .clone()
321            .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
322
323        let mut results = Vec::new();
324        let left_buffer = self.left_buffer.read().await;
325
326        if let Some(left_elements) = left_buffer.get(&key) {
327            for left_elem in left_elements {
328                if self.in_interval(left_elem, &element) {
329                    let mut joined_data = Vec::new();
330                    joined_data.extend_from_slice(&left_elem.data);
331                    joined_data.extend_from_slice(&element.data);
332
333                    results.push(StreamElement {
334                        data: joined_data,
335                        event_time: left_elem.event_time.max(element.event_time),
336                        processing_time: Utc::now(),
337                        key: Some(key.clone()),
338                        metadata: left_elem.metadata.clone(),
339                    });
340                }
341            }
342        }
343
344        drop(left_buffer);
345
346        let mut right_buffer = self.right_buffer.write().await;
347        right_buffer
348            .entry(key)
349            .or_insert_with(VecDeque::new)
350            .push_back(element);
351
352        Ok(results)
353    }
354
355    /// Check if two elements are within the join interval.
356    fn in_interval(&self, left: &StreamElement, right: &StreamElement) -> bool {
357        let time_diff = right.event_time.signed_duration_since(left.event_time);
358        let time_diff_seconds = time_diff.num_seconds();
359        time_diff_seconds >= self.lower_bound_seconds
360            && time_diff_seconds <= self.upper_bound_seconds
361    }
362
363    /// Clear buffers.
364    pub async fn clear(&self) {
365        self.left_buffer.write().await.clear();
366        self.right_buffer.write().await.clear();
367    }
368}
369
/// Smoke tests for the join, co-group and interval-join operators.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    #[tokio::test]
    async fn test_join_operator() {
        let operator = JoinOperator::new(JoinConfig::default());

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        let right_elem = StreamElement::new(vec![3, 4], Utc::now()).with_key(vec![1]);

        // The left element is buffered first; the right element then joins with it.
        operator
            .process_left(left_elem)
            .await
            .expect("process_left should succeed in test");
        let joined = operator
            .process_right(right_elem)
            .await
            .expect("process_right should succeed in test");

        assert_eq!(joined.len(), 1);
        assert_eq!(joined[0].data, vec![1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_cogroup_operator() {
        let operator = CoGroupOperator::new();

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        let right_elem = StreamElement::new(vec![3, 4], Utc::now()).with_key(vec![1]);

        operator
            .add_left(left_elem)
            .await
            .expect("add_left should succeed in test");
        operator
            .add_right(right_elem)
            .await
            .expect("add_right should succeed in test");

        let (lefts, rights) = operator.get_results(&[1]).await;
        assert_eq!(lefts.len(), 1);
        assert_eq!(rights.len(), 1);
    }

    #[tokio::test]
    async fn test_interval_join() {
        let operator = IntervalJoin::new(0, 10);

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        // Five seconds after the left element: inside the [0, 10] s window.
        let later = Utc::now() + Duration::seconds(5);
        let right_elem = StreamElement::new(vec![3, 4], later).with_key(vec![1]);

        operator
            .process_left(left_elem)
            .await
            .expect("process_left should succeed in test");
        let joined = operator
            .process_right(right_elem)
            .await
            .expect("process_right should succeed in test");

        assert!(!joined.is_empty());
    }
}