rust_rule_engine/rete/
stream_beta_node.rs

//! Stream Beta Node - Multi-Stream Join Processing
//!
//! Handles joins between multiple stream patterns in the RETE network,
//! correlating events from different streams based on join conditions.
//!
//! Supports:
//! - Two-stream joins: moisture && temp
//! - Three+ stream joins: moisture && temp && weather (nested beta nodes)
//!
//! Example:
//! ```grl
//! moisture: MoistureSensor from stream("moisture-sensors") over window(5 min, sliding) &&
//! temp: TemperatureSensor from stream("temperature-sensors") over window(5 min, sliding) &&
//! moisture.zone_id == temp.zone_id
//! ```

17use crate::rete::stream_alpha_node::StreamAlphaNode;
18use crate::streaming::event::StreamEvent;
19use crate::types::Value;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime};
22
23/// Input to a beta node - can be either an alpha node or another beta node
24/// This enables nested beta nodes for 3+ stream joins
25#[derive(Debug, Clone)]
26pub enum BetaInput {
27    /// Direct alpha node (single stream)
28    Alpha(Arc<Mutex<StreamAlphaNode>>),
29    /// Nested beta node (already joined streams)
30    Beta(Arc<Mutex<StreamBetaNode>>),
31}
32
33/// Result of multi-stream join (supports 2+ streams)
34#[derive(Debug, Clone)]
35pub struct MultiStreamJoinResult {
36    /// All events that participated in the join (ordered)
37    pub events: Vec<StreamEvent>,
38    /// Timestamp when join was completed
39    pub join_timestamp: SystemTime,
40}
41
42impl MultiStreamJoinResult {
43    /// Create from two events (basic 2-stream join)
44    pub fn from_two_events(left: StreamEvent, right: StreamEvent, timestamp: SystemTime) -> Self {
45        Self {
46            events: vec![left, right],
47            join_timestamp: timestamp,
48        }
49    }
50
51    /// Create from existing result + new event (nested join)
52    pub fn from_result_and_event(
53        result: MultiStreamJoinResult,
54        event: StreamEvent,
55        timestamp: SystemTime,
56    ) -> Self {
57        let mut events = result.events;
58        events.push(event);
59        Self {
60            events,
61            join_timestamp: timestamp,
62        }
63    }
64
65    /// Get event by index
66    pub fn get_event(&self, index: usize) -> Option<&StreamEvent> {
67        self.events.get(index)
68    }
69
70    /// Get first event (for backward compatibility)
71    pub fn left_event(&self) -> &StreamEvent {
72        &self.events[0]
73    }
74
75    /// Get second event (for backward compatibility)
76    pub fn right_event(&self) -> &StreamEvent {
77        &self.events[1]
78    }
79}
80
81/// Join condition between two streams
82#[derive(Debug, Clone)]
83pub struct JoinCondition {
84    /// Field from left stream (e.g., "zone_id")
85    pub left_field: String,
86    /// Field from right stream (e.g., "zone_id")
87    pub right_field: String,
88    /// Join operator (currently only "==" supported)
89    pub operator: JoinOperator,
90}
91
/// Comparison operator used by a [`JoinCondition`].
///
/// `Eq` and `Hash` are derived so operators can be used as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum JoinOperator {
    /// The two field values must compare equal.
    Equal,
    // Future: NotEqual, GreaterThan, etc.
}
97
/// Policy a beta node uses to decide which buffered results may still join.
///
/// `PartialEq`/`Eq` are derived so strategies can be compared (e.g. in tests or
/// when checking node configuration).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinStrategy {
    /// Join results that arrived within `duration` of each other.
    TimeWindow { duration: Duration },
    /// Join on exact timestamp match.
    ///
    /// NOTE(review): confirm how the beta node enforces the "exact" match;
    /// its buffer cleanup for this variant is size-based.
    ExactTimestamp,
}
106
107/// Beta node for joining two streams (or stream + joined result)
108/// Supports nested joins for 3+ stream correlation
109#[derive(Debug)]
110pub struct StreamBetaNode {
111    /// Name for debugging
112    pub name: String,
113    /// Left input (alpha node or nested beta node)
114    pub left_input: BetaInput,
115    /// Right input (alpha node or nested beta node)
116    pub right_input: BetaInput,
117    /// Join conditions (e.g., zone_id == zone_id)
118    pub join_conditions: Vec<JoinCondition>,
119    /// Join strategy
120    pub strategy: JoinStrategy,
121    /// Buffered results from left input (wrapped in MultiStreamJoinResult)
122    left_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
123    /// Buffered results from right input (wrapped in MultiStreamJoinResult)
124    right_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
125}
126
127/// Result of a successful join
128#[derive(Debug, Clone)]
129pub struct JoinedStreamEvent {
130    pub left_event: StreamEvent,
131    pub right_event: StreamEvent,
132    pub join_timestamp: SystemTime,
133}
134
135impl StreamBetaNode {
136    /// Create a new beta node for stream join
137    pub fn new(
138        name: String,
139        left_input: BetaInput,
140        right_input: BetaInput,
141        join_conditions: Vec<JoinCondition>,
142        strategy: JoinStrategy,
143    ) -> Self {
144        Self {
145            name,
146            left_input,
147            right_input,
148            join_conditions,
149            strategy,
150            left_buffer: Vec::new(),
151            right_buffer: Vec::new(),
152        }
153    }
154
155    /// Create beta node from two alpha nodes (simple 2-stream join)
156    pub fn from_alpha_nodes(
157        name: String,
158        left_alpha: Arc<Mutex<StreamAlphaNode>>,
159        right_alpha: Arc<Mutex<StreamAlphaNode>>,
160        join_conditions: Vec<JoinCondition>,
161        strategy: JoinStrategy,
162    ) -> Self {
163        Self::new(
164            name,
165            BetaInput::Alpha(left_alpha),
166            BetaInput::Alpha(right_alpha),
167            join_conditions,
168            strategy,
169        )
170    }
171
172    /// Create beta node for nested join (beta + alpha)
173    pub fn from_beta_and_alpha(
174        name: String,
175        left_beta: Arc<Mutex<StreamBetaNode>>,
176        right_alpha: Arc<Mutex<StreamAlphaNode>>,
177        join_conditions: Vec<JoinCondition>,
178        strategy: JoinStrategy,
179    ) -> Self {
180        Self::new(
181            name,
182            BetaInput::Beta(left_beta),
183            BetaInput::Alpha(right_alpha),
184            join_conditions,
185            strategy,
186        )
187    }
188
189    /// Process event from left input (wrap in MultiStreamJoinResult)
190    pub fn process_left_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
191        let now = SystemTime::now();
192        let wrapped = MultiStreamJoinResult {
193            events: vec![event],
194            join_timestamp: now,
195        };
196        self.process_left_result(wrapped)
197    }
198
199    /// Process event from right input (wrap in MultiStreamJoinResult)
200    pub fn process_right_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
201        let now = SystemTime::now();
202        let wrapped = MultiStreamJoinResult {
203            events: vec![event],
204            join_timestamp: now,
205        };
206        self.process_right_result(wrapped)
207    }
208
209    /// Process join result from left input (for nested beta nodes)
210    pub fn process_left_result(
211        &mut self,
212        result: MultiStreamJoinResult,
213    ) -> Vec<MultiStreamJoinResult> {
214        let now = SystemTime::now();
215
216        // Add to left buffer
217        self.left_buffer.push((now, result.clone()));
218
219        // Clean old results based on strategy
220        self.cleanup_buffers(now);
221
222        // Try to find matching results in right buffer
223        self.find_matches(&result, &self.right_buffer, true)
224    }
225
226    /// Process join result from right input (for nested beta nodes)
227    pub fn process_right_result(
228        &mut self,
229        result: MultiStreamJoinResult,
230    ) -> Vec<MultiStreamJoinResult> {
231        let now = SystemTime::now();
232
233        // Add to right buffer
234        self.right_buffer.push((now, result.clone()));
235
236        // Clean old results based on strategy
237        self.cleanup_buffers(now);
238
239        // Try to find matching results in left buffer
240        self.find_matches(&result, &self.left_buffer, false)
241    }
242
243    /// Find matching results for join
244    fn find_matches(
245        &self,
246        new_result: &MultiStreamJoinResult,
247        other_buffer: &[(SystemTime, MultiStreamJoinResult)],
248        is_left: bool,
249    ) -> Vec<MultiStreamJoinResult> {
250        let mut matches = Vec::new();
251
252        for (timestamp, buffered_result) in other_buffer {
253            // Check if results satisfy all join conditions
254            if self.check_join_conditions_multi(new_result, buffered_result, is_left) {
255                // Combine the two results
256                let joined = if is_left {
257                    // new_result (left) + buffered_result (right)
258                    self.combine_results(new_result.clone(), buffered_result.clone(), *timestamp)
259                } else {
260                    // buffered_result (left) + new_result (right)
261                    self.combine_results(buffered_result.clone(), new_result.clone(), *timestamp)
262                };
263                matches.push(joined);
264            }
265        }
266
267        matches
268    }
269
270    /// Combine two MultiStreamJoinResults into one
271    fn combine_results(
272        &self,
273        left: MultiStreamJoinResult,
274        right: MultiStreamJoinResult,
275        timestamp: SystemTime,
276    ) -> MultiStreamJoinResult {
277        let mut all_events = left.events;
278        all_events.extend(right.events);
279        MultiStreamJoinResult {
280            events: all_events,
281            join_timestamp: timestamp,
282        }
283    }
284
285    /// Check if two MultiStreamJoinResults satisfy all join conditions
286    /// For nested joins, compares the LAST event in left result with FIRST event in right result
287    fn check_join_conditions_multi(
288        &self,
289        left_result: &MultiStreamJoinResult,
290        right_result: &MultiStreamJoinResult,
291        is_left: bool,
292    ) -> bool {
293        // Get the events to compare
294        // For left: use last event (most recently joined)
295        // For right: use first event (typically a single new event)
296        let left_event = left_result.events.last().unwrap();
297        let right_event = right_result.events.first().unwrap();
298
299        for condition in &self.join_conditions {
300            let (left_field, right_field) = if is_left {
301                (&condition.left_field, &condition.right_field)
302            } else {
303                (&condition.right_field, &condition.left_field)
304            };
305
306            let left_value = Self::extract_field_value(left_event, left_field);
307            let right_value = Self::extract_field_value(right_event, right_field);
308
309            match condition.operator {
310                JoinOperator::Equal => {
311                    if left_value != right_value {
312                        return false;
313                    }
314                }
315            }
316        }
317
318        true
319    }
320
321    /// Extract field value from event
322    fn extract_field_value(event: &StreamEvent, field: &str) -> Option<String> {
323        event.data.get(field).and_then(|v| match v {
324            Value::String(s) => Some(s.clone()),
325            Value::Integer(i) => Some(i.to_string()),
326            Value::Number(n) => Some(n.to_string()),
327            _ => None,
328        })
329    }
330
331    /// Clean up old events from buffers based on strategy
332    fn cleanup_buffers(&mut self, now: SystemTime) {
333        match &self.strategy {
334            JoinStrategy::TimeWindow { duration } => {
335                let cutoff = now.checked_sub(*duration).unwrap_or(SystemTime::UNIX_EPOCH);
336
337                self.left_buffer.retain(|(ts, _)| *ts >= cutoff);
338                self.right_buffer.retain(|(ts, _)| *ts >= cutoff);
339            }
340            JoinStrategy::ExactTimestamp => {
341                // For exact timestamp, we can be more aggressive
342                // Keep only recent events (e.g., last 100)
343                const MAX_BUFFER_SIZE: usize = 100;
344                if self.left_buffer.len() > MAX_BUFFER_SIZE {
345                    self.left_buffer
346                        .drain(0..self.left_buffer.len() - MAX_BUFFER_SIZE);
347                }
348                if self.right_buffer.len() > MAX_BUFFER_SIZE {
349                    self.right_buffer
350                        .drain(0..self.right_buffer.len() - MAX_BUFFER_SIZE);
351                }
352            }
353        }
354    }
355
356    /// Get statistics about buffer sizes
357    pub fn get_stats(&self) -> BetaNodeStats {
358        BetaNodeStats {
359            left_buffer_size: self.left_buffer.len(),
360            right_buffer_size: self.right_buffer.len(),
361        }
362    }
363}
364
/// Snapshot of a beta node's buffer sizes.
///
/// `Default` (all zeros) and `PartialEq`/`Eq` are derived so snapshots can be
/// compared directly, e.g. `assert_eq!(node.get_stats(), expected)`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BetaNodeStats {
    /// Number of entries currently held in the left buffer.
    pub left_buffer_size: usize,
    /// Number of entries currently held in the right buffer.
    pub right_buffer_size: usize,
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rete::stream_alpha_node::WindowSpec;
    use crate::streaming::event::EventMetadata;
    use crate::streaming::window::WindowType;
    use std::collections::HashMap;

    /// Five-minute window shared by every node in these tests.
    const WINDOW: Duration = Duration::from_secs(300);

    /// Alpha node over `stream`, accepting `event_type` in a sliding window.
    fn sliding_alpha(stream: &'static str, event_type: &str) -> Arc<Mutex<StreamAlphaNode>> {
        let window = WindowSpec {
            duration: WINDOW,
            window_type: WindowType::Sliding,
        };
        Arc::new(Mutex::new(StreamAlphaNode::new(
            stream,
            Some(event_type.to_string()),
            Some(window),
        )))
    }

    /// The single `zone_id == zone_id` equality condition.
    fn zone_id_condition() -> Vec<JoinCondition> {
        vec![JoinCondition {
            left_field: "zone_id".to_string(),
            right_field: "zone_id".to_string(),
            operator: JoinOperator::Equal,
        }]
    }

    /// Time-window join strategy over `WINDOW`.
    fn window_strategy() -> JoinStrategy {
        JoinStrategy::TimeWindow { duration: WINDOW }
    }

    /// Event payload with `zone_id = zone` plus one extra `key = reading` entry.
    fn zone_payload(zone: &str, key: &str, reading: Value) -> HashMap<String, Value> {
        let mut payload = HashMap::new();
        payload.insert("zone_id".to_string(), Value::String(zone.to_string()));
        payload.insert(key.to_string(), reading);
        payload
    }

    /// Assert that every event of `joined` carries `zone_id == "zone_1"`.
    fn assert_all_zone_1(joined: &MultiStreamJoinResult) {
        let expected = Value::String("zone_1".to_string());
        for event in &joined.events {
            assert_eq!(event.data.get("zone_id").unwrap(), &expected);
        }
    }

    #[test]
    fn test_stream_beta_node_join() {
        let mut beta = StreamBetaNode::from_alpha_nodes(
            "irrigation_join".to_string(),
            sliding_alpha("moisture-sensors", "MoistureSensor"),
            sliding_alpha("temperature-sensors", "TemperatureSensor"),
            zone_id_condition(),
            window_strategy(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: zone_payload("zone_1", "moisture_level", Value::Number(25.5)),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };

        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: zone_payload("zone_1", "temperature", Value::Number(35.0)),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };

        // Nothing is buffered on the right yet, so the first event cannot join.
        let left_matches = beta.process_left_event(moisture_event);
        assert_eq!(left_matches.len(), 0);

        // The temperature reading pairs with the buffered moisture reading.
        let right_matches = beta.process_right_event(temp_event);
        assert_eq!(right_matches.len(), 1);

        let joined = &right_matches[0];
        assert_eq!(joined.events.len(), 2);
        assert_all_zone_1(joined);
    }

    #[test]
    fn test_nested_beta_three_stream_join() {
        // Beta1 joins moisture with temperature.
        let beta1 = Arc::new(Mutex::new(StreamBetaNode::from_alpha_nodes(
            "moisture_temp_join".to_string(),
            sliding_alpha("moisture-sensors", "MoistureSensor"),
            sliding_alpha("temperature-sensors", "TemperatureSensor"),
            zone_id_condition(),
            window_strategy(),
        )));

        // Beta2 joins beta1's output with weather; the left field is read from
        // the last event of beta1's result (temperature).
        let mut beta2 = StreamBetaNode::from_beta_and_alpha(
            "full_join".to_string(),
            beta1.clone(),
            sliding_alpha("weather-events", "WeatherEvent"),
            zone_id_condition(),
            window_strategy(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: zone_payload("zone_1", "moisture_level", Value::Number(20.0)),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };

        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: zone_payload("zone_1", "temperature", Value::Number(35.0)),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };

        let weather_event = StreamEvent {
            id: "w1".to_string(),
            event_type: "WeatherEvent".to_string(),
            data: zone_payload("zone_1", "condition", Value::String("sunny".to_string())),
            metadata: EventMetadata {
                timestamp: 1200,
                source: "weather-1".to_string(),
                sequence: 3,
                tags: HashMap::new(),
            },
        };

        // Stage 1: moisture + temperature inside beta1.
        let beta1_result = {
            let mut node = beta1.lock().unwrap();
            node.process_left_event(moisture_event);
            node.process_right_event(temp_event)
        };
        assert_eq!(beta1_result.len(), 1);
        assert_eq!(beta1_result[0].events.len(), 2);

        // Stage 2: feed beta1's join into beta2; no weather has arrived yet.
        let pending = beta2.process_left_result(beta1_result[0].clone());
        assert_eq!(pending.len(), 0);

        // Stage 3: the weather event completes the three-stream join.
        let completed = beta2.process_right_event(weather_event);
        assert_eq!(completed.len(), 1);

        let final_joined = &completed[0];
        assert_eq!(final_joined.events.len(), 3);
        let expected_types = ["MoistureSensor", "TemperatureSensor", "WeatherEvent"];
        for (event, expected) in final_joined.events.iter().zip(expected_types) {
            assert_eq!(event.event_type, expected);
        }
        assert_all_zone_1(final_joined);

        println!("✅ 3-Stream Join Success!");
        println!(
            "   Events: {} + {} + {}",
            final_joined.events[0].event_type,
            final_joined.events[1].event_type,
            final_joined.events[2].event_type
        );
    }
}