1use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::VecDeque;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23pub enum JoinType {
24 Inner,
26 LeftOuter,
28 RightOuter,
30 FullOuter,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub enum JoinWindowStrategy {
37 Tumbling { duration: ChronoDuration },
39 Sliding {
41 duration: ChronoDuration,
42 slide: ChronoDuration,
43 },
44 Session { gap_timeout: ChronoDuration },
46 CountBased { size: usize },
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub enum JoinCondition {
53 OnEquals {
55 left_field: String,
56 right_field: String,
57 },
58 Custom { expression: String },
60 TimeProximity { max_difference: ChronoDuration },
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct JoinConfig {
67 pub join_type: JoinType,
68 pub window_strategy: JoinWindowStrategy,
69 pub condition: JoinCondition,
70 pub max_buffer_size: usize,
71 pub emit_incomplete: bool, }
73
74impl Default for JoinConfig {
75 fn default() -> Self {
76 Self {
77 join_type: JoinType::Inner,
78 window_strategy: JoinWindowStrategy::Tumbling {
79 duration: ChronoDuration::seconds(60),
80 },
81 condition: JoinCondition::TimeProximity {
82 max_difference: ChronoDuration::seconds(10),
83 },
84 max_buffer_size: 10000,
85 emit_incomplete: true,
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct JoinedEvent {
93 pub left: Option<StreamEvent>,
94 pub right: Option<StreamEvent>,
95 pub join_time: DateTime<Utc>,
96 pub match_confidence: f64,
97 pub window_id: String,
98}
99
100type EventBuffer = Arc<RwLock<VecDeque<(StreamEvent, DateTime<Utc>)>>>;
102
103pub struct StreamJoiner {
105 config: JoinConfig,
106 left_buffer: EventBuffer,
107 right_buffer: EventBuffer,
108 join_results: Arc<RwLock<Vec<JoinedEvent>>>,
109 stats: Arc<RwLock<JoinStats>>,
110 current_window_id: Arc<RwLock<String>>,
111}
112
/// Counters describing the work a joiner has done so far; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct JoinStats {
    /// Number of events submitted on the left side.
    pub left_events_received: u64,
    /// Number of events submitted on the right side.
    pub right_events_received: u64,
    /// Number of matched pairs.
    pub pairs_matched: u64,
    /// Number of rows handed back to callers.
    pub pairs_emitted: u64,
    /// Number of one-sided rows emitted for left events without a partner.
    pub left_unmatched: u64,
    /// Number of one-sided rows emitted for right events without a partner.
    pub right_unmatched: u64,
    /// Number of window rotations performed.
    pub windows_processed: u64,
    /// Exponentially smoothed per-event join latency, in milliseconds.
    pub avg_join_latency_ms: f64,
}
124
125impl StreamJoiner {
126 pub fn new(config: JoinConfig) -> Self {
128 Self {
129 config,
130 left_buffer: Arc::new(RwLock::new(VecDeque::new())),
131 right_buffer: Arc::new(RwLock::new(VecDeque::new())),
132 join_results: Arc::new(RwLock::new(Vec::new())),
133 stats: Arc::new(RwLock::new(JoinStats::default())),
134 current_window_id: Arc::new(RwLock::new(uuid::Uuid::new_v4().to_string())),
135 }
136 }
137
138 pub async fn process_left(&self, event: StreamEvent) -> Result<Vec<JoinedEvent>> {
140 let start = std::time::Instant::now();
141 let now = Utc::now();
142
143 {
145 let mut stats = self.stats.write().await;
146 stats.left_events_received += 1;
147 }
148
149 {
151 let mut left_buffer = self.left_buffer.write().await;
152 left_buffer.push_back((event.clone(), now));
153
154 if left_buffer.len() > self.config.max_buffer_size {
156 left_buffer.pop_front();
157 }
158 }
159
160 let results = self.perform_join(Some(event), None, now).await?;
162
163 {
165 let mut stats = self.stats.write().await;
166 let latency = start.elapsed().as_secs_f64() * 1000.0;
167 let alpha = 0.1;
168 stats.avg_join_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_join_latency_ms;
169 }
170
171 Ok(results)
172 }
173
174 pub async fn process_right(&self, event: StreamEvent) -> Result<Vec<JoinedEvent>> {
176 let start = std::time::Instant::now();
177 let now = Utc::now();
178
179 {
181 let mut stats = self.stats.write().await;
182 stats.right_events_received += 1;
183 }
184
185 {
187 let mut right_buffer = self.right_buffer.write().await;
188 right_buffer.push_back((event.clone(), now));
189
190 if right_buffer.len() > self.config.max_buffer_size {
192 right_buffer.pop_front();
193 }
194 }
195
196 let results = self.perform_join(None, Some(event), now).await?;
198
199 {
201 let mut stats = self.stats.write().await;
202 let latency = start.elapsed().as_secs_f64() * 1000.0;
203 let alpha = 0.1;
204 stats.avg_join_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_join_latency_ms;
205 }
206
207 Ok(results)
208 }
209
210 async fn perform_join(
212 &self,
213 left_event: Option<StreamEvent>,
214 right_event: Option<StreamEvent>,
215 now: DateTime<Utc>,
216 ) -> Result<Vec<JoinedEvent>> {
217 let mut results = Vec::new();
218
219 match self.config.join_type {
220 JoinType::Inner => {
221 self.perform_inner_join(&mut results, left_event, right_event, now)
222 .await?;
223 }
224 JoinType::LeftOuter => {
225 self.perform_left_outer_join(&mut results, left_event, right_event, now)
226 .await?;
227 }
228 JoinType::RightOuter => {
229 self.perform_right_outer_join(&mut results, left_event, right_event, now)
230 .await?;
231 }
232 JoinType::FullOuter => {
233 self.perform_full_outer_join(&mut results, left_event, right_event, now)
234 .await?;
235 }
236 }
237
238 {
240 let mut stats = self.stats.write().await;
241 stats.pairs_matched += results.len() as u64;
242 stats.pairs_emitted += results.len() as u64;
243 }
244
245 Ok(results)
246 }
247
248 async fn perform_inner_join(
250 &self,
251 results: &mut Vec<JoinedEvent>,
252 left_event: Option<StreamEvent>,
253 right_event: Option<StreamEvent>,
254 now: DateTime<Utc>,
255 ) -> Result<()> {
256 if let Some(left) = left_event {
257 let right_buffer = self.right_buffer.read().await;
259
260 for (right, right_time) in right_buffer.iter() {
261 if self
262 .matches_condition(&left, right, *right_time, now)
263 .await?
264 {
265 results.push(JoinedEvent {
266 left: Some(left.clone()),
267 right: Some(right.clone()),
268 join_time: now,
269 match_confidence: 1.0,
270 window_id: self.current_window_id.read().await.clone(),
271 });
272 }
273 }
274 }
275
276 if let Some(right) = right_event {
277 let left_buffer = self.left_buffer.read().await;
279
280 for (left, left_time) in left_buffer.iter() {
281 if self
282 .matches_condition(left, &right, *left_time, now)
283 .await?
284 {
285 results.push(JoinedEvent {
286 left: Some(left.clone()),
287 right: Some(right.clone()),
288 join_time: now,
289 match_confidence: 1.0,
290 window_id: self.current_window_id.read().await.clone(),
291 });
292 }
293 }
294 }
295
296 Ok(())
297 }
298
299 async fn perform_left_outer_join(
301 &self,
302 results: &mut Vec<JoinedEvent>,
303 left_event: Option<StreamEvent>,
304 right_event: Option<StreamEvent>,
305 now: DateTime<Utc>,
306 ) -> Result<()> {
307 if let Some(left) = left_event {
308 let right_buffer = self.right_buffer.read().await;
309 let mut found_match = false;
310
311 for (right, right_time) in right_buffer.iter() {
312 if self
313 .matches_condition(&left, right, *right_time, now)
314 .await?
315 {
316 results.push(JoinedEvent {
317 left: Some(left.clone()),
318 right: Some(right.clone()),
319 join_time: now,
320 match_confidence: 1.0,
321 window_id: self.current_window_id.read().await.clone(),
322 });
323 found_match = true;
324 }
325 }
326
327 if !found_match && self.config.emit_incomplete {
329 results.push(JoinedEvent {
330 left: Some(left),
331 right: None,
332 join_time: now,
333 match_confidence: 0.0,
334 window_id: self.current_window_id.read().await.clone(),
335 });
336
337 let mut stats = self.stats.write().await;
338 stats.left_unmatched += 1;
339 }
340 }
341
342 if right_event.is_some() {
344 self.perform_inner_join(results, None, right_event, now)
345 .await?;
346 }
347
348 Ok(())
349 }
350
351 async fn perform_right_outer_join(
353 &self,
354 results: &mut Vec<JoinedEvent>,
355 left_event: Option<StreamEvent>,
356 right_event: Option<StreamEvent>,
357 now: DateTime<Utc>,
358 ) -> Result<()> {
359 if let Some(right) = right_event {
360 let left_buffer = self.left_buffer.read().await;
361 let mut found_match = false;
362
363 for (left, left_time) in left_buffer.iter() {
364 if self
365 .matches_condition(left, &right, *left_time, now)
366 .await?
367 {
368 results.push(JoinedEvent {
369 left: Some(left.clone()),
370 right: Some(right.clone()),
371 join_time: now,
372 match_confidence: 1.0,
373 window_id: self.current_window_id.read().await.clone(),
374 });
375 found_match = true;
376 }
377 }
378
379 if !found_match && self.config.emit_incomplete {
381 results.push(JoinedEvent {
382 left: None,
383 right: Some(right),
384 join_time: now,
385 match_confidence: 0.0,
386 window_id: self.current_window_id.read().await.clone(),
387 });
388
389 let mut stats = self.stats.write().await;
390 stats.right_unmatched += 1;
391 }
392 }
393
394 if left_event.is_some() {
396 self.perform_inner_join(results, left_event, None, now)
397 .await?;
398 }
399
400 Ok(())
401 }
402
403 async fn perform_full_outer_join(
405 &self,
406 results: &mut Vec<JoinedEvent>,
407 left_event: Option<StreamEvent>,
408 right_event: Option<StreamEvent>,
409 now: DateTime<Utc>,
410 ) -> Result<()> {
411 if left_event.is_some() {
413 self.perform_left_outer_join(results, left_event, None, now)
414 .await?;
415 }
416
417 if right_event.is_some() {
418 self.perform_right_outer_join(results, None, right_event, now)
419 .await?;
420 }
421
422 Ok(())
423 }
424
425 async fn matches_condition(
427 &self,
428 left: &StreamEvent,
429 right: &StreamEvent,
430 event_time: DateTime<Utc>,
431 now: DateTime<Utc>,
432 ) -> Result<bool> {
433 if !self.is_in_current_window(event_time, now).await? {
435 return Ok(false);
436 }
437
438 match &self.config.condition {
440 JoinCondition::OnEquals {
441 left_field,
442 right_field,
443 } => {
444 let left_value = self.extract_field_value(left, left_field)?;
445 let right_value = self.extract_field_value(right, right_field)?;
446 Ok(left_value == right_value)
447 }
448 JoinCondition::TimeProximity { max_difference } => {
449 let left_time = left.timestamp();
450 let right_time = right.timestamp();
451 let diff = if left_time > right_time {
452 left_time - right_time
453 } else {
454 right_time - left_time
455 };
456 Ok(diff <= *max_difference)
457 }
458 JoinCondition::Custom { expression } => {
459 self.evaluate_custom_condition(left, right, expression)
461 }
462 }
463 }
464
465 async fn is_in_current_window(
467 &self,
468 event_time: DateTime<Utc>,
469 now: DateTime<Utc>,
470 ) -> Result<bool> {
471 match &self.config.window_strategy {
472 JoinWindowStrategy::Tumbling { duration } => {
473 let window_start = now - *duration;
474 Ok(event_time >= window_start)
475 }
476 JoinWindowStrategy::Sliding { duration, .. } => {
477 let window_start = now - *duration;
478 Ok(event_time >= window_start)
479 }
480 JoinWindowStrategy::Session { gap_timeout } => {
481 let last_activity = now - *gap_timeout;
482 Ok(event_time >= last_activity)
483 }
484 JoinWindowStrategy::CountBased { .. } => Ok(true), }
486 }
487
488 fn extract_field_value(&self, event: &StreamEvent, field: &str) -> Result<String> {
490 match event {
491 StreamEvent::TripleAdded {
492 subject,
493 predicate,
494 object,
495 ..
496 }
497 | StreamEvent::TripleRemoved {
498 subject,
499 predicate,
500 object,
501 ..
502 } => match field {
503 "subject" => Ok(subject.clone()),
504 "predicate" => Ok(predicate.clone()),
505 "object" => Ok(object.clone()),
506 _ => Err(anyhow!("Unknown field: {}", field)),
507 },
508 StreamEvent::QuadAdded {
509 subject,
510 predicate,
511 object,
512 graph,
513 ..
514 }
515 | StreamEvent::QuadRemoved {
516 subject,
517 predicate,
518 object,
519 graph,
520 ..
521 } => match field {
522 "subject" => Ok(subject.clone()),
523 "predicate" => Ok(predicate.clone()),
524 "object" => Ok(object.clone()),
525 "graph" => Ok(graph.clone()),
526 _ => Err(anyhow!("Unknown field: {}", field)),
527 },
528 _ => Err(anyhow!("Event type doesn't support field extraction")),
529 }
530 }
531
532 fn evaluate_custom_condition(
534 &self,
535 _left: &StreamEvent,
536 _right: &StreamEvent,
537 _expression: &str,
538 ) -> Result<bool> {
539 Ok(true)
542 }
543
544 pub async fn stats(&self) -> JoinStats {
546 self.stats.read().await.clone()
547 }
548
549 pub async fn clear(&self) {
551 self.left_buffer.write().await.clear();
552 self.right_buffer.write().await.clear();
553 self.join_results.write().await.clear();
554 }
555
556 pub async fn window_id(&self) -> String {
558 self.current_window_id.read().await.clone()
559 }
560
561 pub async fn rotate_window(&self) {
563 let new_window_id = uuid::Uuid::new_v4().to_string();
564 *self.current_window_id.write().await = new_window_id;
565
566 let mut stats = self.stats.write().await;
567 stats.windows_processed += 1;
568 }
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    /// Builds a `TripleAdded` event with the given subject and fixed predicate/object.
    fn create_test_event(subject: &str) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: subject.to_owned(),
            predicate: String::from("test"),
            object: String::from("value"),
            graph: None,
            metadata: EventMetadata::default(),
        }
    }

    /// A left event alone yields nothing; the matching right event yields one pair.
    #[tokio::test]
    async fn test_inner_join() {
        let joiner = StreamJoiner::new(JoinConfig {
            join_type: JoinType::Inner,
            window_strategy: JoinWindowStrategy::Tumbling {
                duration: ChronoDuration::seconds(60),
            },
            condition: JoinCondition::OnEquals {
                left_field: "subject".to_string(),
                right_field: "subject".to_string(),
            },
            ..Default::default()
        });

        // Nothing is buffered on the right yet, so the left event cannot match.
        let after_left = joiner
            .process_left(create_test_event("test_subject"))
            .await
            .unwrap();
        assert!(after_left.is_empty());

        // The right event shares the subject with the buffered left event.
        let after_right = joiner
            .process_right(create_test_event("test_subject"))
            .await
            .unwrap();
        assert_eq!(after_right.len(), 1);

        let pair = &after_right[0];
        assert!(pair.left.is_some());
        assert!(pair.right.is_some());
    }

    /// An unmatched left event is emitted as a one-sided row in a left outer join.
    #[tokio::test]
    async fn test_left_outer_join() {
        let joiner = StreamJoiner::new(JoinConfig {
            join_type: JoinType::LeftOuter,
            emit_incomplete: true,
            ..Default::default()
        });

        let rows = joiner
            .process_left(create_test_event("unmatched"))
            .await
            .unwrap();

        assert_eq!(rows.len(), 1);
        let row = &rows[0];
        assert!(row.left.is_some());
        assert!(row.right.is_none());
    }

    /// Each processed event bumps the receive counter of its side.
    #[tokio::test]
    async fn test_join_stats() {
        let joiner = StreamJoiner::new(JoinConfig::default());

        joiner
            .process_left(create_test_event("test1"))
            .await
            .unwrap();
        joiner
            .process_right(create_test_event("test2"))
            .await
            .unwrap();

        let snapshot = joiner.stats().await;
        assert_eq!(snapshot.left_events_received, 1);
        assert_eq!(snapshot.right_events_received, 1);
    }
}