1use anyhow::{anyhow, Result};
7use chrono::{DateTime, Duration as ChronoDuration, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tracing::{debug, warn};
13
14use crate::event::StreamEvent;
15
/// Configuration for a [`TemporalJoin`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalJoinConfig {
    /// Requested join flavour.
    ///
    /// NOTE(review): `TemporalJoin` in this file never reads this field; it only
    /// ever emits matched pairs.
    pub join_type: TemporalJoinType,
    /// Which clock assigns each event the timestamp used for matching.
    pub time_semantics: TimeSemantics,
    /// Relative time range in which a counterpart event counts as a match.
    pub window: TemporalWindow,
    /// Watermark generation and lateness settings.
    pub watermark: WatermarkConfig,
    /// Handling of events classified as late.
    pub late_data: LateDataConfig,
}
30
31impl Default for TemporalJoinConfig {
32 fn default() -> Self {
33 Self {
34 join_type: TemporalJoinType::Inner,
35 time_semantics: TimeSemantics::EventTime,
36 window: TemporalWindow::default(),
37 watermark: WatermarkConfig::default(),
38 late_data: LateDataConfig::default(),
39 }
40 }
41}
42
/// Join flavour selected via [`TemporalJoinConfig::join_type`].
///
/// NOTE(review): `TemporalJoin` in this file does not branch on this value.
/// `IntervalJoin::new` is the only visible writer, and it sets `Interval`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TemporalJoinType {
    /// Inner join (presumably matched pairs only).
    Inner,
    /// Left outer join (presumably keeps unmatched left events; not implemented here).
    Left,
    /// Right outer join (presumably keeps unmatched right events; not implemented here).
    Right,
    /// Full outer join (presumably keeps unmatched events from both sides; not implemented here).
    FullOuter,
    /// Interval join; selected by `IntervalJoin`.
    Interval,
}
57
/// Which clock `TemporalJoin` uses to assign each event its join timestamp.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum TimeSemantics {
    /// Use the `timestamp` from the event's metadata. Events whose timestamp
    /// cannot be extracted are rejected with an error.
    EventTime,
    /// Use the wall-clock time (`Utc::now()`) at which the join processes the event.
    ProcessingTime,
    /// Named as ingestion time, but currently also `Utc::now()` at processing,
    /// i.e. identical to `ProcessingTime`.
    IngestionTime,
}
68
/// Time range, relative to a probing event's timestamp `ts`, in which a
/// buffered event matches.
///
/// A buffered event at time `t` matches when
/// `ts + lower_bound < t < ts + upper_bound` (both bounds exclusive), or when
/// `allow_exact` is set and `t == ts`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemporalWindow {
    /// Offset added to the probe timestamp to get the exclusive lower edge.
    /// Negative values reach into the past.
    pub lower_bound: ChronoDuration,
    /// Offset added to the probe timestamp to get the exclusive upper edge.
    pub upper_bound: ChronoDuration,
    /// Also accept events whose timestamp equals the probe timestamp exactly.
    /// When `lower_bound < 0 < upper_bound` (as in the default) that case is
    /// already inside the open range, so this flag has no extra effect.
    pub allow_exact: bool,
}
79
80impl Default for TemporalWindow {
81 fn default() -> Self {
82 Self {
83 lower_bound: ChronoDuration::minutes(-5),
84 upper_bound: ChronoDuration::minutes(5),
85 allow_exact: true,
86 }
87 }
88}
89
/// Watermark generation and lateness settings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkConfig {
    /// How a candidate watermark is derived from each processed event.
    pub strategy: WatermarkStrategy,
    /// Grace period. An event is late when its timestamp is earlier than its
    /// side's current watermark minus this duration.
    pub max_lateness: ChronoDuration,
    /// NOTE(review): not read by any code visible in this file.
    pub periodic_emit: bool,
    /// NOTE(review): not read by any code visible in this file.
    pub emit_interval: ChronoDuration,
}
102
103impl Default for WatermarkConfig {
104 fn default() -> Self {
105 Self {
106 strategy: WatermarkStrategy::BoundedOutOfOrder {
107 max_delay: ChronoDuration::seconds(10),
108 },
109 max_lateness: ChronoDuration::minutes(1),
110 periodic_emit: true,
111 emit_interval: ChronoDuration::seconds(1),
112 }
113 }
114}
115
/// Rule `TemporalJoin` uses to derive a candidate watermark from each processed event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum WatermarkStrategy {
    /// Candidate watermark = the event's timestamp.
    Ascending,
    /// Candidate watermark = the event's timestamp minus `max_delay`.
    BoundedOutOfOrder { max_delay: ChronoDuration },
    /// Not derived per event. `TemporalJoin` leaves watermarks untouched for this
    /// strategy, so no event is classified late and buffers are never purged.
    /// NOTE(review): no periodic emitter is visible in this file.
    Periodic { interval: ChronoDuration },
    /// Placeholder; currently handled exactly like `Ascending`.
    Custom,
}
128
/// Handling of late events, i.e. events whose timestamp is older than their
/// side's watermark minus `max_lateness`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LateDataConfig {
    /// Action taken for a late event.
    pub strategy: LateDataStrategy,
    /// NOTE(review): not read by any code visible in this file.
    pub side_output_enabled: bool,
}
137
138impl Default for LateDataConfig {
139 fn default() -> Self {
140 Self {
141 strategy: LateDataStrategy::Drop,
142 side_output_enabled: true,
143 }
144 }
145}
146
/// Action applied to a late event.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum LateDataStrategy {
    /// Discard the event, increment `late_events_dropped`, and log a warning.
    Drop,
    /// Presumably: emit the event flagged as late. Not implemented; the late
    /// event currently yields no join results.
    EmitWithMarker,
    /// Presumably: re-evaluate affected windows. Not implemented; the late
    /// event currently yields no join results.
    ReprocessWindows,
}
157
/// Two-input temporal join that pairs events from a left and a right stream
/// whose timestamps fall within the configured [`TemporalWindow`].
///
/// All mutable state sits behind `Arc<RwLock<_>>`, so the processing methods
/// take `&self`.
pub struct TemporalJoin {
    /// Configuration supplied to `new`.
    config: TemporalJoinConfig,
    /// Buffered left-side events available for matching.
    left_buffer: Arc<RwLock<EventBuffer>>,
    /// Buffered right-side events available for matching.
    right_buffer: Arc<RwLock<EventBuffer>>,
    /// Current per-side watermarks.
    watermarks: Arc<RwLock<Watermarks>>,
    /// Counters returned by `get_metrics`.
    metrics: Arc<RwLock<TemporalJoinMetrics>>,
}
166
/// Bounded queue of events for one side of the join.
#[derive(Debug)]
struct EventBuffer {
    /// Events in arrival order (appended at the back).
    events: VecDeque<TimestampedEvent>,
    /// Capacity; once reached, `add_event` evicts from the front.
    max_size: usize,
}
173
/// A stream event paired with the timestamps the join works with.
#[derive(Debug, Clone)]
struct TimestampedEvent {
    /// The original event.
    event: StreamEvent,
    /// Timestamp used for window matching, lateness checks and watermarks,
    /// chosen according to the configured `TimeSemantics`.
    event_time: DateTime<Utc>,
    /// Wall-clock time at which the event was wrapped.
    /// NOTE(review): not read by any non-test code visible in this file.
    processing_time: DateTime<Utc>,
}
181
182impl EventBuffer {
183 fn new(max_size: usize) -> Self {
184 Self {
185 events: VecDeque::new(),
186 max_size,
187 }
188 }
189
190 fn add_event(&mut self, event: TimestampedEvent) {
191 if self.events.len() >= self.max_size {
192 self.events.pop_front();
193 }
194 self.events.push_back(event);
195 }
196
197 fn get_events_in_window(
198 &self,
199 timestamp: DateTime<Utc>,
200 window: &TemporalWindow,
201 ) -> Vec<TimestampedEvent> {
202 let lower = timestamp + window.lower_bound;
203 let upper = timestamp + window.upper_bound;
204
205 self.events
206 .iter()
207 .filter(|e| {
208 let t = e.event_time;
209 (t > lower && t < upper) || (window.allow_exact && t == timestamp)
210 })
211 .cloned()
212 .collect()
213 }
214
215 fn purge_before_watermark(&mut self, watermark: DateTime<Utc>) {
216 while let Some(event) = self.events.front() {
217 if event.event_time < watermark {
218 self.events.pop_front();
219 } else {
220 break;
221 }
222 }
223 }
224}
225
/// Current watermark for each join input; `None` until that side is first updated.
#[derive(Debug, Clone)]
struct Watermarks {
    /// Watermark of the left input.
    left_watermark: Option<DateTime<Utc>>,
    /// Watermark of the right input.
    right_watermark: Option<DateTime<Utc>>,
}
232
233impl Watermarks {
234 fn new() -> Self {
235 Self {
236 left_watermark: None,
237 right_watermark: None,
238 }
239 }
240
241 fn update_left(&mut self, watermark: DateTime<Utc>) {
242 self.left_watermark = Some(watermark);
243 }
244
245 fn update_right(&mut self, watermark: DateTime<Utc>) {
246 self.right_watermark = Some(watermark);
247 }
248
249 fn min_watermark(&self) -> Option<DateTime<Utc>> {
250 match (self.left_watermark, self.right_watermark) {
251 (Some(l), Some(r)) => Some(l.min(r)),
252 (Some(l), None) => Some(l),
253 (None, Some(r)) => Some(r),
254 (None, None) => None,
255 }
256 }
257}
258
/// Counters describing a `TemporalJoin`'s activity, returned by `get_metrics`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TemporalJoinMetrics {
    /// Left events that passed the lateness check and were buffered and probed.
    pub left_events_processed: u64,
    /// Right events that passed the lateness check and were buffered and probed.
    pub right_events_processed: u64,
    /// Total number of join results emitted.
    pub join_matches: u64,
    /// Late events counted as dropped by the late-data handler.
    pub late_events_dropped: u64,
    /// Number of watermark updates performed (none under `WatermarkStrategy::Periodic`).
    pub watermarks_emitted: u64,
    /// Processing-latency statistic for non-late events, in milliseconds.
    /// See `TemporalJoin`'s processing methods for how samples are combined.
    pub avg_join_latency_ms: f64,
}
275
276impl TemporalJoin {
277 pub fn new(config: TemporalJoinConfig) -> Self {
279 Self {
280 config,
281 left_buffer: Arc::new(RwLock::new(EventBuffer::new(10000))),
282 right_buffer: Arc::new(RwLock::new(EventBuffer::new(10000))),
283 watermarks: Arc::new(RwLock::new(Watermarks::new())),
284 metrics: Arc::new(RwLock::new(TemporalJoinMetrics::default())),
285 }
286 }
287
288 pub async fn process_left(&self, event: StreamEvent) -> Result<Vec<JoinResult>> {
290 let start_time = std::time::Instant::now();
291
292 let timestamped = self.create_timestamped_event(event).await?;
293
294 if self.is_late_event(×tamped, true).await {
296 return self.handle_late_event(timestamped, true).await;
297 }
298
299 self.left_buffer
301 .write()
302 .await
303 .add_event(timestamped.clone());
304
305 let results = self.join_with_right(×tamped).await?;
307
308 {
310 let mut metrics = self.metrics.write().await;
311 metrics.left_events_processed += 1;
312 metrics.join_matches += results.len() as u64;
313 let latency = start_time.elapsed().as_millis() as f64;
314 metrics.avg_join_latency_ms = (metrics.avg_join_latency_ms + latency) / 2.0;
315 }
316
317 self.update_watermark(×tamped, true).await;
319
320 debug!("Processed left event, found {} matches", results.len());
321 Ok(results)
322 }
323
324 pub async fn process_right(&self, event: StreamEvent) -> Result<Vec<JoinResult>> {
326 let start_time = std::time::Instant::now();
327
328 let timestamped = self.create_timestamped_event(event).await?;
329
330 if self.is_late_event(×tamped, false).await {
332 return self.handle_late_event(timestamped, false).await;
333 }
334
335 self.right_buffer
337 .write()
338 .await
339 .add_event(timestamped.clone());
340
341 let results = self.join_with_left(×tamped).await?;
343
344 {
346 let mut metrics = self.metrics.write().await;
347 metrics.right_events_processed += 1;
348 metrics.join_matches += results.len() as u64;
349 let latency = start_time.elapsed().as_millis() as f64;
350 metrics.avg_join_latency_ms = (metrics.avg_join_latency_ms + latency) / 2.0;
351 }
352
353 self.update_watermark(×tamped, false).await;
355
356 debug!("Processed right event, found {} matches", results.len());
357 Ok(results)
358 }
359
360 async fn create_timestamped_event(&self, event: StreamEvent) -> Result<TimestampedEvent> {
362 let event_time = match self.config.time_semantics {
363 TimeSemantics::EventTime => self.extract_event_time(&event)?,
364 TimeSemantics::ProcessingTime => Utc::now(),
365 TimeSemantics::IngestionTime => Utc::now(),
366 };
367
368 Ok(TimestampedEvent {
369 event,
370 event_time,
371 processing_time: Utc::now(),
372 })
373 }
374
375 fn extract_event_time(&self, event: &StreamEvent) -> Result<DateTime<Utc>> {
377 match event {
378 StreamEvent::TripleAdded { metadata, .. } => Ok(metadata.timestamp),
379 StreamEvent::TripleRemoved { metadata, .. } => Ok(metadata.timestamp),
380 StreamEvent::GraphCreated { metadata, .. } => Ok(metadata.timestamp),
381 StreamEvent::GraphDeleted { metadata, .. } => Ok(metadata.timestamp),
382 StreamEvent::TransactionBegin { metadata, .. } => Ok(metadata.timestamp),
383 StreamEvent::TransactionCommit { metadata, .. } => Ok(metadata.timestamp),
384 StreamEvent::TransactionAbort { metadata, .. } => Ok(metadata.timestamp),
385 _ => Err(anyhow!("Cannot extract event time from event")),
386 }
387 }
388
389 async fn is_late_event(&self, event: &TimestampedEvent, is_left: bool) -> bool {
391 let watermarks = self.watermarks.read().await;
392 let watermark = if is_left {
393 watermarks.left_watermark
394 } else {
395 watermarks.right_watermark
396 };
397
398 if let Some(wm) = watermark {
399 event.event_time < wm - self.config.watermark.max_lateness
400 } else {
401 false
402 }
403 }
404
405 async fn handle_late_event(
407 &self,
408 _event: TimestampedEvent,
409 _is_left: bool,
410 ) -> Result<Vec<JoinResult>> {
411 match self.config.late_data.strategy {
412 LateDataStrategy::Drop => {
413 self.metrics.write().await.late_events_dropped += 1;
414 warn!("Dropped late event");
415 Ok(Vec::new())
416 }
417 LateDataStrategy::EmitWithMarker => {
418 Ok(Vec::new())
420 }
421 LateDataStrategy::ReprocessWindows => {
422 Ok(Vec::new())
424 }
425 }
426 }
427
428 async fn join_with_right(&self, left_event: &TimestampedEvent) -> Result<Vec<JoinResult>> {
430 let right_buffer = self.right_buffer.read().await;
431 let matches = right_buffer.get_events_in_window(left_event.event_time, &self.config.window);
432
433 let results = matches
434 .into_iter()
435 .map(|right_event| JoinResult {
436 left_event: left_event.event.clone(),
437 right_event: Some(right_event.event),
438 join_time: Utc::now(),
439 time_diff: (right_event.event_time - left_event.event_time).num_milliseconds(),
440 })
441 .collect();
442
443 Ok(results)
444 }
445
446 async fn join_with_left(&self, right_event: &TimestampedEvent) -> Result<Vec<JoinResult>> {
448 let left_buffer = self.left_buffer.read().await;
449 let matches = left_buffer.get_events_in_window(right_event.event_time, &self.config.window);
450
451 let results = matches
452 .into_iter()
453 .map(|left_event| JoinResult {
454 left_event: left_event.event,
455 right_event: Some(right_event.event.clone()),
456 join_time: Utc::now(),
457 time_diff: (right_event.event_time - left_event.event_time).num_milliseconds(),
458 })
459 .collect();
460
461 Ok(results)
462 }
463
464 async fn update_watermark(&self, event: &TimestampedEvent, is_left: bool) {
466 let watermark = match self.config.watermark.strategy {
467 WatermarkStrategy::Ascending => event.event_time,
468 WatermarkStrategy::BoundedOutOfOrder { max_delay } => event.event_time - max_delay,
469 WatermarkStrategy::Periodic { .. } => {
470 return;
472 }
473 WatermarkStrategy::Custom => {
474 event.event_time
476 }
477 };
478
479 let mut watermarks = self.watermarks.write().await;
480 if is_left {
481 watermarks.update_left(watermark);
482 } else {
483 watermarks.update_right(watermark);
484 }
485
486 self.metrics.write().await.watermarks_emitted += 1;
487
488 if let Some(min_wm) = watermarks.min_watermark() {
490 drop(watermarks);
491 self.left_buffer
492 .write()
493 .await
494 .purge_before_watermark(min_wm);
495 self.right_buffer
496 .write()
497 .await
498 .purge_before_watermark(min_wm);
499 }
500 }
501
502 pub async fn get_metrics(&self) -> TemporalJoinMetrics {
504 self.metrics.read().await.clone()
505 }
506}
507
/// One matched pair produced by `TemporalJoin`.
#[derive(Debug, Clone)]
pub struct JoinResult {
    /// Event from the left input.
    pub left_event: StreamEvent,
    /// Event from the right input. Always `Some` for results produced in this
    /// file; presumably `None` is reserved for outer-join rows.
    pub right_event: Option<StreamEvent>,
    /// Wall-clock time at which the result was created.
    pub join_time: DateTime<Utc>,
    /// Right event time minus left event time, in whole milliseconds
    /// (negative when the right event is earlier).
    pub time_diff: i64,
}
520
/// Convenience wrapper that runs a [`TemporalJoin`] configured as an interval join.
pub struct IntervalJoin {
    /// Configuration exactly as passed to `new` (its `join_type` is left as given).
    /// NOTE(review): not read by any code visible in this file.
    config: TemporalJoinConfig,
    /// Underlying join, built from a copy of `config` with `join_type` set to `Interval`.
    join: TemporalJoin,
}
526
527impl IntervalJoin {
528 pub fn new(config: TemporalJoinConfig) -> Self {
530 let mut join_config = config.clone();
531 join_config.join_type = TemporalJoinType::Interval;
532
533 Self {
534 config,
535 join: TemporalJoin::new(join_config),
536 }
537 }
538
539 pub async fn process(
541 &self,
542 left_event: StreamEvent,
543 right_event: StreamEvent,
544 ) -> Result<Vec<JoinResult>> {
545 let left_results = self.join.process_left(left_event).await?;
547 let right_results = self.join.process_right(right_event).await?;
548
549 let mut all_results = left_results;
551 all_results.extend(right_results);
552
553 Ok(all_results)
554 }
555}
556
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;
    use std::collections::HashMap;

    /// Metadata fixture with every optional field left empty.
    fn sample_metadata() -> EventMetadata {
        EventMetadata {
            event_id: "test".to_string(),
            timestamp: Utc::now(),
            source: "test".to_string(),
            user: None,
            context: None,
            caused_by: None,
            version: "1.0".to_string(),
            properties: HashMap::new(),
            checksum: None,
        }
    }

    /// A freshly created join reports zeroed counters.
    #[tokio::test]
    async fn test_temporal_join_creation() {
        let join = TemporalJoin::new(TemporalJoinConfig::default());
        assert_eq!(join.get_metrics().await.left_events_processed, 0);
    }

    /// Adding one event to an empty buffer stores exactly that event.
    #[tokio::test]
    async fn test_event_buffer() {
        let mut buffer = EventBuffer::new(100);
        buffer.add_event(TimestampedEvent {
            event: StreamEvent::GraphCreated {
                graph: "test".to_string(),
                metadata: sample_metadata(),
            },
            event_time: Utc::now(),
            processing_time: Utc::now(),
        });
        assert_eq!(buffer.events.len(), 1);
    }

    /// The bounded-out-of-order variant round-trips its delay.
    #[tokio::test]
    async fn test_watermark_strategy() {
        let strategy = WatermarkStrategy::BoundedOutOfOrder {
            max_delay: ChronoDuration::seconds(5),
        };

        if let WatermarkStrategy::BoundedOutOfOrder { max_delay } = strategy {
            assert_eq!(max_delay, ChronoDuration::seconds(5));
        } else {
            panic!("Wrong strategy");
        }
    }
}