Skip to main content

oxirs_stream/
temporal_join.rs

1//! # Temporal Joins for Stream Processing
2//!
3//! Advanced temporal join operations supporting event-time and processing-time semantics
4//! with watermarks, late data handling, and various join strategies.
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Duration as ChronoDuration, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tracing::{debug, warn};
13
14use crate::event::StreamEvent;
15
16/// Temporal join configuration
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct TemporalJoinConfig {
19    /// Join type
20    pub join_type: TemporalJoinType,
21    /// Time semantics
22    pub time_semantics: TimeSemantics,
23    /// Join window configuration
24    pub window: TemporalWindow,
25    /// Watermark configuration
26    pub watermark: WatermarkConfig,
27    /// Late data handling
28    pub late_data: LateDataConfig,
29}
30
31impl Default for TemporalJoinConfig {
32    fn default() -> Self {
33        Self {
34            join_type: TemporalJoinType::Inner,
35            time_semantics: TimeSemantics::EventTime,
36            window: TemporalWindow::default(),
37            watermark: WatermarkConfig::default(),
38            late_data: LateDataConfig::default(),
39        }
40    }
41}
42
43/// Temporal join types
44#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
45pub enum TemporalJoinType {
46    /// Inner temporal join
47    Inner,
48    /// Left temporal join
49    Left,
50    /// Right temporal join
51    Right,
52    /// Full outer temporal join
53    FullOuter,
54    /// Interval join
55    Interval,
56}
57
58/// Time semantics for temporal operations
59#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
60pub enum TimeSemantics {
61    /// Event time (based on event timestamps)
62    EventTime,
63    /// Processing time (based on system clock)
64    ProcessingTime,
65    /// Ingestion time (based on arrival time)
66    IngestionTime,
67}
68
69/// Temporal window configuration
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct TemporalWindow {
72    /// Lower bound offset (negative duration before event)
73    pub lower_bound: ChronoDuration,
74    /// Upper bound offset (positive duration after event)
75    pub upper_bound: ChronoDuration,
76    /// Allow exact timestamp matches
77    pub allow_exact: bool,
78}
79
80impl Default for TemporalWindow {
81    fn default() -> Self {
82        Self {
83            lower_bound: ChronoDuration::minutes(-5),
84            upper_bound: ChronoDuration::minutes(5),
85            allow_exact: true,
86        }
87    }
88}
89
90/// Watermark configuration
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct WatermarkConfig {
93    /// Watermark strategy
94    pub strategy: WatermarkStrategy,
95    /// Maximum allowed lateness
96    pub max_lateness: ChronoDuration,
97    /// Emit watermarks periodically
98    pub periodic_emit: bool,
99    /// Periodic emit interval
100    pub emit_interval: ChronoDuration,
101}
102
103impl Default for WatermarkConfig {
104    fn default() -> Self {
105        Self {
106            strategy: WatermarkStrategy::BoundedOutOfOrder {
107                max_delay: ChronoDuration::seconds(10),
108            },
109            max_lateness: ChronoDuration::minutes(1),
110            periodic_emit: true,
111            emit_interval: ChronoDuration::seconds(1),
112        }
113    }
114}
115
116/// Watermark strategies
117#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
118pub enum WatermarkStrategy {
119    /// Ascending timestamps (no out-of-order)
120    Ascending,
121    /// Bounded out-of-order with maximum delay
122    BoundedOutOfOrder { max_delay: ChronoDuration },
123    /// Periodic watermarks
124    Periodic { interval: ChronoDuration },
125    /// Custom watermark generator
126    Custom,
127}
128
129/// Late data handling configuration
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct LateDataConfig {
132    /// Strategy for handling late data
133    pub strategy: LateDataStrategy,
134    /// Side output for late data
135    pub side_output_enabled: bool,
136}
137
138impl Default for LateDataConfig {
139    fn default() -> Self {
140        Self {
141            strategy: LateDataStrategy::Drop,
142            side_output_enabled: true,
143        }
144    }
145}
146
147/// Late data strategies
148#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
149pub enum LateDataStrategy {
150    /// Drop late data
151    Drop,
152    /// Emit late data with special marker
153    EmitWithMarker,
154    /// Reprocess affected windows
155    ReprocessWindows,
156}
157
158/// Temporal join operator
159pub struct TemporalJoin {
160    config: TemporalJoinConfig,
161    left_buffer: Arc<RwLock<EventBuffer>>,
162    right_buffer: Arc<RwLock<EventBuffer>>,
163    watermarks: Arc<RwLock<Watermarks>>,
164    metrics: Arc<RwLock<TemporalJoinMetrics>>,
165}
166
167/// Event buffer for temporal join
168#[derive(Debug)]
169struct EventBuffer {
170    events: VecDeque<TimestampedEvent>,
171    max_size: usize,
172}
173
174/// Timestamped event wrapper
175#[derive(Debug, Clone)]
176struct TimestampedEvent {
177    event: StreamEvent,
178    event_time: DateTime<Utc>,
179    processing_time: DateTime<Utc>,
180}
181
182impl EventBuffer {
183    fn new(max_size: usize) -> Self {
184        Self {
185            events: VecDeque::new(),
186            max_size,
187        }
188    }
189
190    fn add_event(&mut self, event: TimestampedEvent) {
191        if self.events.len() >= self.max_size {
192            self.events.pop_front();
193        }
194        self.events.push_back(event);
195    }
196
197    fn get_events_in_window(
198        &self,
199        timestamp: DateTime<Utc>,
200        window: &TemporalWindow,
201    ) -> Vec<TimestampedEvent> {
202        let lower = timestamp + window.lower_bound;
203        let upper = timestamp + window.upper_bound;
204
205        self.events
206            .iter()
207            .filter(|e| {
208                let t = e.event_time;
209                (t > lower && t < upper) || (window.allow_exact && t == timestamp)
210            })
211            .cloned()
212            .collect()
213    }
214
215    fn purge_before_watermark(&mut self, watermark: DateTime<Utc>) {
216        while let Some(event) = self.events.front() {
217            if event.event_time < watermark {
218                self.events.pop_front();
219            } else {
220                break;
221            }
222        }
223    }
224}
225
226/// Watermark tracking
227#[derive(Debug, Clone)]
228struct Watermarks {
229    left_watermark: Option<DateTime<Utc>>,
230    right_watermark: Option<DateTime<Utc>>,
231}
232
233impl Watermarks {
234    fn new() -> Self {
235        Self {
236            left_watermark: None,
237            right_watermark: None,
238        }
239    }
240
241    fn update_left(&mut self, watermark: DateTime<Utc>) {
242        self.left_watermark = Some(watermark);
243    }
244
245    fn update_right(&mut self, watermark: DateTime<Utc>) {
246        self.right_watermark = Some(watermark);
247    }
248
249    fn min_watermark(&self) -> Option<DateTime<Utc>> {
250        match (self.left_watermark, self.right_watermark) {
251            (Some(l), Some(r)) => Some(l.min(r)),
252            (Some(l), None) => Some(l),
253            (None, Some(r)) => Some(r),
254            (None, None) => None,
255        }
256    }
257}
258
259/// Temporal join metrics
260#[derive(Debug, Clone, Default, Serialize, Deserialize)]
261pub struct TemporalJoinMetrics {
262    /// Total left events processed
263    pub left_events_processed: u64,
264    /// Total right events processed
265    pub right_events_processed: u64,
266    /// Total join matches
267    pub join_matches: u64,
268    /// Late events dropped
269    pub late_events_dropped: u64,
270    /// Watermarks emitted
271    pub watermarks_emitted: u64,
272    /// Average join latency (ms)
273    pub avg_join_latency_ms: f64,
274}
275
276impl TemporalJoin {
277    /// Create a new temporal join operator
278    pub fn new(config: TemporalJoinConfig) -> Self {
279        Self {
280            config,
281            left_buffer: Arc::new(RwLock::new(EventBuffer::new(10000))),
282            right_buffer: Arc::new(RwLock::new(EventBuffer::new(10000))),
283            watermarks: Arc::new(RwLock::new(Watermarks::new())),
284            metrics: Arc::new(RwLock::new(TemporalJoinMetrics::default())),
285        }
286    }
287
288    /// Process left stream event
289    pub async fn process_left(&self, event: StreamEvent) -> Result<Vec<JoinResult>> {
290        let start_time = std::time::Instant::now();
291
292        let timestamped = self.create_timestamped_event(event).await?;
293
294        // Check for late data
295        if self.is_late_event(&timestamped, true).await {
296            return self.handle_late_event(timestamped, true).await;
297        }
298
299        // Add to buffer
300        self.left_buffer
301            .write()
302            .await
303            .add_event(timestamped.clone());
304
305        // Perform join
306        let results = self.join_with_right(&timestamped).await?;
307
308        // Update metrics
309        {
310            let mut metrics = self.metrics.write().await;
311            metrics.left_events_processed += 1;
312            metrics.join_matches += results.len() as u64;
313            let latency = start_time.elapsed().as_millis() as f64;
314            metrics.avg_join_latency_ms = (metrics.avg_join_latency_ms + latency) / 2.0;
315        }
316
317        // Update watermark
318        self.update_watermark(&timestamped, true).await;
319
320        debug!("Processed left event, found {} matches", results.len());
321        Ok(results)
322    }
323
324    /// Process right stream event
325    pub async fn process_right(&self, event: StreamEvent) -> Result<Vec<JoinResult>> {
326        let start_time = std::time::Instant::now();
327
328        let timestamped = self.create_timestamped_event(event).await?;
329
330        // Check for late data
331        if self.is_late_event(&timestamped, false).await {
332            return self.handle_late_event(timestamped, false).await;
333        }
334
335        // Add to buffer
336        self.right_buffer
337            .write()
338            .await
339            .add_event(timestamped.clone());
340
341        // Perform join
342        let results = self.join_with_left(&timestamped).await?;
343
344        // Update metrics
345        {
346            let mut metrics = self.metrics.write().await;
347            metrics.right_events_processed += 1;
348            metrics.join_matches += results.len() as u64;
349            let latency = start_time.elapsed().as_millis() as f64;
350            metrics.avg_join_latency_ms = (metrics.avg_join_latency_ms + latency) / 2.0;
351        }
352
353        // Update watermark
354        self.update_watermark(&timestamped, false).await;
355
356        debug!("Processed right event, found {} matches", results.len());
357        Ok(results)
358    }
359
360    /// Create timestamped event
361    async fn create_timestamped_event(&self, event: StreamEvent) -> Result<TimestampedEvent> {
362        let event_time = match self.config.time_semantics {
363            TimeSemantics::EventTime => self.extract_event_time(&event)?,
364            TimeSemantics::ProcessingTime => Utc::now(),
365            TimeSemantics::IngestionTime => Utc::now(),
366        };
367
368        Ok(TimestampedEvent {
369            event,
370            event_time,
371            processing_time: Utc::now(),
372        })
373    }
374
375    /// Extract event time from event
376    fn extract_event_time(&self, event: &StreamEvent) -> Result<DateTime<Utc>> {
377        match event {
378            StreamEvent::TripleAdded { metadata, .. } => Ok(metadata.timestamp),
379            StreamEvent::TripleRemoved { metadata, .. } => Ok(metadata.timestamp),
380            StreamEvent::GraphCreated { metadata, .. } => Ok(metadata.timestamp),
381            StreamEvent::GraphDeleted { metadata, .. } => Ok(metadata.timestamp),
382            StreamEvent::TransactionBegin { metadata, .. } => Ok(metadata.timestamp),
383            StreamEvent::TransactionCommit { metadata, .. } => Ok(metadata.timestamp),
384            StreamEvent::TransactionAbort { metadata, .. } => Ok(metadata.timestamp),
385            _ => Err(anyhow!("Cannot extract event time from event")),
386        }
387    }
388
389    /// Check if event is late
390    async fn is_late_event(&self, event: &TimestampedEvent, is_left: bool) -> bool {
391        let watermarks = self.watermarks.read().await;
392        let watermark = if is_left {
393            watermarks.left_watermark
394        } else {
395            watermarks.right_watermark
396        };
397
398        if let Some(wm) = watermark {
399            event.event_time < wm - self.config.watermark.max_lateness
400        } else {
401            false
402        }
403    }
404
405    /// Handle late event
406    async fn handle_late_event(
407        &self,
408        _event: TimestampedEvent,
409        _is_left: bool,
410    ) -> Result<Vec<JoinResult>> {
411        match self.config.late_data.strategy {
412            LateDataStrategy::Drop => {
413                self.metrics.write().await.late_events_dropped += 1;
414                warn!("Dropped late event");
415                Ok(Vec::new())
416            }
417            LateDataStrategy::EmitWithMarker => {
418                // Emit with late marker
419                Ok(Vec::new())
420            }
421            LateDataStrategy::ReprocessWindows => {
422                // Reprocess affected windows
423                Ok(Vec::new())
424            }
425        }
426    }
427
428    /// Join with right buffer
429    async fn join_with_right(&self, left_event: &TimestampedEvent) -> Result<Vec<JoinResult>> {
430        let right_buffer = self.right_buffer.read().await;
431        let matches = right_buffer.get_events_in_window(left_event.event_time, &self.config.window);
432
433        let results = matches
434            .into_iter()
435            .map(|right_event| JoinResult {
436                left_event: left_event.event.clone(),
437                right_event: Some(right_event.event),
438                join_time: Utc::now(),
439                time_diff: (right_event.event_time - left_event.event_time).num_milliseconds(),
440            })
441            .collect();
442
443        Ok(results)
444    }
445
446    /// Join with left buffer
447    async fn join_with_left(&self, right_event: &TimestampedEvent) -> Result<Vec<JoinResult>> {
448        let left_buffer = self.left_buffer.read().await;
449        let matches = left_buffer.get_events_in_window(right_event.event_time, &self.config.window);
450
451        let results = matches
452            .into_iter()
453            .map(|left_event| JoinResult {
454                left_event: left_event.event,
455                right_event: Some(right_event.event.clone()),
456                join_time: Utc::now(),
457                time_diff: (right_event.event_time - left_event.event_time).num_milliseconds(),
458            })
459            .collect();
460
461        Ok(results)
462    }
463
464    /// Update watermark
465    async fn update_watermark(&self, event: &TimestampedEvent, is_left: bool) {
466        let watermark = match self.config.watermark.strategy {
467            WatermarkStrategy::Ascending => event.event_time,
468            WatermarkStrategy::BoundedOutOfOrder { max_delay } => event.event_time - max_delay,
469            WatermarkStrategy::Periodic { .. } => {
470                // Handled by periodic task
471                return;
472            }
473            WatermarkStrategy::Custom => {
474                // Custom logic
475                event.event_time
476            }
477        };
478
479        let mut watermarks = self.watermarks.write().await;
480        if is_left {
481            watermarks.update_left(watermark);
482        } else {
483            watermarks.update_right(watermark);
484        }
485
486        self.metrics.write().await.watermarks_emitted += 1;
487
488        // Purge old events
489        if let Some(min_wm) = watermarks.min_watermark() {
490            drop(watermarks);
491            self.left_buffer
492                .write()
493                .await
494                .purge_before_watermark(min_wm);
495            self.right_buffer
496                .write()
497                .await
498                .purge_before_watermark(min_wm);
499        }
500    }
501
502    /// Get metrics
503    pub async fn get_metrics(&self) -> TemporalJoinMetrics {
504        self.metrics.read().await.clone()
505    }
506}
507
508/// Join result
509#[derive(Debug, Clone)]
510pub struct JoinResult {
511    /// Left stream event
512    pub left_event: StreamEvent,
513    /// Right stream event (None for outer joins)
514    pub right_event: Option<StreamEvent>,
515    /// Join timestamp
516    pub join_time: DateTime<Utc>,
517    /// Time difference between events (milliseconds)
518    pub time_diff: i64,
519}
520
521/// Interval join operator for asymmetric temporal joins
522pub struct IntervalJoin {
523    config: TemporalJoinConfig,
524    join: TemporalJoin,
525}
526
527impl IntervalJoin {
528    /// Create a new interval join
529    pub fn new(config: TemporalJoinConfig) -> Self {
530        let mut join_config = config.clone();
531        join_config.join_type = TemporalJoinType::Interval;
532
533        Self {
534            config,
535            join: TemporalJoin::new(join_config),
536        }
537    }
538
539    /// Process event with interval constraints
540    pub async fn process(
541        &self,
542        left_event: StreamEvent,
543        right_event: StreamEvent,
544    ) -> Result<Vec<JoinResult>> {
545        // Process both events
546        let left_results = self.join.process_left(left_event).await?;
547        let right_results = self.join.process_right(right_event).await?;
548
549        // Combine results
550        let mut all_results = left_results;
551        all_results.extend(right_results);
552
553        Ok(all_results)
554    }
555}
556
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;

    /// A freshly constructed operator reports zero processed left events.
    #[tokio::test]
    async fn test_temporal_join_creation() {
        let join = TemporalJoin::new(TemporalJoinConfig::default());

        assert_eq!(join.get_metrics().await.left_events_processed, 0);
    }

    /// Adding one event to an empty buffer leaves exactly one event in it.
    #[tokio::test]
    async fn test_event_buffer() {
        let graph_created = StreamEvent::GraphCreated {
            graph: "test".to_string(),
            metadata: EventMetadata {
                event_id: "test".to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: HashMap::new(),
                checksum: None,
            },
        };

        let mut buffer = EventBuffer::new(100);
        buffer.add_event(TimestampedEvent {
            event: graph_created,
            event_time: Utc::now(),
            processing_time: Utc::now(),
        });

        assert_eq!(buffer.events.len(), 1);
    }

    /// A bounded out-of-order strategy keeps the delay it was built with.
    #[tokio::test]
    async fn test_watermark_strategy() {
        let five_seconds = ChronoDuration::seconds(5);
        let strategy = WatermarkStrategy::BoundedOutOfOrder {
            max_delay: ChronoDuration::seconds(5),
        };

        if let WatermarkStrategy::BoundedOutOfOrder { max_delay } = strategy {
            assert_eq!(max_delay, five_seconds);
        } else {
            panic!("Wrong strategy");
        }
    }
}