Skip to main content

oxirs_stream/processing/
joins.rs

//! Stream Joins Module
//!
//! Provides join operations for stream processing:
//! - Inner join: emit only matching pairs of events from the two streams
//! - Left join: keep all left-stream events and attach matching right-stream events
//! - Right and full outer joins: keep unmatched events from the right stream or from both
//! - Window-based joins: join within time windows
//! - Keyed joins: join on specific field values
//! - Tumbling window joins
//! - Sliding window joins
//!
//! Uses SciRS2 for efficient join algorithms and performance optimization
12
13use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::VecDeque;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20
21/// Join type enumeration
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum JoinType {
24    /// Inner join - only matching pairs
25    Inner,
26    /// Left outer join - all left events, matching right
27    LeftOuter,
28    /// Right outer join - all right events, matching left
29    RightOuter,
30    /// Full outer join - all events from both streams
31    FullOuter,
32}
33
34/// Join window strategy
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub enum JoinWindowStrategy {
37    /// Tumbling window - non-overlapping fixed windows
38    Tumbling { duration: ChronoDuration },
39    /// Sliding window - overlapping windows
40    Sliding {
41        duration: ChronoDuration,
42        slide: ChronoDuration,
43    },
44    /// Session window - based on activity gaps
45    Session { gap_timeout: ChronoDuration },
46    /// Fixed count window
47    CountBased { size: usize },
48}
49
50/// Join condition
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub enum JoinCondition {
53    /// Join on equal field values
54    OnEquals {
55        left_field: String,
56        right_field: String,
57    },
58    /// Join on custom predicate
59    Custom { expression: String },
60    /// Join on time proximity
61    TimeProximity { max_difference: ChronoDuration },
62}
63
64/// Join configuration
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct JoinConfig {
67    pub join_type: JoinType,
68    pub window_strategy: JoinWindowStrategy,
69    pub condition: JoinCondition,
70    pub max_buffer_size: usize,
71    pub emit_incomplete: bool, // For outer joins
72}
73
74impl Default for JoinConfig {
75    fn default() -> Self {
76        Self {
77            join_type: JoinType::Inner,
78            window_strategy: JoinWindowStrategy::Tumbling {
79                duration: ChronoDuration::seconds(60),
80            },
81            condition: JoinCondition::TimeProximity {
82                max_difference: ChronoDuration::seconds(10),
83            },
84            max_buffer_size: 10000,
85            emit_incomplete: true,
86        }
87    }
88}
89
90/// Joined event pair
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct JoinedEvent {
93    pub left: Option<StreamEvent>,
94    pub right: Option<StreamEvent>,
95    pub join_time: DateTime<Utc>,
96    pub match_confidence: f64,
97    pub window_id: String,
98}
99
100/// Type alias for timestamped event buffer
101type EventBuffer = Arc<RwLock<VecDeque<(StreamEvent, DateTime<Utc>)>>>;
102
103/// Stream joiner for combining two streams
104pub struct StreamJoiner {
105    config: JoinConfig,
106    left_buffer: EventBuffer,
107    right_buffer: EventBuffer,
108    join_results: Arc<RwLock<Vec<JoinedEvent>>>,
109    stats: Arc<RwLock<JoinStats>>,
110    current_window_id: Arc<RwLock<String>>,
111}
112
/// Running counters describing the work a [`StreamJoiner`] has done.
#[derive(Debug, Clone, Default)]
pub struct JoinStats {
    /// Number of events submitted on the left stream.
    pub left_events_received: u64,
    /// Number of events submitted on the right stream.
    pub right_events_received: u64,
    /// Number of joined pairs counted as matched.
    pub pairs_matched: u64,
    /// Number of join rows returned to callers.
    pub pairs_emitted: u64,
    /// Left events emitted by an outer join without a right partner.
    pub left_unmatched: u64,
    /// Right events emitted by an outer join without a left partner.
    pub right_unmatched: u64,
    /// Number of window rotations performed.
    pub windows_processed: u64,
    /// Exponentially smoothed processing latency per event, in milliseconds.
    pub avg_join_latency_ms: f64,
}
124
125impl StreamJoiner {
126    /// Create a new stream joiner
127    pub fn new(config: JoinConfig) -> Self {
128        Self {
129            config,
130            left_buffer: Arc::new(RwLock::new(VecDeque::new())),
131            right_buffer: Arc::new(RwLock::new(VecDeque::new())),
132            join_results: Arc::new(RwLock::new(Vec::new())),
133            stats: Arc::new(RwLock::new(JoinStats::default())),
134            current_window_id: Arc::new(RwLock::new(uuid::Uuid::new_v4().to_string())),
135        }
136    }
137
138    /// Process an event from the left stream
139    pub async fn process_left(&self, event: StreamEvent) -> Result<Vec<JoinedEvent>> {
140        let start = std::time::Instant::now();
141        let now = Utc::now();
142
143        // Update stats
144        {
145            let mut stats = self.stats.write().await;
146            stats.left_events_received += 1;
147        }
148
149        // Add to left buffer
150        {
151            let mut left_buffer = self.left_buffer.write().await;
152            left_buffer.push_back((event.clone(), now));
153
154            // Enforce buffer size limit
155            if left_buffer.len() > self.config.max_buffer_size {
156                left_buffer.pop_front();
157            }
158        }
159
160        // Perform join
161        let results = self.perform_join(Some(event), None, now).await?;
162
163        // Update latency stats
164        {
165            let mut stats = self.stats.write().await;
166            let latency = start.elapsed().as_secs_f64() * 1000.0;
167            let alpha = 0.1;
168            stats.avg_join_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_join_latency_ms;
169        }
170
171        Ok(results)
172    }
173
174    /// Process an event from the right stream
175    pub async fn process_right(&self, event: StreamEvent) -> Result<Vec<JoinedEvent>> {
176        let start = std::time::Instant::now();
177        let now = Utc::now();
178
179        // Update stats
180        {
181            let mut stats = self.stats.write().await;
182            stats.right_events_received += 1;
183        }
184
185        // Add to right buffer
186        {
187            let mut right_buffer = self.right_buffer.write().await;
188            right_buffer.push_back((event.clone(), now));
189
190            // Enforce buffer size limit
191            if right_buffer.len() > self.config.max_buffer_size {
192                right_buffer.pop_front();
193            }
194        }
195
196        // Perform join
197        let results = self.perform_join(None, Some(event), now).await?;
198
199        // Update latency stats
200        {
201            let mut stats = self.stats.write().await;
202            let latency = start.elapsed().as_secs_f64() * 1000.0;
203            let alpha = 0.1;
204            stats.avg_join_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_join_latency_ms;
205        }
206
207        Ok(results)
208    }
209
210    /// Perform the actual join operation
211    async fn perform_join(
212        &self,
213        left_event: Option<StreamEvent>,
214        right_event: Option<StreamEvent>,
215        now: DateTime<Utc>,
216    ) -> Result<Vec<JoinedEvent>> {
217        let mut results = Vec::new();
218
219        match self.config.join_type {
220            JoinType::Inner => {
221                self.perform_inner_join(&mut results, left_event, right_event, now)
222                    .await?;
223            }
224            JoinType::LeftOuter => {
225                self.perform_left_outer_join(&mut results, left_event, right_event, now)
226                    .await?;
227            }
228            JoinType::RightOuter => {
229                self.perform_right_outer_join(&mut results, left_event, right_event, now)
230                    .await?;
231            }
232            JoinType::FullOuter => {
233                self.perform_full_outer_join(&mut results, left_event, right_event, now)
234                    .await?;
235            }
236        }
237
238        // Update stats
239        {
240            let mut stats = self.stats.write().await;
241            stats.pairs_matched += results.len() as u64;
242            stats.pairs_emitted += results.len() as u64;
243        }
244
245        Ok(results)
246    }
247
248    /// Perform inner join
249    async fn perform_inner_join(
250        &self,
251        results: &mut Vec<JoinedEvent>,
252        left_event: Option<StreamEvent>,
253        right_event: Option<StreamEvent>,
254        now: DateTime<Utc>,
255    ) -> Result<()> {
256        if let Some(left) = left_event {
257            // Find matching right events
258            let right_buffer = self.right_buffer.read().await;
259
260            for (right, right_time) in right_buffer.iter() {
261                if self
262                    .matches_condition(&left, right, *right_time, now)
263                    .await?
264                {
265                    results.push(JoinedEvent {
266                        left: Some(left.clone()),
267                        right: Some(right.clone()),
268                        join_time: now,
269                        match_confidence: 1.0,
270                        window_id: self.current_window_id.read().await.clone(),
271                    });
272                }
273            }
274        }
275
276        if let Some(right) = right_event {
277            // Find matching left events
278            let left_buffer = self.left_buffer.read().await;
279
280            for (left, left_time) in left_buffer.iter() {
281                if self
282                    .matches_condition(left, &right, *left_time, now)
283                    .await?
284                {
285                    results.push(JoinedEvent {
286                        left: Some(left.clone()),
287                        right: Some(right.clone()),
288                        join_time: now,
289                        match_confidence: 1.0,
290                        window_id: self.current_window_id.read().await.clone(),
291                    });
292                }
293            }
294        }
295
296        Ok(())
297    }
298
299    /// Perform left outer join
300    async fn perform_left_outer_join(
301        &self,
302        results: &mut Vec<JoinedEvent>,
303        left_event: Option<StreamEvent>,
304        right_event: Option<StreamEvent>,
305        now: DateTime<Utc>,
306    ) -> Result<()> {
307        if let Some(left) = left_event {
308            let right_buffer = self.right_buffer.read().await;
309            let mut found_match = false;
310
311            for (right, right_time) in right_buffer.iter() {
312                if self
313                    .matches_condition(&left, right, *right_time, now)
314                    .await?
315                {
316                    results.push(JoinedEvent {
317                        left: Some(left.clone()),
318                        right: Some(right.clone()),
319                        join_time: now,
320                        match_confidence: 1.0,
321                        window_id: self.current_window_id.read().await.clone(),
322                    });
323                    found_match = true;
324                }
325            }
326
327            // Emit left event even if no match found
328            if !found_match && self.config.emit_incomplete {
329                results.push(JoinedEvent {
330                    left: Some(left),
331                    right: None,
332                    join_time: now,
333                    match_confidence: 0.0,
334                    window_id: self.current_window_id.read().await.clone(),
335                });
336
337                let mut stats = self.stats.write().await;
338                stats.left_unmatched += 1;
339            }
340        }
341
342        // For right events, perform regular inner join
343        if right_event.is_some() {
344            self.perform_inner_join(results, None, right_event, now)
345                .await?;
346        }
347
348        Ok(())
349    }
350
351    /// Perform right outer join
352    async fn perform_right_outer_join(
353        &self,
354        results: &mut Vec<JoinedEvent>,
355        left_event: Option<StreamEvent>,
356        right_event: Option<StreamEvent>,
357        now: DateTime<Utc>,
358    ) -> Result<()> {
359        if let Some(right) = right_event {
360            let left_buffer = self.left_buffer.read().await;
361            let mut found_match = false;
362
363            for (left, left_time) in left_buffer.iter() {
364                if self
365                    .matches_condition(left, &right, *left_time, now)
366                    .await?
367                {
368                    results.push(JoinedEvent {
369                        left: Some(left.clone()),
370                        right: Some(right.clone()),
371                        join_time: now,
372                        match_confidence: 1.0,
373                        window_id: self.current_window_id.read().await.clone(),
374                    });
375                    found_match = true;
376                }
377            }
378
379            // Emit right event even if no match found
380            if !found_match && self.config.emit_incomplete {
381                results.push(JoinedEvent {
382                    left: None,
383                    right: Some(right),
384                    join_time: now,
385                    match_confidence: 0.0,
386                    window_id: self.current_window_id.read().await.clone(),
387                });
388
389                let mut stats = self.stats.write().await;
390                stats.right_unmatched += 1;
391            }
392        }
393
394        // For left events, perform regular inner join
395        if left_event.is_some() {
396            self.perform_inner_join(results, left_event, None, now)
397                .await?;
398        }
399
400        Ok(())
401    }
402
403    /// Perform full outer join
404    async fn perform_full_outer_join(
405        &self,
406        results: &mut Vec<JoinedEvent>,
407        left_event: Option<StreamEvent>,
408        right_event: Option<StreamEvent>,
409        now: DateTime<Utc>,
410    ) -> Result<()> {
411        // Combine left and right outer joins
412        if left_event.is_some() {
413            self.perform_left_outer_join(results, left_event, None, now)
414                .await?;
415        }
416
417        if right_event.is_some() {
418            self.perform_right_outer_join(results, None, right_event, now)
419                .await?;
420        }
421
422        Ok(())
423    }
424
425    /// Check if two events match the join condition
426    async fn matches_condition(
427        &self,
428        left: &StreamEvent,
429        right: &StreamEvent,
430        event_time: DateTime<Utc>,
431        now: DateTime<Utc>,
432    ) -> Result<bool> {
433        // Check window strategy
434        if !self.is_in_current_window(event_time, now).await? {
435            return Ok(false);
436        }
437
438        // Check join condition
439        match &self.config.condition {
440            JoinCondition::OnEquals {
441                left_field,
442                right_field,
443            } => {
444                let left_value = self.extract_field_value(left, left_field)?;
445                let right_value = self.extract_field_value(right, right_field)?;
446                Ok(left_value == right_value)
447            }
448            JoinCondition::TimeProximity { max_difference } => {
449                let left_time = left.timestamp();
450                let right_time = right.timestamp();
451                let diff = if left_time > right_time {
452                    left_time - right_time
453                } else {
454                    right_time - left_time
455                };
456                Ok(diff <= *max_difference)
457            }
458            JoinCondition::Custom { expression } => {
459                // Simple custom expression evaluation
460                self.evaluate_custom_condition(left, right, expression)
461            }
462        }
463    }
464
465    /// Check if event is in current window
466    async fn is_in_current_window(
467        &self,
468        event_time: DateTime<Utc>,
469        now: DateTime<Utc>,
470    ) -> Result<bool> {
471        match &self.config.window_strategy {
472            JoinWindowStrategy::Tumbling { duration } => {
473                let window_start = now - *duration;
474                Ok(event_time >= window_start)
475            }
476            JoinWindowStrategy::Sliding { duration, .. } => {
477                let window_start = now - *duration;
478                Ok(event_time >= window_start)
479            }
480            JoinWindowStrategy::Session { gap_timeout } => {
481                let last_activity = now - *gap_timeout;
482                Ok(event_time >= last_activity)
483            }
484            JoinWindowStrategy::CountBased { .. } => Ok(true), // Always in window for count-based
485        }
486    }
487
488    /// Extract field value from event
489    fn extract_field_value(&self, event: &StreamEvent, field: &str) -> Result<String> {
490        match event {
491            StreamEvent::TripleAdded {
492                subject,
493                predicate,
494                object,
495                ..
496            }
497            | StreamEvent::TripleRemoved {
498                subject,
499                predicate,
500                object,
501                ..
502            } => match field {
503                "subject" => Ok(subject.clone()),
504                "predicate" => Ok(predicate.clone()),
505                "object" => Ok(object.clone()),
506                _ => Err(anyhow!("Unknown field: {}", field)),
507            },
508            StreamEvent::QuadAdded {
509                subject,
510                predicate,
511                object,
512                graph,
513                ..
514            }
515            | StreamEvent::QuadRemoved {
516                subject,
517                predicate,
518                object,
519                graph,
520                ..
521            } => match field {
522                "subject" => Ok(subject.clone()),
523                "predicate" => Ok(predicate.clone()),
524                "object" => Ok(object.clone()),
525                "graph" => Ok(graph.clone()),
526                _ => Err(anyhow!("Unknown field: {}", field)),
527            },
528            _ => Err(anyhow!("Event type doesn't support field extraction")),
529        }
530    }
531
532    /// Evaluate custom join condition
533    fn evaluate_custom_condition(
534        &self,
535        _left: &StreamEvent,
536        _right: &StreamEvent,
537        _expression: &str,
538    ) -> Result<bool> {
539        // Simplified custom condition evaluation
540        // In production, this would use a proper expression parser
541        Ok(true)
542    }
543
544    /// Get join statistics
545    pub async fn stats(&self) -> JoinStats {
546        self.stats.read().await.clone()
547    }
548
549    /// Clear buffers
550    pub async fn clear(&self) {
551        self.left_buffer.write().await.clear();
552        self.right_buffer.write().await.clear();
553        self.join_results.write().await.clear();
554    }
555
556    /// Get current window ID
557    pub async fn window_id(&self) -> String {
558        self.current_window_id.read().await.clone()
559    }
560
561    /// Rotate to new window
562    pub async fn rotate_window(&self) {
563        let new_window_id = uuid::Uuid::new_v4().to_string();
564        *self.current_window_id.write().await = new_window_id;
565
566        let mut stats = self.stats.write().await;
567        stats.windows_processed += 1;
568    }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Builds a `TripleAdded` event with the given subject and fixed
    /// predicate/object values.
    fn create_test_event(subject: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: subject.to_string(),
            predicate: "test".to_string(),
            object: "value".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// A left event followed by a right event with the same subject yields
    /// exactly one fully populated pair, produced when the right event arrives.
    #[tokio::test]
    async fn test_inner_join() {
        let joiner = StreamJoiner::new(JoinConfig {
            join_type: JoinType::Inner,
            window_strategy: JoinWindowStrategy::Tumbling {
                duration: ChronoDuration::seconds(60),
            },
            condition: JoinCondition::OnEquals {
                left_field: "subject".to_string(),
                right_field: "subject".to_string(),
            },
            ..Default::default()
        });

        // Nothing is buffered on the right yet, so the left event cannot match.
        let after_left = joiner
            .process_left(create_test_event("test_subject"))
            .await
            .unwrap();
        assert_eq!(after_left.len(), 0);

        // The right event finds the buffered left event.
        let after_right = joiner
            .process_right(create_test_event("test_subject"))
            .await
            .unwrap();
        assert_eq!(after_right.len(), 1);

        let pair = &after_right[0];
        assert!(pair.left.is_some());
        assert!(pair.right.is_some());
    }

    /// With `emit_incomplete` set, an unmatched left event is emitted alone.
    #[tokio::test]
    async fn test_left_outer_join() {
        let joiner = StreamJoiner::new(JoinConfig {
            join_type: JoinType::LeftOuter,
            emit_incomplete: true,
            ..Default::default()
        });

        let results = joiner
            .process_left(create_test_event("unmatched"))
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let row = &results[0];
        assert!(row.left.is_some());
        assert!(row.right.is_none());
    }

    /// Each processed event is counted on the side it arrived on.
    #[tokio::test]
    async fn test_join_stats() {
        let joiner = StreamJoiner::new(JoinConfig::default());

        joiner
            .process_left(create_test_event("test1"))
            .await
            .unwrap();
        joiner
            .process_right(create_test_event("test2"))
            .await
            .unwrap();

        let stats = joiner.stats().await;
        assert_eq!(stats.left_events_received, 1);
        assert_eq!(stats.right_events_received, 1);
    }
}