oxigdal_streaming/transformations/
join.rs1use crate::core::stream::StreamElement;
4use crate::error::{Result, StreamingError};
5use chrono::Utc;
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13pub enum JoinType {
14 Inner,
16 LeftOuter,
18 RightOuter,
20 FullOuter,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct JoinConfig {
27 pub join_type: JoinType,
29
30 pub max_buffer_size: usize,
32
33 pub ttl_seconds: i64,
35
36 pub enable_cleanup: bool,
38}
39
40impl Default for JoinConfig {
41 fn default() -> Self {
42 Self {
43 join_type: JoinType::Inner,
44 max_buffer_size: 1000,
45 ttl_seconds: 300,
46 enable_cleanup: true,
47 }
48 }
49}
50
51pub struct JoinOperator {
53 config: JoinConfig,
54 left_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
55 right_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
56}
57
58impl JoinOperator {
59 pub fn new(config: JoinConfig) -> Self {
61 Self {
62 config,
63 left_buffer: Arc::new(RwLock::new(HashMap::new())),
64 right_buffer: Arc::new(RwLock::new(HashMap::new())),
65 }
66 }
67
68 pub async fn process_left(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
70 let key = element
71 .key
72 .clone()
73 .ok_or_else(|| StreamingError::JoinError("Left element must have a key".to_string()))?;
74
75 let mut results = Vec::new();
76 let right_buffer = self.right_buffer.read().await;
77
78 if let Some(right_elements) = right_buffer.get(&key) {
79 for right_elem in right_elements {
80 let joined = self.join_elements(&element, right_elem)?;
81 results.push(joined);
82 }
83 }
84
85 drop(right_buffer);
86
87 if self.config.join_type == JoinType::LeftOuter && results.is_empty() {
88 results.push(element.clone());
89 }
90
91 let mut left_buffer = self.left_buffer.write().await;
92 let buffer = left_buffer.entry(key).or_insert_with(VecDeque::new);
93
94 if buffer.len() >= self.config.max_buffer_size {
95 buffer.pop_front();
96 }
97
98 buffer.push_back(element);
99
100 if self.config.enable_cleanup {
101 self.cleanup_expired_left(&mut left_buffer);
102 }
103
104 Ok(results)
105 }
106
107 pub async fn process_right(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
109 let key = element.key.clone().ok_or_else(|| {
110 StreamingError::JoinError("Right element must have a key".to_string())
111 })?;
112
113 let mut results = Vec::new();
114 let left_buffer = self.left_buffer.read().await;
115
116 if let Some(left_elements) = left_buffer.get(&key) {
117 for left_elem in left_elements {
118 let joined = self.join_elements(left_elem, &element)?;
119 results.push(joined);
120 }
121 }
122
123 drop(left_buffer);
124
125 if self.config.join_type == JoinType::RightOuter && results.is_empty() {
126 results.push(element.clone());
127 }
128
129 let mut right_buffer = self.right_buffer.write().await;
130 let buffer = right_buffer.entry(key).or_insert_with(VecDeque::new);
131
132 if buffer.len() >= self.config.max_buffer_size {
133 buffer.pop_front();
134 }
135
136 buffer.push_back(element);
137
138 if self.config.enable_cleanup {
139 self.cleanup_expired_right(&mut right_buffer);
140 }
141
142 Ok(results)
143 }
144
145 fn join_elements(&self, left: &StreamElement, right: &StreamElement) -> Result<StreamElement> {
147 let mut joined_data = Vec::new();
148 joined_data.extend_from_slice(&left.data);
149 joined_data.extend_from_slice(&right.data);
150
151 Ok(StreamElement {
152 data: joined_data,
153 event_time: left.event_time.max(right.event_time),
154 processing_time: Utc::now(),
155 key: left.key.clone(),
156 metadata: left.metadata.clone(),
157 })
158 }
159
160 fn cleanup_expired_left(&self, buffer: &mut HashMap<Vec<u8>, VecDeque<StreamElement>>) {
162 let now = Utc::now();
163 let ttl_seconds = self.config.ttl_seconds;
164
165 for queue in buffer.values_mut() {
166 queue.retain(|elem| {
167 let age = now.signed_duration_since(elem.event_time);
168 age.num_seconds() < ttl_seconds
169 });
170 }
171 }
172
173 fn cleanup_expired_right(&self, buffer: &mut HashMap<Vec<u8>, VecDeque<StreamElement>>) {
175 let now = Utc::now();
176 let ttl_seconds = self.config.ttl_seconds;
177
178 for queue in buffer.values_mut() {
179 queue.retain(|elem| {
180 let age = now.signed_duration_since(elem.event_time);
181 age.num_seconds() < ttl_seconds
182 });
183 }
184 }
185
186 pub async fn clear(&self) {
188 self.left_buffer.write().await.clear();
189 self.right_buffer.write().await.clear();
190 }
191}
192
193pub struct CoGroupOperator {
195 left_buffer: Arc<RwLock<HashMap<Vec<u8>, Vec<StreamElement>>>>,
196 right_buffer: Arc<RwLock<HashMap<Vec<u8>, Vec<StreamElement>>>>,
197}
198
199impl CoGroupOperator {
200 pub fn new() -> Self {
202 Self {
203 left_buffer: Arc::new(RwLock::new(HashMap::new())),
204 right_buffer: Arc::new(RwLock::new(HashMap::new())),
205 }
206 }
207
208 pub async fn add_left(&self, element: StreamElement) -> Result<()> {
210 let key = element
211 .key
212 .clone()
213 .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
214
215 let mut buffer = self.left_buffer.write().await;
216 buffer.entry(key).or_insert_with(Vec::new).push(element);
217
218 Ok(())
219 }
220
221 pub async fn add_right(&self, element: StreamElement) -> Result<()> {
223 let key = element
224 .key
225 .clone()
226 .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
227
228 let mut buffer = self.right_buffer.write().await;
229 buffer.entry(key).or_insert_with(Vec::new).push(element);
230
231 Ok(())
232 }
233
234 pub async fn get_results(&self, key: &[u8]) -> (Vec<StreamElement>, Vec<StreamElement>) {
236 let left_buffer = self.left_buffer.read().await;
237 let right_buffer = self.right_buffer.read().await;
238
239 let left = left_buffer.get(key).cloned().unwrap_or_else(Vec::new);
240 let right = right_buffer.get(key).cloned().unwrap_or_else(Vec::new);
241
242 (left, right)
243 }
244
245 pub async fn clear(&self) {
247 self.left_buffer.write().await.clear();
248 self.right_buffer.write().await.clear();
249 }
250}
251
252impl Default for CoGroupOperator {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
258pub struct IntervalJoin {
260 lower_bound_seconds: i64,
261 upper_bound_seconds: i64,
262 left_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
263 right_buffer: Arc<RwLock<HashMap<Vec<u8>, VecDeque<StreamElement>>>>,
264}
265
266impl IntervalJoin {
267 pub fn new(lower_bound_seconds: i64, upper_bound_seconds: i64) -> Self {
269 Self {
270 lower_bound_seconds,
271 upper_bound_seconds,
272 left_buffer: Arc::new(RwLock::new(HashMap::new())),
273 right_buffer: Arc::new(RwLock::new(HashMap::new())),
274 }
275 }
276
277 pub async fn process_left(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
279 let key = element
280 .key
281 .clone()
282 .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
283
284 let mut results = Vec::new();
285 let right_buffer = self.right_buffer.read().await;
286
287 if let Some(right_elements) = right_buffer.get(&key) {
288 for right_elem in right_elements {
289 if self.in_interval(&element, right_elem) {
290 let mut joined_data = Vec::new();
291 joined_data.extend_from_slice(&element.data);
292 joined_data.extend_from_slice(&right_elem.data);
293
294 results.push(StreamElement {
295 data: joined_data,
296 event_time: element.event_time.max(right_elem.event_time),
297 processing_time: Utc::now(),
298 key: Some(key.clone()),
299 metadata: element.metadata.clone(),
300 });
301 }
302 }
303 }
304
305 drop(right_buffer);
306
307 let mut left_buffer = self.left_buffer.write().await;
308 left_buffer
309 .entry(key)
310 .or_insert_with(VecDeque::new)
311 .push_back(element);
312
313 Ok(results)
314 }
315
316 pub async fn process_right(&self, element: StreamElement) -> Result<Vec<StreamElement>> {
318 let key = element
319 .key
320 .clone()
321 .ok_or_else(|| StreamingError::JoinError("Element must have a key".to_string()))?;
322
323 let mut results = Vec::new();
324 let left_buffer = self.left_buffer.read().await;
325
326 if let Some(left_elements) = left_buffer.get(&key) {
327 for left_elem in left_elements {
328 if self.in_interval(left_elem, &element) {
329 let mut joined_data = Vec::new();
330 joined_data.extend_from_slice(&left_elem.data);
331 joined_data.extend_from_slice(&element.data);
332
333 results.push(StreamElement {
334 data: joined_data,
335 event_time: left_elem.event_time.max(element.event_time),
336 processing_time: Utc::now(),
337 key: Some(key.clone()),
338 metadata: left_elem.metadata.clone(),
339 });
340 }
341 }
342 }
343
344 drop(left_buffer);
345
346 let mut right_buffer = self.right_buffer.write().await;
347 right_buffer
348 .entry(key)
349 .or_insert_with(VecDeque::new)
350 .push_back(element);
351
352 Ok(results)
353 }
354
355 fn in_interval(&self, left: &StreamElement, right: &StreamElement) -> bool {
357 let time_diff = right.event_time.signed_duration_since(left.event_time);
358 let time_diff_seconds = time_diff.num_seconds();
359 time_diff_seconds >= self.lower_bound_seconds
360 && time_diff_seconds <= self.upper_bound_seconds
361 }
362
363 pub async fn clear(&self) {
365 self.left_buffer.write().await.clear();
366 self.right_buffer.write().await.clear();
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    /// A left element followed by a matching right element yields exactly one
    /// joined element whose payload is left bytes then right bytes.
    #[tokio::test]
    async fn test_join_operator() {
        let operator = JoinOperator::new(JoinConfig::default());

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        let right_elem = StreamElement::new(vec![3, 4], Utc::now()).with_key(vec![1]);

        operator
            .process_left(left_elem)
            .await
            .expect("process_left should succeed in test");
        let joined = operator
            .process_right(right_elem)
            .await
            .expect("process_right should succeed in test");

        assert_eq!(joined.len(), 1);
        assert_eq!(joined[0].data, vec![1, 2, 3, 4]);
    }

    /// Elements added on each side are returned grouped under their key.
    #[tokio::test]
    async fn test_cogroup_operator() {
        let grouper = CoGroupOperator::new();

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        let right_elem = StreamElement::new(vec![3, 4], Utc::now()).with_key(vec![1]);

        grouper
            .add_left(left_elem)
            .await
            .expect("add_left should succeed in test");
        grouper
            .add_right(right_elem)
            .await
            .expect("add_right should succeed in test");

        let (lefts, rights) = grouper.get_results(&[1]).await;
        assert_eq!(lefts.len(), 1);
        assert_eq!(rights.len(), 1);
    }

    /// A right element five seconds after the left one falls inside [0, 10].
    #[tokio::test]
    async fn test_interval_join() {
        let interval_join = IntervalJoin::new(0, 10);

        let left_elem = StreamElement::new(vec![1, 2], Utc::now()).with_key(vec![1]);
        let later = Utc::now() + chrono::Duration::seconds(5);
        let right_elem = StreamElement::new(vec![3, 4], later).with_key(vec![1]);

        interval_join
            .process_left(left_elem)
            .await
            .expect("process_left should succeed in test");
        let joined = interval_join
            .process_right(right_elem)
            .await
            .expect("process_right should succeed in test");

        assert!(!joined.is_empty());
    }
}