Skip to main content

varpulis_runtime/
join.rs

1//! Join buffer for correlating events from multiple streams
2//!
3//! The JoinBuffer maintains events from multiple source streams and correlates
4//! them when events with matching join keys arrive within a specified time window.
5
6use std::cmp::Reverse;
7use std::collections::{BinaryHeap, HashMap};
8
9use chrono::{DateTime, Duration, Utc};
10use rustc_hash::FxHashMap;
11use tracing::{debug, trace};
12use varpulis_core::ast::JoinType;
13use varpulis_core::Value;
14
15use crate::event::Event;
16
/// Per-source event storage: join-key value -> buffered `(timestamp, event)` pairs.
///
/// New events for a key are appended to the end of that key's `Vec`.
type KeyedEventBuffer = FxHashMap<String, Vec<(DateTime<Utc>, Event)>>;
19
/// Buffer for join operations - stores events from each source and correlates them.
///
/// Events are grouped first by source stream and then by join-key value. When an
/// event arrives, the buffer looks for the most recent in-window event with the
/// same key in every other source and, depending on the join type, emits a merged
/// event.
#[derive(Debug)]
pub struct JoinBuffer {
    /// Events by source name, keyed by join key value
    /// Structure: source_name -> (join_key_value -> Vec<(timestamp, event)>)
    buffers: FxHashMap<String, KeyedEventBuffer>,
    /// Names of the source streams being joined.
    /// The first entry is treated as the "left" side for Left/Right joins.
    sources: Vec<String>,
    /// Join key field name for each source (extracted from .on() clause)
    /// Structure: source_name -> field_name
    join_keys: FxHashMap<String, String>,
    /// Window duration for correlation
    window_duration: Duration,
    /// Maximum events to keep per source/key (prevents unbounded growth).
    /// Defaults to 1000.
    max_events_per_key: usize,
    /// Expiry queue used for garbage collection.
    /// Contains (expiry_time, source, key) tuples; `Reverse` turns the max-heap
    /// into a min-heap so the earliest expiry is at the top.
    expiry_queue: BinaryHeap<Reverse<(DateTime<Utc>, String, String)>>,
    /// Event time of the last GC run (`None` until the first run); used to
    /// avoid running GC on every event.
    last_gc: Option<DateTime<Utc>>,
    /// Minimum interval between GC runs: 10% of the window duration,
    /// clamped to the range 10ms..=1s.
    gc_interval: Duration,
    /// Join type (Inner, Left, Right, Full). Defaults to Inner.
    join_type: JoinType,
}
45
46impl JoinBuffer {
47    /// Create a new JoinBuffer for correlating events from multiple sources
48    ///
49    /// # Arguments
50    /// * `sources` - Names of the source streams to join
51    /// * `join_keys` - Map of source name to the field used as join key for that source
52    /// * `window_duration` - How long to keep events for potential correlation
53    pub fn new(
54        sources: Vec<String>,
55        join_keys: FxHashMap<String, String>,
56        window_duration: Duration,
57    ) -> Self {
58        let mut buffers = FxHashMap::default();
59        for source in &sources {
60            buffers.insert(source.clone(), FxHashMap::default());
61        }
62
63        // GC interval is 10% of window duration, minimum 10ms, maximum 1 second
64        let gc_interval_ms = (window_duration.num_milliseconds() / 10).clamp(10, 1000);
65        let gc_interval = Duration::milliseconds(gc_interval_ms);
66
67        Self {
68            buffers,
69            sources,
70            join_keys,
71            window_duration,
72            max_events_per_key: 1000, // Default limit
73            expiry_queue: BinaryHeap::new(),
74            last_gc: None,
75            gc_interval,
76            join_type: JoinType::Inner,
77        }
78    }
79
80    /// Set the join type (Inner, Left, Right, Full)
81    pub const fn with_join_type(mut self, join_type: JoinType) -> Self {
82        self.join_type = join_type;
83        self
84    }
85
86    /// Set the maximum number of events to keep per source/key combination
87    pub const fn with_max_events(mut self, max_events: usize) -> Self {
88        self.max_events_per_key = max_events;
89        self
90    }
91
92    /// Add an event from a source stream and attempt to correlate
93    ///
94    /// # Arguments
95    /// * `source_name` - Which source stream this event came from
96    /// * `event` - The event to add
97    ///
98    /// # Returns
99    /// If events from all sources with matching keys exist within the window,
100    /// returns a correlated event containing fields from all sources.
101    pub fn add_event(&mut self, source_name: &str, event: Event) -> Option<Event> {
102        // Get the join key for this source
103        let join_key_field = match self.join_keys.get(source_name) {
104            Some(field) => field.clone(),
105            None => {
106                // Try to find a common key field
107                if let Some(field) = self.find_common_key_field(&event) {
108                    field
109                } else {
110                    debug!(
111                        "No join key field found for source '{}', skipping",
112                        source_name
113                    );
114                    return None;
115                }
116            }
117        };
118
119        // Extract the join key value from the event
120        let key_value = match event.get(&join_key_field) {
121            Some(v) => v.to_partition_key().into_owned(),
122            None => {
123                debug!(
124                    "Event missing join key field '{}', skipping",
125                    join_key_field
126                );
127                return None;
128            }
129        };
130
131        trace!(
132            "JoinBuffer: Adding event from '{}' with key '{}' = '{}'",
133            source_name,
134            join_key_field,
135            key_value
136        );
137
138        // Clean up expired events (uses expiry queue for O(log n) instead of O(n))
139        self.cleanup_expired(event.timestamp);
140
141        // Add event to the appropriate buffer
142        if let Some(source_buffer) = self.buffers.get_mut(source_name) {
143            let key_events = source_buffer.entry(key_value.clone()).or_default();
144
145            // Enforce max events limit
146            while key_events.len() >= self.max_events_per_key {
147                key_events.remove(0);
148            }
149
150            key_events.push((event.timestamp, event.clone()));
151
152            // Add to expiry queue for efficient GC
153            let expiry_time = event.timestamp + self.window_duration;
154            self.expiry_queue.push(Reverse((
155                expiry_time,
156                source_name.to_string(),
157                key_value.clone(),
158            )));
159        }
160
161        // Try to correlate events
162        let source = source_name.to_string();
163        self.try_correlate(&key_value, event.timestamp, Some(&source))
164    }
165
166    /// Try to find a matching event set for the given key.
167    ///
168    /// For inner joins: requires events from all sources.
169    /// For outer joins: emits with `Value::Null` for missing source fields.
170    fn try_correlate(
171        &mut self,
172        key_value: &str,
173        current_time: DateTime<Utc>,
174        triggering_source: Option<&str>,
175    ) -> Option<Event> {
176        let cutoff = current_time - self.window_duration;
177
178        // Collect events per source (Some = found, None = missing)
179        let mut source_events: Vec<(&str, Option<&Event>)> = Vec::new();
180        let mut missing_count = 0;
181
182        for source in &self.sources {
183            if let Some(source_buffer) = self.buffers.get(source) {
184                if let Some(key_events) = source_buffer.get(key_value) {
185                    let valid_event = key_events
186                        .iter()
187                        .rev()
188                        .find(|(ts, _)| *ts >= cutoff)
189                        .map(|(_, e)| e);
190
191                    match valid_event {
192                        Some(event) => {
193                            source_events.push((source.as_str(), Some(event)));
194                        }
195                        None => {
196                            trace!(
197                                "JoinBuffer: No valid event from '{}' for key '{}'",
198                                source,
199                                key_value
200                            );
201                            source_events.push((source.as_str(), None));
202                            missing_count += 1;
203                        }
204                    }
205                } else {
206                    trace!(
207                        "JoinBuffer: No events from '{}' for key '{}'",
208                        source,
209                        key_value
210                    );
211                    source_events.push((source.as_str(), None));
212                    missing_count += 1;
213                }
214            } else {
215                source_events.push((source.as_str(), None));
216                missing_count += 1;
217            }
218        }
219
220        // For inner join: all sources must be present
221        if missing_count == 0 {
222            debug!(
223                "JoinBuffer: Correlating {} events for key '{}'",
224                source_events.len(),
225                key_value
226            );
227            return Some(self.create_correlated_event_outer(&source_events));
228        }
229
230        // For inner join, any missing source means no emit
231        if self.join_type == JoinType::Inner {
232            return None;
233        }
234
235        // All sources missing — nothing to emit
236        if missing_count == self.sources.len() {
237            return None;
238        }
239
240        let triggering_source = triggering_source?;
241
242        // Determine if we should emit based on join type and which source triggered
243        let first_source = self.sources.first().map_or("", |s| s.as_str());
244        let is_left_trigger = triggering_source == first_source;
245
246        let should_emit = match self.join_type {
247            JoinType::Left => is_left_trigger,
248            JoinType::Right => !is_left_trigger,
249            JoinType::Full => true,
250            JoinType::Inner => false, // already handled above
251        };
252
253        if should_emit {
254            debug!(
255                "JoinBuffer: Outer join ({:?}) emitting for key '{}' from source '{}'",
256                self.join_type, key_value, triggering_source
257            );
258            Some(self.create_correlated_event_outer(&source_events))
259        } else {
260            None
261        }
262    }
263
264    /// Create a correlated event, filling `Value::Null` for missing sources (outer joins).
265    fn create_correlated_event_outer(&self, source_events: &[(&str, Option<&Event>)]) -> Event {
266        let mut correlated = Event::new("JoinedEvent");
267
268        // Use the most recent timestamp from available events
269        let max_ts = source_events
270            .iter()
271            .filter_map(|(_, e)| e.map(|ev| ev.timestamp))
272            .max()
273            .unwrap_or_else(Utc::now);
274        correlated.timestamp = max_ts;
275
276        // Merge fields from all events, prefixed by source name
277        for (source, maybe_event) in source_events {
278            match maybe_event {
279                Some(event) => {
280                    for (field, value) in &event.data {
281                        let prefixed_key = format!("{source}.{field}");
282                        correlated.data.insert(prefixed_key.into(), value.clone());
283
284                        if *source != &*event.event_type {
285                            let et_prefixed_key = format!("{}.{}", event.event_type, field);
286                            correlated
287                                .data
288                                .insert(et_prefixed_key.into(), value.clone());
289                        }
290
291                        if !correlated.data.contains_key(field) {
292                            correlated.data.insert(field.clone(), value.clone());
293                        }
294                    }
295                }
296                None => {
297                    // Outer join: source has no matching event — add null-prefixed marker
298                    // so downstream can detect missing sources via Source.field = null
299                    // The join key field gets a null value for the missing source
300                    if let Some(key_field) = self.join_keys.get(*source) {
301                        let prefixed_key = format!("{source}.{key_field}");
302                        correlated.data.insert(prefixed_key.into(), Value::Null);
303                    }
304                }
305            }
306        }
307
308        correlated
309    }
310
311    /// Remove events that have expired (outside the window)
312    ///
313    /// Uses an expiry queue for O(log n) cleanup instead of O(n) iteration over all keys.
314    /// Only runs periodically based on gc_interval to avoid overhead on every event.
315    fn cleanup_expired(&mut self, current_time: DateTime<Utc>) {
316        // Check if enough time has passed since last GC
317        if let Some(last_gc) = self.last_gc {
318            if current_time - last_gc < self.gc_interval {
319                return;
320            }
321        }
322        self.last_gc = Some(current_time);
323
324        let cutoff = current_time - self.window_duration;
325
326        // Process only expired entries from the queue - O(k log n) where k is expired entries
327        while let Some(Reverse((expiry_time, _, _))) = self.expiry_queue.peek() {
328            if *expiry_time > current_time {
329                // No more expired entries
330                break;
331            }
332
333            // Pop the expired entry (safe: we just peeked it above)
334            let Some(Reverse((_, source, key))) = self.expiry_queue.pop() else {
335                break;
336            };
337
338            // Clean up the specific key in the specific source buffer
339            if let Some(source_buffer) = self.buffers.get_mut(&source) {
340                if let Some(key_events) = source_buffer.get_mut(&key) {
341                    // Use binary search to find expired events
342                    let cutoff_idx = key_events.partition_point(|(ts, _)| *ts < cutoff);
343                    if cutoff_idx > 0 {
344                        key_events.drain(..cutoff_idx);
345                    }
346                    // Remove the key entry if empty
347                    if key_events.is_empty() {
348                        source_buffer.remove(&key);
349                    }
350                }
351            }
352        }
353    }
354
355    /// Try to find a common key field from the event's fields
356    fn find_common_key_field(&self, event: &Event) -> Option<String> {
357        // Common join key field names
358        const COMMON_KEYS: &[&str] = &["symbol", "key", "id", "user_id", "order_id"];
359
360        for key in COMMON_KEYS {
361            if event.data.contains_key(*key) {
362                return Some((*key).to_string());
363            }
364        }
365        None
366    }
367
368    /// Get statistics about the buffer state (for debugging)
369    pub fn stats(&self) -> JoinBufferStats {
370        let mut total_events = 0;
371        let mut events_per_source = FxHashMap::default();
372
373        for (source, buffer) in &self.buffers {
374            let source_count: usize = buffer.values().map(|v| v.len()).sum();
375            events_per_source.insert(source.clone(), source_count);
376            total_events += source_count;
377        }
378
379        JoinBufferStats {
380            total_events,
381            events_per_source,
382            sources: self.sources.clone(),
383        }
384    }
385}
386
387impl JoinBuffer {
388    /// Create a checkpoint of the join buffer state.
389    pub fn checkpoint(&self) -> crate::persistence::JoinCheckpoint {
390        use crate::persistence::SerializableEvent;
391        let mut buffers = HashMap::new();
392        for (source, keyed_buffer) in &self.buffers {
393            let mut keyed = HashMap::new();
394            for (key, events) in keyed_buffer {
395                let serialized: Vec<(i64, SerializableEvent)> = events
396                    .iter()
397                    .map(|(ts, e)| (ts.timestamp_millis(), SerializableEvent::from(e)))
398                    .collect();
399                keyed.insert(key.clone(), serialized);
400            }
401            buffers.insert(source.clone(), keyed);
402        }
403
404        crate::persistence::JoinCheckpoint {
405            buffers,
406            sources: self.sources.clone(),
407            join_keys: self
408                .join_keys
409                .iter()
410                .map(|(k, v)| (k.clone(), v.clone()))
411                .collect(),
412            window_duration_ms: self.window_duration.num_milliseconds(),
413        }
414    }
415
416    /// Restore join buffer state from a checkpoint.
417    pub fn restore(&mut self, cp: &crate::persistence::JoinCheckpoint) {
418        use std::cmp::Reverse;
419
420        use crate::event::Event;
421
422        self.buffers.clear();
423        self.expiry_queue = BinaryHeap::new();
424
425        for (source, keyed) in &cp.buffers {
426            let mut keyed_buffer: KeyedEventBuffer = FxHashMap::default();
427            for (key, events) in keyed {
428                let restored: Vec<(DateTime<Utc>, Event)> = events
429                    .iter()
430                    .filter_map(|(ts_ms, se)| {
431                        let ts = DateTime::from_timestamp_millis(*ts_ms)?;
432                        let event = Event::from(se.clone());
433                        Some((ts, event))
434                    })
435                    .collect();
436
437                // Rebuild expiry queue entries
438                for (ts, _) in &restored {
439                    let expiry_time = *ts + self.window_duration;
440                    self.expiry_queue
441                        .push(Reverse((expiry_time, source.clone(), key.clone())));
442                }
443
444                keyed_buffer.insert(key.clone(), restored);
445            }
446            self.buffers.insert(source.clone(), keyed_buffer);
447        }
448    }
449}
450
/// Statistics about the JoinBuffer state, as produced by `JoinBuffer::stats`.
#[derive(Debug)]
pub struct JoinBufferStats {
    /// Number of buffered events summed over all sources and keys.
    pub total_events: usize,
    /// Number of buffered events per source name (summed over all keys).
    pub events_per_source: FxHashMap<String, usize>,
    /// Names of the source streams being joined.
    pub sources: Vec<String>,
}
458
459#[cfg(test)]
460mod tests {
461    use varpulis_core::Value;
462
463    use super::*;
464
465    fn create_event(event_type: &str, symbol: &str, value: f64) -> Event {
466        Event::new(event_type)
467            .with_field("symbol", symbol)
468            .with_field("value", value)
469    }
470
471    #[test]
472    fn test_join_buffer_correlates_matching_events() {
473        let sources = vec!["A".to_string(), "B".to_string()];
474        let mut join_keys = FxHashMap::default();
475        join_keys.insert("A".to_string(), "symbol".to_string());
476        join_keys.insert("B".to_string(), "symbol".to_string());
477
478        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
479
480        // Add event from source A
481        let event_a = create_event("A", "BTC", 100.0);
482        let result = buffer.add_event("A", event_a);
483        assert!(result.is_none(), "Should not correlate with just one event");
484
485        // Add event from source B with same symbol
486        let event_b = create_event("B", "BTC", 200.0);
487        let result = buffer.add_event("B", event_b);
488        assert!(
489            result.is_some(),
490            "Should correlate when both sources present"
491        );
492
493        let correlated = result.unwrap();
494        assert_eq!(correlated.get("symbol"), Some(&Value::Str("BTC".into())));
495        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
496        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
497    }
498
499    #[test]
500    fn test_join_buffer_no_correlation_different_keys() {
501        let sources = vec!["A".to_string(), "B".to_string()];
502        let mut join_keys = FxHashMap::default();
503        join_keys.insert("A".to_string(), "symbol".to_string());
504        join_keys.insert("B".to_string(), "symbol".to_string());
505
506        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
507
508        // Add event from source A with symbol "BTC"
509        let event_a = create_event("A", "BTC", 100.0);
510        buffer.add_event("A", event_a);
511
512        // Add event from source B with different symbol "ETH"
513        let event_b = create_event("B", "ETH", 200.0);
514        let result = buffer.add_event("B", event_b);
515        assert!(result.is_none(), "Should not correlate with different keys");
516    }
517
518    #[test]
519    fn test_join_buffer_window_expiration() {
520        let sources = vec!["A".to_string(), "B".to_string()];
521        let mut join_keys = FxHashMap::default();
522        join_keys.insert("A".to_string(), "symbol".to_string());
523        join_keys.insert("B".to_string(), "symbol".to_string());
524
525        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::seconds(1));
526
527        let base_time = Utc::now();
528
529        // Add event from source A
530        let event_a = Event::new("A")
531            .with_timestamp(base_time)
532            .with_field("symbol", "BTC")
533            .with_field("value", 100.0f64);
534        buffer.add_event("A", event_a);
535
536        // Add event from source B much later (outside window)
537        let event_b = Event::new("B")
538            .with_timestamp(base_time + Duration::seconds(5))
539            .with_field("symbol", "BTC")
540            .with_field("value", 200.0f64);
541        let result = buffer.add_event("B", event_b);
542
543        assert!(result.is_none(), "Should not correlate - event A expired");
544    }
545
546    #[test]
547    fn test_join_buffer_stats() {
548        let sources = vec!["A".to_string(), "B".to_string()];
549        let mut join_keys = FxHashMap::default();
550        join_keys.insert("A".to_string(), "symbol".to_string());
551        join_keys.insert("B".to_string(), "symbol".to_string());
552
553        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
554
555        // Add events
556        buffer.add_event("A", create_event("A", "BTC", 100.0));
557        buffer.add_event("A", create_event("A", "ETH", 150.0));
558        buffer.add_event("B", create_event("B", "BTC", 200.0));
559
560        let stats = buffer.stats();
561        assert_eq!(stats.total_events, 3);
562        assert_eq!(stats.events_per_source.get("A"), Some(&2));
563        assert_eq!(stats.events_per_source.get("B"), Some(&1));
564    }
565
566    #[test]
567    fn test_join_buffer_multiple_matches() {
568        let sources = vec!["A".to_string(), "B".to_string()];
569        let mut join_keys = FxHashMap::default();
570        join_keys.insert("A".to_string(), "symbol".to_string());
571        join_keys.insert("B".to_string(), "symbol".to_string());
572
573        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
574
575        // Add multiple events from A for BTC
576        buffer.add_event("A", create_event("A", "BTC", 100.0));
577        buffer.add_event("A", create_event("A", "BTC", 110.0));
578
579        // Add event from B - should correlate with most recent A event
580        let event_b = create_event("B", "BTC", 200.0);
581        let result = buffer.add_event("B", event_b);
582
583        assert!(result.is_some());
584        let correlated = result.unwrap();
585        // Should use the most recent event from A (value=110)
586        assert_eq!(correlated.get("A.value"), Some(&Value::Float(110.0)));
587    }
588
589    #[test]
590    fn test_join_buffer_three_way_join() {
591        let sources = vec!["A".to_string(), "B".to_string(), "C".to_string()];
592        let mut join_keys = FxHashMap::default();
593        join_keys.insert("A".to_string(), "symbol".to_string());
594        join_keys.insert("B".to_string(), "symbol".to_string());
595        join_keys.insert("C".to_string(), "symbol".to_string());
596
597        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
598
599        // Add events from A and B
600        buffer.add_event("A", create_event("A", "BTC", 100.0));
601        let result = buffer.add_event("B", create_event("B", "BTC", 200.0));
602        assert!(
603            result.is_none(),
604            "Should not correlate with just 2 of 3 sources"
605        );
606
607        // Add event from C - should now correlate
608        let result = buffer.add_event("C", create_event("C", "BTC", 300.0));
609        assert!(result.is_some(), "Should correlate with all 3 sources");
610
611        let correlated = result.unwrap();
612        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
613        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
614        assert_eq!(correlated.get("C.value"), Some(&Value::Float(300.0)));
615    }
616
617    #[test]
618    fn test_join_buffer_max_events_limit() {
619        let sources = vec!["A".to_string(), "B".to_string()];
620        let mut join_keys = FxHashMap::default();
621        join_keys.insert("A".to_string(), "symbol".to_string());
622        join_keys.insert("B".to_string(), "symbol".to_string());
623
624        let mut buffer =
625            JoinBuffer::new(sources, join_keys, Duration::minutes(1)).with_max_events(3);
626
627        // Add 5 events from A for same symbol
628        for i in 0..5 {
629            buffer.add_event("A", create_event("A", "BTC", i as f64));
630        }
631
632        // Should only keep 3 events (most recent)
633        let stats = buffer.stats();
634        assert_eq!(stats.events_per_source.get("A"), Some(&3));
635    }
636
637    #[test]
638    fn test_join_buffer_missing_key_field() {
639        let sources = vec!["A".to_string(), "B".to_string()];
640        let mut join_keys = FxHashMap::default();
641        join_keys.insert("A".to_string(), "symbol".to_string());
642        join_keys.insert("B".to_string(), "symbol".to_string());
643
644        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
645
646        // Add event from A
647        buffer.add_event("A", create_event("A", "BTC", 100.0));
648
649        // Add event from B without symbol field
650        let event_b = Event::new("B").with_field("value", 200.0f64);
651        let result = buffer.add_event("B", event_b);
652
653        assert!(result.is_none(), "Should not correlate - missing key field");
654    }
655
656    #[test]
657    fn test_join_buffer_common_key_detection() {
658        let sources = vec!["A".to_string(), "B".to_string()];
659        // Empty join keys - should detect common "symbol" field
660        let join_keys = FxHashMap::default();
661
662        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
663
664        // Add events with symbol field
665        buffer.add_event("A", create_event("A", "BTC", 100.0));
666        let result = buffer.add_event("B", create_event("B", "BTC", 200.0));
667
668        // Should auto-detect "symbol" as common key
669        assert!(
670            result.is_some(),
671            "Should correlate using auto-detected symbol key"
672        );
673    }
674
675    #[test]
676    fn test_join_buffer_continuous_correlation() {
677        let sources = vec!["A".to_string(), "B".to_string()];
678        let mut join_keys = FxHashMap::default();
679        join_keys.insert("A".to_string(), "symbol".to_string());
680        join_keys.insert("B".to_string(), "symbol".to_string());
681
682        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
683
684        // First correlation
685        buffer.add_event("A", create_event("A", "BTC", 100.0));
686        let result1 = buffer.add_event("B", create_event("B", "BTC", 200.0));
687        assert!(result1.is_some());
688
689        // New events should also correlate
690        let result2 = buffer.add_event("A", create_event("A", "BTC", 150.0));
691        assert!(
692            result2.is_some(),
693            "Should correlate again with existing B event"
694        );
695
696        let result3 = buffer.add_event("B", create_event("B", "BTC", 250.0));
697        assert!(result3.is_some(), "Should correlate with recent A event");
698    }
699
700    #[test]
701    fn test_join_buffer_multiple_symbols() {
702        let sources = vec!["A".to_string(), "B".to_string()];
703        let mut join_keys = FxHashMap::default();
704        join_keys.insert("A".to_string(), "symbol".to_string());
705        join_keys.insert("B".to_string(), "symbol".to_string());
706
707        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
708
709        // Add events for different symbols
710        buffer.add_event("A", create_event("A", "BTC", 100.0));
711        buffer.add_event("A", create_event("A", "ETH", 50.0));
712        buffer.add_event("B", create_event("B", "ETH", 60.0)); // Should correlate with ETH
713
714        let stats = buffer.stats();
715        // After correlation, events are still in buffer
716        assert!(stats.total_events >= 2);
717    }
718
719    #[test]
720    fn test_join_buffer_checkpoint_restore() {
721        let sources = vec!["A".to_string(), "B".to_string()];
722        let mut join_keys = FxHashMap::default();
723        join_keys.insert("A".to_string(), "symbol".to_string());
724        join_keys.insert("B".to_string(), "symbol".to_string());
725
726        let mut buffer = JoinBuffer::new(sources.clone(), join_keys.clone(), Duration::minutes(1));
727
728        // Add event from source A
729        let event_a = create_event("A", "BTC", 100.0);
730        let result = buffer.add_event("A", event_a);
731        assert!(
732            result.is_none(),
733            "Should not correlate with just one source"
734        );
735
736        // Checkpoint the buffer state
737        let cp = buffer.checkpoint();
738
739        // Create a new buffer with the same configuration
740        let mut buffer2 = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
741
742        // Restore from the checkpoint
743        buffer2.restore(&cp);
744
745        // Add event from source B with matching symbol to the restored buffer
746        let event_b = create_event("B", "BTC", 200.0);
747        let result = buffer2.add_event("B", event_b);
748        assert!(
749            result.is_some(),
750            "Should correlate after restoring source A event from checkpoint"
751        );
752
753        let correlated = result.unwrap();
754        assert_eq!(correlated.get("symbol"), Some(&Value::Str("BTC".into())));
755        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
756        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
757    }
758
759    #[test]
760    fn test_join_buffer_checkpoint_empty() {
761        let sources = vec!["A".to_string(), "B".to_string()];
762        let mut join_keys = FxHashMap::default();
763        join_keys.insert("A".to_string(), "symbol".to_string());
764        join_keys.insert("B".to_string(), "symbol".to_string());
765
766        let buffer = JoinBuffer::new(sources.clone(), join_keys.clone(), Duration::minutes(1));
767
768        // Checkpoint with no events added
769        let cp = buffer.checkpoint();
770
771        // Verify the checkpoint has empty buffers
772        for keyed in cp.buffers.values() {
773            assert!(keyed.is_empty(), "Checkpoint buffers should be empty");
774        }
775
776        // Restore into a new buffer
777        let mut buffer2 = JoinBuffer::new(sources, join_keys, Duration::minutes(1));
778        buffer2.restore(&cp);
779
780        // Verify the restored buffer works normally
781        let event_a = create_event("A", "BTC", 100.0);
782        let result = buffer2.add_event("A", event_a);
783        assert!(
784            result.is_none(),
785            "Should not correlate with just one source"
786        );
787
788        let event_b = create_event("B", "BTC", 200.0);
789        let result = buffer2.add_event("B", event_b);
790        assert!(
791            result.is_some(),
792            "Should correlate normally after restoring from empty checkpoint"
793        );
794
795        let correlated = result.unwrap();
796        assert_eq!(correlated.get("symbol"), Some(&Value::Str("BTC".into())));
797    }
798
799    // ==========================================================================
800    // Outer Join Tests
801    // ==========================================================================
802
803    #[test]
804    fn test_left_join_emits_on_left_without_right() {
805        let sources = vec!["A".to_string(), "B".to_string()];
806        let mut join_keys = FxHashMap::default();
807        join_keys.insert("A".to_string(), "symbol".to_string());
808        join_keys.insert("B".to_string(), "symbol".to_string());
809
810        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
811            .with_join_type(JoinType::Left);
812
813        // Add event from left source A — should emit even without B
814        let event_a = create_event("A", "BTC", 100.0);
815        let result = buffer.add_event("A", event_a);
816        assert!(result.is_some(), "Left join should emit on left event");
817
818        let correlated = result.unwrap();
819        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
820        // B fields should be null
821        assert_eq!(correlated.get("B.symbol"), Some(&Value::Null));
822    }
823
824    #[test]
825    fn test_left_join_does_not_emit_on_right_alone() {
826        let sources = vec!["A".to_string(), "B".to_string()];
827        let mut join_keys = FxHashMap::default();
828        join_keys.insert("A".to_string(), "symbol".to_string());
829        join_keys.insert("B".to_string(), "symbol".to_string());
830
831        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
832            .with_join_type(JoinType::Left);
833
834        // Add event from right source B only — should NOT emit
835        let event_b = create_event("B", "BTC", 200.0);
836        let result = buffer.add_event("B", event_b);
837        assert!(
838            result.is_none(),
839            "Left join should not emit on right-only event"
840        );
841    }
842
843    #[test]
844    fn test_left_join_full_match_emits() {
845        let sources = vec!["A".to_string(), "B".to_string()];
846        let mut join_keys = FxHashMap::default();
847        join_keys.insert("A".to_string(), "symbol".to_string());
848        join_keys.insert("B".to_string(), "symbol".to_string());
849
850        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
851            .with_join_type(JoinType::Left);
852
853        buffer.add_event("A", create_event("A", "BTC", 100.0));
854        let result = buffer.add_event("B", create_event("B", "BTC", 200.0));
855        assert!(result.is_some(), "Left join should emit on full match");
856
857        let correlated = result.unwrap();
858        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
859        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
860    }
861
862    #[test]
863    fn test_right_join_emits_on_right_without_left() {
864        let sources = vec!["A".to_string(), "B".to_string()];
865        let mut join_keys = FxHashMap::default();
866        join_keys.insert("A".to_string(), "symbol".to_string());
867        join_keys.insert("B".to_string(), "symbol".to_string());
868
869        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
870            .with_join_type(JoinType::Right);
871
872        // Add event from right source B — should emit even without A
873        let event_b = create_event("B", "BTC", 200.0);
874        let result = buffer.add_event("B", event_b);
875        assert!(result.is_some(), "Right join should emit on right event");
876
877        let correlated = result.unwrap();
878        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
879        assert_eq!(correlated.get("A.symbol"), Some(&Value::Null));
880    }
881
882    #[test]
883    fn test_right_join_does_not_emit_on_left_alone() {
884        let sources = vec!["A".to_string(), "B".to_string()];
885        let mut join_keys = FxHashMap::default();
886        join_keys.insert("A".to_string(), "symbol".to_string());
887        join_keys.insert("B".to_string(), "symbol".to_string());
888
889        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
890            .with_join_type(JoinType::Right);
891
892        let event_a = create_event("A", "BTC", 100.0);
893        let result = buffer.add_event("A", event_a);
894        assert!(
895            result.is_none(),
896            "Right join should not emit on left-only event"
897        );
898    }
899
900    #[test]
901    fn test_full_join_emits_on_either_side() {
902        let sources = vec!["A".to_string(), "B".to_string()];
903        let mut join_keys = FxHashMap::default();
904        join_keys.insert("A".to_string(), "symbol".to_string());
905        join_keys.insert("B".to_string(), "symbol".to_string());
906
907        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
908            .with_join_type(JoinType::Full);
909
910        // Left source arrives — should emit
911        let result = buffer.add_event("A", create_event("A", "BTC", 100.0));
912        assert!(result.is_some(), "Full join should emit on left event");
913        let correlated = result.unwrap();
914        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
915        assert_eq!(correlated.get("B.symbol"), Some(&Value::Null));
916
917        // Right source arrives — should emit with both
918        let result = buffer.add_event("B", create_event("B", "BTC", 200.0));
919        assert!(result.is_some(), "Full join should emit on right event");
920        let correlated = result.unwrap();
921        assert_eq!(correlated.get("A.value"), Some(&Value::Float(100.0)));
922        assert_eq!(correlated.get("B.value"), Some(&Value::Float(200.0)));
923    }
924
925    #[test]
926    fn test_inner_join_unchanged_behavior() {
927        let sources = vec!["A".to_string(), "B".to_string()];
928        let mut join_keys = FxHashMap::default();
929        join_keys.insert("A".to_string(), "symbol".to_string());
930        join_keys.insert("B".to_string(), "symbol".to_string());
931
932        let mut buffer = JoinBuffer::new(sources, join_keys, Duration::minutes(1))
933            .with_join_type(JoinType::Inner);
934
935        // Left alone should NOT emit
936        let result = buffer.add_event("A", create_event("A", "BTC", 100.0));
937        assert!(result.is_none(), "Inner join should not emit with one side");
938
939        // Both sides should emit
940        let result = buffer.add_event("B", create_event("B", "BTC", 200.0));
941        assert!(result.is_some(), "Inner join should emit with both sides");
942    }
943}