1use std::cmp::Reverse;
7use std::collections::{BinaryHeap, HashMap};
8
9use chrono::{DateTime, Duration, Utc};
10use rustc_hash::FxHashMap;
11use tracing::{debug, trace};
12use varpulis_core::ast::JoinType;
13use varpulis_core::Value;
14
15use crate::event::Event;
16
17type KeyedEventBuffer = FxHashMap<String, Vec<(DateTime<Utc>, Event)>>;
19
20#[derive(Debug)]
22pub struct JoinBuffer {
23 buffers: FxHashMap<String, KeyedEventBuffer>,
26 sources: Vec<String>,
28 join_keys: FxHashMap<String, String>,
31 window_duration: Duration,
33 max_events_per_key: usize,
35 expiry_queue: BinaryHeap<Reverse<(DateTime<Utc>, String, String)>>,
38 last_gc: Option<DateTime<Utc>>,
40 gc_interval: Duration,
42 join_type: JoinType,
44}
45
46impl JoinBuffer {
47 pub fn new(
54 sources: Vec<String>,
55 join_keys: FxHashMap<String, String>,
56 window_duration: Duration,
57 ) -> Self {
58 let mut buffers = FxHashMap::default();
59 for source in &sources {
60 buffers.insert(source.clone(), FxHashMap::default());
61 }
62
63 let gc_interval_ms = (window_duration.num_milliseconds() / 10).clamp(10, 1000);
65 let gc_interval = Duration::milliseconds(gc_interval_ms);
66
67 Self {
68 buffers,
69 sources,
70 join_keys,
71 window_duration,
72 max_events_per_key: 1000, expiry_queue: BinaryHeap::new(),
74 last_gc: None,
75 gc_interval,
76 join_type: JoinType::Inner,
77 }
78 }
79
80 pub const fn with_join_type(mut self, join_type: JoinType) -> Self {
82 self.join_type = join_type;
83 self
84 }
85
86 pub const fn with_max_events(mut self, max_events: usize) -> Self {
88 self.max_events_per_key = max_events;
89 self
90 }
91
92 pub fn add_event(&mut self, source_name: &str, event: Event) -> Option<Event> {
102 let join_key_field = match self.join_keys.get(source_name) {
104 Some(field) => field.clone(),
105 None => {
106 if let Some(field) = self.find_common_key_field(&event) {
108 field
109 } else {
110 debug!(
111 "No join key field found for source '{}', skipping",
112 source_name
113 );
114 return None;
115 }
116 }
117 };
118
119 let key_value = match event.get(&join_key_field) {
121 Some(v) => v.to_partition_key().into_owned(),
122 None => {
123 debug!(
124 "Event missing join key field '{}', skipping",
125 join_key_field
126 );
127 return None;
128 }
129 };
130
131 trace!(
132 "JoinBuffer: Adding event from '{}' with key '{}' = '{}'",
133 source_name,
134 join_key_field,
135 key_value
136 );
137
138 self.cleanup_expired(event.timestamp);
140
141 if let Some(source_buffer) = self.buffers.get_mut(source_name) {
143 let key_events = source_buffer.entry(key_value.clone()).or_default();
144
145 while key_events.len() >= self.max_events_per_key {
147 key_events.remove(0);
148 }
149
150 key_events.push((event.timestamp, event.clone()));
151
152 let expiry_time = event.timestamp + self.window_duration;
154 self.expiry_queue.push(Reverse((
155 expiry_time,
156 source_name.to_string(),
157 key_value.clone(),
158 )));
159 }
160
161 let source = source_name.to_string();
163 self.try_correlate(&key_value, event.timestamp, Some(&source))
164 }
165
166 fn try_correlate(
171 &mut self,
172 key_value: &str,
173 current_time: DateTime<Utc>,
174 triggering_source: Option<&str>,
175 ) -> Option<Event> {
176 let cutoff = current_time - self.window_duration;
177
178 let mut source_events: Vec<(&str, Option<&Event>)> = Vec::new();
180 let mut missing_count = 0;
181
182 for source in &self.sources {
183 if let Some(source_buffer) = self.buffers.get(source) {
184 if let Some(key_events) = source_buffer.get(key_value) {
185 let valid_event = key_events
186 .iter()
187 .rev()
188 .find(|(ts, _)| *ts >= cutoff)
189 .map(|(_, e)| e);
190
191 match valid_event {
192 Some(event) => {
193 source_events.push((source.as_str(), Some(event)));
194 }
195 None => {
196 trace!(
197 "JoinBuffer: No valid event from '{}' for key '{}'",
198 source,
199 key_value
200 );
201 source_events.push((source.as_str(), None));
202 missing_count += 1;
203 }
204 }
205 } else {
206 trace!(
207 "JoinBuffer: No events from '{}' for key '{}'",
208 source,
209 key_value
210 );
211 source_events.push((source.as_str(), None));
212 missing_count += 1;
213 }
214 } else {
215 source_events.push((source.as_str(), None));
216 missing_count += 1;
217 }
218 }
219
220 if missing_count == 0 {
222 debug!(
223 "JoinBuffer: Correlating {} events for key '{}'",
224 source_events.len(),
225 key_value
226 );
227 return Some(self.create_correlated_event_outer(&source_events));
228 }
229
230 if self.join_type == JoinType::Inner {
232 return None;
233 }
234
235 if missing_count == self.sources.len() {
237 return None;
238 }
239
240 let triggering_source = triggering_source?;
241
242 let first_source = self.sources.first().map_or("", |s| s.as_str());
244 let is_left_trigger = triggering_source == first_source;
245
246 let should_emit = match self.join_type {
247 JoinType::Left => is_left_trigger,
248 JoinType::Right => !is_left_trigger,
249 JoinType::Full => true,
250 JoinType::Inner => false, };
252
253 if should_emit {
254 debug!(
255 "JoinBuffer: Outer join ({:?}) emitting for key '{}' from source '{}'",
256 self.join_type, key_value, triggering_source
257 );
258 Some(self.create_correlated_event_outer(&source_events))
259 } else {
260 None
261 }
262 }
263
264 fn create_correlated_event_outer(&self, source_events: &[(&str, Option<&Event>)]) -> Event {
266 let mut correlated = Event::new("JoinedEvent");
267
268 let max_ts = source_events
270 .iter()
271 .filter_map(|(_, e)| e.map(|ev| ev.timestamp))
272 .max()
273 .unwrap_or_else(Utc::now);
274 correlated.timestamp = max_ts;
275
276 for (source, maybe_event) in source_events {
278 match maybe_event {
279 Some(event) => {
280 for (field, value) in &event.data {
281 let prefixed_key = format!("{source}.{field}");
282 correlated.data.insert(prefixed_key.into(), value.clone());
283
284 if *source != &*event.event_type {
285 let et_prefixed_key = format!("{}.{}", event.event_type, field);
286 correlated
287 .data
288 .insert(et_prefixed_key.into(), value.clone());
289 }
290
291 if !correlated.data.contains_key(field) {
292 correlated.data.insert(field.clone(), value.clone());
293 }
294 }
295 }
296 None => {
297 if let Some(key_field) = self.join_keys.get(*source) {
301 let prefixed_key = format!("{source}.{key_field}");
302 correlated.data.insert(prefixed_key.into(), Value::Null);
303 }
304 }
305 }
306 }
307
308 correlated
309 }
310
311 fn cleanup_expired(&mut self, current_time: DateTime<Utc>) {
316 if let Some(last_gc) = self.last_gc {
318 if current_time - last_gc < self.gc_interval {
319 return;
320 }
321 }
322 self.last_gc = Some(current_time);
323
324 let cutoff = current_time - self.window_duration;
325
326 while let Some(Reverse((expiry_time, _, _))) = self.expiry_queue.peek() {
328 if *expiry_time > current_time {
329 break;
331 }
332
333 let Some(Reverse((_, source, key))) = self.expiry_queue.pop() else {
335 break;
336 };
337
338 if let Some(source_buffer) = self.buffers.get_mut(&source) {
340 if let Some(key_events) = source_buffer.get_mut(&key) {
341 let cutoff_idx = key_events.partition_point(|(ts, _)| *ts < cutoff);
343 if cutoff_idx > 0 {
344 key_events.drain(..cutoff_idx);
345 }
346 if key_events.is_empty() {
348 source_buffer.remove(&key);
349 }
350 }
351 }
352 }
353 }
354
355 fn find_common_key_field(&self, event: &Event) -> Option<String> {
357 const COMMON_KEYS: &[&str] = &["symbol", "key", "id", "user_id", "order_id"];
359
360 for key in COMMON_KEYS {
361 if event.data.contains_key(*key) {
362 return Some((*key).to_string());
363 }
364 }
365 None
366 }
367
368 pub fn stats(&self) -> JoinBufferStats {
370 let mut total_events = 0;
371 let mut events_per_source = FxHashMap::default();
372
373 for (source, buffer) in &self.buffers {
374 let source_count: usize = buffer.values().map(|v| v.len()).sum();
375 events_per_source.insert(source.clone(), source_count);
376 total_events += source_count;
377 }
378
379 JoinBufferStats {
380 total_events,
381 events_per_source,
382 sources: self.sources.clone(),
383 }
384 }
385}
386
387impl JoinBuffer {
388 pub fn checkpoint(&self) -> crate::persistence::JoinCheckpoint {
390 use crate::persistence::SerializableEvent;
391 let mut buffers = HashMap::new();
392 for (source, keyed_buffer) in &self.buffers {
393 let mut keyed = HashMap::new();
394 for (key, events) in keyed_buffer {
395 let serialized: Vec<(i64, SerializableEvent)> = events
396 .iter()
397 .map(|(ts, e)| (ts.timestamp_millis(), SerializableEvent::from(e)))
398 .collect();
399 keyed.insert(key.clone(), serialized);
400 }
401 buffers.insert(source.clone(), keyed);
402 }
403
404 crate::persistence::JoinCheckpoint {
405 buffers,
406 sources: self.sources.clone(),
407 join_keys: self
408 .join_keys
409 .iter()
410 .map(|(k, v)| (k.clone(), v.clone()))
411 .collect(),
412 window_duration_ms: self.window_duration.num_milliseconds(),
413 }
414 }
415
416 pub fn restore(&mut self, cp: &crate::persistence::JoinCheckpoint) {
418 use std::cmp::Reverse;
419
420 use crate::event::Event;
421
422 self.buffers.clear();
423 self.expiry_queue = BinaryHeap::new();
424
425 for (source, keyed) in &cp.buffers {
426 let mut keyed_buffer: KeyedEventBuffer = FxHashMap::default();
427 for (key, events) in keyed {
428 let restored: Vec<(DateTime<Utc>, Event)> = events
429 .iter()
430 .filter_map(|(ts_ms, se)| {
431 let ts = DateTime::from_timestamp_millis(*ts_ms)?;
432 let event = Event::from(se.clone());
433 Some((ts, event))
434 })
435 .collect();
436
437 for (ts, _) in &restored {
439 let expiry_time = *ts + self.window_duration;
440 self.expiry_queue
441 .push(Reverse((expiry_time, source.clone(), key.clone())));
442 }
443
444 keyed_buffer.insert(key.clone(), restored);
445 }
446 self.buffers.insert(source.clone(), keyed_buffer);
447 }
448 }
449}
450
451#[derive(Debug)]
453pub struct JoinBufferStats {
454 pub total_events: usize,
455 pub events_per_source: FxHashMap<String, usize>,
456 pub sources: Vec<String>,
457}
458
#[cfg(test)]
mod tests {
    use varpulis_core::Value;

    use super::*;

    /// An `event_type` event carrying a `symbol` join key and a float `value`.
    fn create_event(event_type: &str, symbol: &str, value: f64) -> Event {
        Event::new(event_type)
            .with_field("symbol", symbol)
            .with_field("value", value)
    }

    /// Owned source names plus a join-key map keying every source on `symbol`.
    fn symbol_keyed(names: &[&str]) -> (Vec<String>, FxHashMap<String, String>) {
        let sources: Vec<String> = names.iter().map(|name| (*name).to_string()).collect();
        let join_keys = sources
            .iter()
            .map(|source| (source.clone(), String::from("symbol")))
            .collect();
        (sources, join_keys)
    }

    /// A default (inner-join) buffer over `names`, all joined on `symbol`.
    fn symbol_buffer(names: &[&str], window: Duration) -> JoinBuffer {
        let (sources, join_keys) = symbol_keyed(names);
        JoinBuffer::new(sources, join_keys, window)
    }

    /// The common fixture: sources `A` and `B`, keyed on `symbol`, 1-minute window.
    fn ab_buffer() -> JoinBuffer {
        symbol_buffer(&["A", "B"], Duration::minutes(1))
    }

    #[test]
    fn test_join_buffer_correlates_matching_events() {
        let mut buffer = ab_buffer();

        let first = buffer.add_event("A", create_event("A", "BTC", 100.0));
        assert!(first.is_none(), "Should not correlate with just one event");

        let second = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(
            second.is_some(),
            "Should correlate when both sources present"
        );

        let joined = second.unwrap();
        assert_eq!(joined.get("symbol"), Some(&Value::Str("BTC".into())));
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
    }

    #[test]
    fn test_join_buffer_no_correlation_different_keys() {
        let mut buffer = ab_buffer();

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        let outcome = buffer.add_event("B", create_event("B", "ETH", 200.0));

        assert!(outcome.is_none(), "Should not correlate with different keys");
    }

    #[test]
    fn test_join_buffer_window_expiration() {
        let mut buffer = symbol_buffer(&["A", "B"], Duration::seconds(1));
        let start = Utc::now();

        let early_a = Event::new("A")
            .with_timestamp(start)
            .with_field("symbol", "BTC")
            .with_field("value", 100.0f64);
        buffer.add_event("A", early_a);

        let late_b = Event::new("B")
            .with_timestamp(start + Duration::seconds(5))
            .with_field("symbol", "BTC")
            .with_field("value", 200.0f64);
        let outcome = buffer.add_event("B", late_b);

        assert!(outcome.is_none(), "Should not correlate - event A expired");
    }

    #[test]
    fn test_join_buffer_stats() {
        let mut buffer = ab_buffer();

        for (source, symbol, value) in [("A", "BTC", 100.0), ("A", "ETH", 150.0), ("B", "BTC", 200.0)] {
            buffer.add_event(source, create_event(source, symbol, value));
        }

        let stats = buffer.stats();
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.events_per_source.get("A"), Some(&2));
        assert_eq!(stats.events_per_source.get("B"), Some(&1));
    }

    #[test]
    fn test_join_buffer_multiple_matches() {
        let mut buffer = ab_buffer();

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        buffer.add_event("A", create_event("A", "BTC", 110.0));

        let outcome = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(outcome.is_some());

        // The most recent A event is the one joined.
        let joined = outcome.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(110.0)));
    }

    #[test]
    fn test_join_buffer_three_way_join() {
        let mut buffer = symbol_buffer(&["A", "B", "C"], Duration::minutes(1));

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        let partial = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(
            partial.is_none(),
            "Should not correlate with just 2 of 3 sources"
        );

        let complete = buffer.add_event("C", create_event("C", "BTC", 300.0));
        assert!(complete.is_some(), "Should correlate with all 3 sources");

        let joined = complete.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
        assert_eq!(joined.get("C.value"), Some(&Value::Float(300.0)));
    }

    #[test]
    fn test_join_buffer_max_events_limit() {
        let mut buffer = ab_buffer().with_max_events(3);

        for step in 0..5 {
            buffer.add_event("A", create_event("A", "BTC", f64::from(step)));
        }

        let stats = buffer.stats();
        assert_eq!(stats.events_per_source.get("A"), Some(&3));
    }

    #[test]
    fn test_join_buffer_missing_key_field() {
        let mut buffer = ab_buffer();

        buffer.add_event("A", create_event("A", "BTC", 100.0));

        let keyless_b = Event::new("B").with_field("value", 200.0f64);
        let outcome = buffer.add_event("B", keyless_b);

        assert!(outcome.is_none(), "Should not correlate - missing key field");
    }

    #[test]
    fn test_join_buffer_common_key_detection() {
        let sources = vec!["A".to_string(), "B".to_string()];
        let mut buffer = JoinBuffer::new(sources, FxHashMap::default(), Duration::minutes(1));

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        let outcome = buffer.add_event("B", create_event("B", "BTC", 200.0));

        assert!(
            outcome.is_some(),
            "Should correlate using auto-detected symbol key"
        );
    }

    #[test]
    fn test_join_buffer_continuous_correlation() {
        let mut buffer = ab_buffer();

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        let first = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(first.is_some());

        let second = buffer.add_event("A", create_event("A", "BTC", 150.0));
        assert!(
            second.is_some(),
            "Should correlate again with existing B event"
        );

        let third = buffer.add_event("B", create_event("B", "BTC", 250.0));
        assert!(third.is_some(), "Should correlate with recent A event");
    }

    #[test]
    fn test_join_buffer_multiple_symbols() {
        let mut buffer = ab_buffer();

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        buffer.add_event("A", create_event("A", "ETH", 50.0));
        buffer.add_event("B", create_event("B", "ETH", 60.0));

        assert!(buffer.stats().total_events >= 2);
    }

    #[test]
    fn test_join_buffer_checkpoint_restore() {
        let mut original = ab_buffer();

        let lone = original.add_event("A", create_event("A", "BTC", 100.0));
        assert!(
            lone.is_none(),
            "Should not correlate with just one source"
        );

        let snapshot = original.checkpoint();

        let mut revived = ab_buffer();
        revived.restore(&snapshot);

        let outcome = revived.add_event("B", create_event("B", "BTC", 200.0));
        assert!(
            outcome.is_some(),
            "Should correlate after restoring source A event from checkpoint"
        );

        let joined = outcome.unwrap();
        assert_eq!(joined.get("symbol"), Some(&Value::Str("BTC".into())));
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
    }

    #[test]
    fn test_join_buffer_checkpoint_empty() {
        let snapshot = ab_buffer().checkpoint();

        assert!(
            snapshot.buffers.values().all(|keyed| keyed.is_empty()),
            "Checkpoint buffers should be empty"
        );

        let mut revived = ab_buffer();
        revived.restore(&snapshot);

        let lone = revived.add_event("A", create_event("A", "BTC", 100.0));
        assert!(
            lone.is_none(),
            "Should not correlate with just one source"
        );

        let outcome = revived.add_event("B", create_event("B", "BTC", 200.0));
        assert!(
            outcome.is_some(),
            "Should correlate normally after restoring from empty checkpoint"
        );

        let joined = outcome.unwrap();
        assert_eq!(joined.get("symbol"), Some(&Value::Str("BTC".into())));
    }

    #[test]
    fn test_left_join_emits_on_left_without_right() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Left);

        let outcome = buffer.add_event("A", create_event("A", "BTC", 100.0));
        assert!(outcome.is_some(), "Left join should emit on left event");

        let joined = outcome.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.symbol"), Some(&Value::Null));
    }

    #[test]
    fn test_left_join_does_not_emit_on_right_alone() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Left);

        let outcome = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(
            outcome.is_none(),
            "Left join should not emit on right-only event"
        );
    }

    #[test]
    fn test_left_join_full_match_emits() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Left);

        buffer.add_event("A", create_event("A", "BTC", 100.0));
        let outcome = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(outcome.is_some(), "Left join should emit on full match");

        let joined = outcome.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
    }

    #[test]
    fn test_right_join_emits_on_right_without_left() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Right);

        let outcome = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(outcome.is_some(), "Right join should emit on right event");

        let joined = outcome.unwrap();
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
        assert_eq!(joined.get("A.symbol"), Some(&Value::Null));
    }

    #[test]
    fn test_right_join_does_not_emit_on_left_alone() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Right);

        let outcome = buffer.add_event("A", create_event("A", "BTC", 100.0));
        assert!(
            outcome.is_none(),
            "Right join should not emit on left-only event"
        );
    }

    #[test]
    fn test_full_join_emits_on_either_side() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Full);

        let from_left = buffer.add_event("A", create_event("A", "BTC", 100.0));
        assert!(from_left.is_some(), "Full join should emit on left event");
        let joined = from_left.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.symbol"), Some(&Value::Null));

        let from_right = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(from_right.is_some(), "Full join should emit on right event");
        let joined = from_right.unwrap();
        assert_eq!(joined.get("A.value"), Some(&Value::Float(100.0)));
        assert_eq!(joined.get("B.value"), Some(&Value::Float(200.0)));
    }

    #[test]
    fn test_inner_join_unchanged_behavior() {
        let mut buffer = ab_buffer().with_join_type(JoinType::Inner);

        let one_side = buffer.add_event("A", create_event("A", "BTC", 100.0));
        assert!(one_side.is_none(), "Inner join should not emit with one side");

        let both_sides = buffer.add_event("B", create_event("B", "BTC", 200.0));
        assert!(both_sides.is_some(), "Inner join should emit with both sides");
    }
}