1use crate::rete::stream_alpha_node::StreamAlphaNode;
18use crate::streaming::event::StreamEvent;
19use crate::types::Value;
20use std::sync::{Arc, Mutex};
21use std::time::{Duration, SystemTime};
22
23#[derive(Debug, Clone)]
26pub enum BetaInput {
27 Alpha(Arc<Mutex<StreamAlphaNode>>),
29 Beta(Arc<Mutex<StreamBetaNode>>),
31}
32
33#[derive(Debug, Clone)]
35pub struct MultiStreamJoinResult {
36 pub events: Vec<StreamEvent>,
38 pub join_timestamp: SystemTime,
40}
41
42impl MultiStreamJoinResult {
43 pub fn from_two_events(left: StreamEvent, right: StreamEvent, timestamp: SystemTime) -> Self {
45 Self {
46 events: vec![left, right],
47 join_timestamp: timestamp,
48 }
49 }
50
51 pub fn from_result_and_event(
53 result: MultiStreamJoinResult,
54 event: StreamEvent,
55 timestamp: SystemTime,
56 ) -> Self {
57 let mut events = result.events;
58 events.push(event);
59 Self {
60 events,
61 join_timestamp: timestamp,
62 }
63 }
64
65 pub fn get_event(&self, index: usize) -> Option<&StreamEvent> {
67 self.events.get(index)
68 }
69
70 pub fn left_event(&self) -> &StreamEvent {
72 &self.events[0]
73 }
74
75 pub fn right_event(&self) -> &StreamEvent {
77 &self.events[1]
78 }
79}
80
81#[derive(Debug, Clone)]
83pub struct JoinCondition {
84 pub left_field: String,
86 pub right_field: String,
88 pub operator: JoinOperator,
90}
91
/// Comparison operators usable in a [`JoinCondition`].
#[derive(Clone, Debug, PartialEq)]
pub enum JoinOperator {
    /// The two field values must be equal.
    Equal,
}
97
/// Policy a [`StreamBetaNode`] uses to decide which buffered entries to keep.
#[derive(Clone, Debug)]
pub enum JoinStrategy {
    /// Keep buffered entries that arrived within the last `duration`.
    TimeWindow { duration: Duration },
    /// Keep only a bounded number of the most recently buffered entries.
    ExactTimestamp,
}
106
107#[derive(Debug)]
110pub struct StreamBetaNode {
111 pub name: String,
113 pub left_input: BetaInput,
115 pub right_input: BetaInput,
117 pub join_conditions: Vec<JoinCondition>,
119 pub strategy: JoinStrategy,
121 left_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
123 right_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
125}
126
127#[derive(Debug, Clone)]
129pub struct JoinedStreamEvent {
130 pub left_event: StreamEvent,
131 pub right_event: StreamEvent,
132 pub join_timestamp: SystemTime,
133}
134
135impl StreamBetaNode {
136 pub fn new(
138 name: String,
139 left_input: BetaInput,
140 right_input: BetaInput,
141 join_conditions: Vec<JoinCondition>,
142 strategy: JoinStrategy,
143 ) -> Self {
144 Self {
145 name,
146 left_input,
147 right_input,
148 join_conditions,
149 strategy,
150 left_buffer: Vec::new(),
151 right_buffer: Vec::new(),
152 }
153 }
154
155 pub fn from_alpha_nodes(
157 name: String,
158 left_alpha: Arc<Mutex<StreamAlphaNode>>,
159 right_alpha: Arc<Mutex<StreamAlphaNode>>,
160 join_conditions: Vec<JoinCondition>,
161 strategy: JoinStrategy,
162 ) -> Self {
163 Self::new(
164 name,
165 BetaInput::Alpha(left_alpha),
166 BetaInput::Alpha(right_alpha),
167 join_conditions,
168 strategy,
169 )
170 }
171
172 pub fn from_beta_and_alpha(
174 name: String,
175 left_beta: Arc<Mutex<StreamBetaNode>>,
176 right_alpha: Arc<Mutex<StreamAlphaNode>>,
177 join_conditions: Vec<JoinCondition>,
178 strategy: JoinStrategy,
179 ) -> Self {
180 Self::new(
181 name,
182 BetaInput::Beta(left_beta),
183 BetaInput::Alpha(right_alpha),
184 join_conditions,
185 strategy,
186 )
187 }
188
189 pub fn process_left_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
191 let now = SystemTime::now();
192 let wrapped = MultiStreamJoinResult {
193 events: vec![event],
194 join_timestamp: now,
195 };
196 self.process_left_result(wrapped)
197 }
198
199 pub fn process_right_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
201 let now = SystemTime::now();
202 let wrapped = MultiStreamJoinResult {
203 events: vec![event],
204 join_timestamp: now,
205 };
206 self.process_right_result(wrapped)
207 }
208
209 pub fn process_left_result(
211 &mut self,
212 result: MultiStreamJoinResult,
213 ) -> Vec<MultiStreamJoinResult> {
214 let now = SystemTime::now();
215
216 self.left_buffer.push((now, result.clone()));
218
219 self.cleanup_buffers(now);
221
222 self.find_matches(&result, &self.right_buffer, true)
224 }
225
226 pub fn process_right_result(
228 &mut self,
229 result: MultiStreamJoinResult,
230 ) -> Vec<MultiStreamJoinResult> {
231 let now = SystemTime::now();
232
233 self.right_buffer.push((now, result.clone()));
235
236 self.cleanup_buffers(now);
238
239 self.find_matches(&result, &self.left_buffer, false)
241 }
242
243 fn find_matches(
245 &self,
246 new_result: &MultiStreamJoinResult,
247 other_buffer: &[(SystemTime, MultiStreamJoinResult)],
248 is_left: bool,
249 ) -> Vec<MultiStreamJoinResult> {
250 let mut matches = Vec::new();
251
252 for (timestamp, buffered_result) in other_buffer {
253 if self.check_join_conditions_multi(new_result, buffered_result, is_left) {
255 let joined = if is_left {
257 self.combine_results(new_result.clone(), buffered_result.clone(), *timestamp)
259 } else {
260 self.combine_results(buffered_result.clone(), new_result.clone(), *timestamp)
262 };
263 matches.push(joined);
264 }
265 }
266
267 matches
268 }
269
270 fn combine_results(
272 &self,
273 left: MultiStreamJoinResult,
274 right: MultiStreamJoinResult,
275 timestamp: SystemTime,
276 ) -> MultiStreamJoinResult {
277 let mut all_events = left.events;
278 all_events.extend(right.events);
279 MultiStreamJoinResult {
280 events: all_events,
281 join_timestamp: timestamp,
282 }
283 }
284
285 fn check_join_conditions_multi(
288 &self,
289 left_result: &MultiStreamJoinResult,
290 right_result: &MultiStreamJoinResult,
291 is_left: bool,
292 ) -> bool {
293 let left_event = left_result.events.last().unwrap();
297 let right_event = right_result.events.first().unwrap();
298
299 for condition in &self.join_conditions {
300 let (left_field, right_field) = if is_left {
301 (&condition.left_field, &condition.right_field)
302 } else {
303 (&condition.right_field, &condition.left_field)
304 };
305
306 let left_value = Self::extract_field_value(left_event, left_field);
307 let right_value = Self::extract_field_value(right_event, right_field);
308
309 match condition.operator {
310 JoinOperator::Equal => {
311 if left_value != right_value {
312 return false;
313 }
314 }
315 }
316 }
317
318 true
319 }
320
321 fn extract_field_value(event: &StreamEvent, field: &str) -> Option<String> {
323 event.data.get(field).and_then(|v| match v {
324 Value::String(s) => Some(s.clone()),
325 Value::Integer(i) => Some(i.to_string()),
326 Value::Number(n) => Some(n.to_string()),
327 _ => None,
328 })
329 }
330
331 fn cleanup_buffers(&mut self, now: SystemTime) {
333 match &self.strategy {
334 JoinStrategy::TimeWindow { duration } => {
335 let cutoff = now.checked_sub(*duration).unwrap_or(SystemTime::UNIX_EPOCH);
336
337 self.left_buffer.retain(|(ts, _)| *ts >= cutoff);
338 self.right_buffer.retain(|(ts, _)| *ts >= cutoff);
339 }
340 JoinStrategy::ExactTimestamp => {
341 const MAX_BUFFER_SIZE: usize = 100;
344 if self.left_buffer.len() > MAX_BUFFER_SIZE {
345 self.left_buffer
346 .drain(0..self.left_buffer.len() - MAX_BUFFER_SIZE);
347 }
348 if self.right_buffer.len() > MAX_BUFFER_SIZE {
349 self.right_buffer
350 .drain(0..self.right_buffer.len() - MAX_BUFFER_SIZE);
351 }
352 }
353 }
354 }
355
356 pub fn get_stats(&self) -> BetaNodeStats {
358 BetaNodeStats {
359 left_buffer_size: self.left_buffer.len(),
360 right_buffer_size: self.right_buffer.len(),
361 }
362 }
363}
364
/// Snapshot of how many entries a [`StreamBetaNode`] holds per side.
#[derive(Clone, Debug)]
pub struct BetaNodeStats {
    /// Number of entries in the left buffer.
    pub left_buffer_size: usize,
    /// Number of entries in the right buffer.
    pub right_buffer_size: usize,
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rete::stream_alpha_node::WindowSpec;
    use crate::streaming::event::EventMetadata;
    use crate::streaming::window::WindowType;
    use std::collections::HashMap;

    /// Span used for every window and join strategy in these tests.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Builds a shared alpha node for `stream` that filters on `event_type`
    /// and uses a five-minute sliding window.
    fn sliding_alpha(stream: &'static str, event_type: &str) -> Arc<Mutex<StreamAlphaNode>> {
        let window = WindowSpec {
            duration: FIVE_MINUTES,
            window_type: WindowType::Sliding,
        };
        Arc::new(Mutex::new(StreamAlphaNode::new(
            stream,
            Some(event_type.to_string()),
            Some(window),
        )))
    }

    /// Single join condition: `zone_id` on the left equals `zone_id` on the right.
    fn zone_id_equality() -> Vec<JoinCondition> {
        vec![JoinCondition {
            left_field: "zone_id".to_string(),
            right_field: "zone_id".to_string(),
            operator: JoinOperator::Equal,
        }]
    }

    /// Five-minute time-window join strategy.
    fn five_minute_window() -> JoinStrategy {
        JoinStrategy::TimeWindow {
            duration: FIVE_MINUTES,
        }
    }

    /// Event payload for zone `zone_1` carrying one additional field.
    fn zone_1_payload(extra_field: &str, extra_value: Value) -> HashMap<String, Value> {
        let mut data = HashMap::new();
        data.insert("zone_id".to_string(), Value::String("zone_1".to_string()));
        data.insert(extra_field.to_string(), extra_value);
        data
    }

    /// Asserts that `event` carries `zone_id == "zone_1"`.
    fn assert_in_zone_1(event: &StreamEvent) {
        assert_eq!(
            event.data.get("zone_id").unwrap(),
            &Value::String("zone_1".to_string())
        );
    }

    /// A left event followed by a right event with the same `zone_id` yields
    /// exactly one two-event join, emitted when the right event arrives.
    #[test]
    fn test_stream_beta_node_join() {
        let mut beta = StreamBetaNode::from_alpha_nodes(
            "irrigation_join".to_string(),
            sliding_alpha("moisture-sensors", "MoistureSensor"),
            sliding_alpha("temperature-sensors", "TemperatureSensor"),
            zone_id_equality(),
            five_minute_window(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: zone_1_payload("moisture_level", Value::Number(25.5)),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };

        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: zone_1_payload("temperature", Value::Number(35.0)),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };

        // Nothing is buffered on the right yet, so the left event cannot match.
        let left_matches = beta.process_left_event(moisture_event);
        assert_eq!(left_matches.len(), 0);

        // The right event meets the buffered left event.
        let right_matches = beta.process_right_event(temp_event);
        assert_eq!(right_matches.len(), 1);

        let joined = &right_matches[0];
        assert_eq!(joined.events.len(), 2);
        assert_in_zone_1(&joined.events[0]);
        assert_in_zone_1(&joined.events[1]);
    }

    /// Chains two beta nodes: (moisture JOIN temperature) JOIN weather, and
    /// checks the final result holds all three events in join order.
    #[test]
    fn test_nested_beta_three_stream_join() {
        let beta1 = Arc::new(Mutex::new(StreamBetaNode::from_alpha_nodes(
            "moisture_temp_join".to_string(),
            sliding_alpha("moisture-sensors", "MoistureSensor"),
            sliding_alpha("temperature-sensors", "TemperatureSensor"),
            zone_id_equality(),
            five_minute_window(),
        )));

        let mut beta2 = StreamBetaNode::from_beta_and_alpha(
            "full_join".to_string(),
            beta1.clone(),
            sliding_alpha("weather-events", "WeatherEvent"),
            zone_id_equality(),
            five_minute_window(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: zone_1_payload("moisture_level", Value::Number(20.0)),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };

        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: zone_1_payload("temperature", Value::Number(35.0)),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };

        let weather_event = StreamEvent {
            id: "w1".to_string(),
            event_type: "WeatherEvent".to_string(),
            data: zone_1_payload("condition", Value::String("sunny".to_string())),
            metadata: EventMetadata {
                timestamp: 1200,
                source: "weather-1".to_string(),
                sequence: 3,
                tags: HashMap::new(),
            },
        };

        // First stage: moisture + temperature. The lock is released at the
        // end of this block.
        let beta1_result = {
            let mut first_stage = beta1.lock().unwrap();
            first_stage.process_left_event(moisture_event);
            first_stage.process_right_event(temp_event)
        };

        assert_eq!(beta1_result.len(), 1);
        assert_eq!(beta1_result[0].events.len(), 2);

        // Second stage: the first-stage result enters on the left...
        let beta2_left_result = beta2.process_left_result(beta1_result[0].clone());
        assert_eq!(beta2_left_result.len(), 0);

        // ...and the weather event on the right completes the join.
        let beta2_final_result = beta2.process_right_event(weather_event);
        assert_eq!(beta2_final_result.len(), 1);

        let final_joined = &beta2_final_result[0];
        assert_eq!(final_joined.events.len(), 3);
        assert_eq!(final_joined.events[0].event_type, "MoistureSensor");
        assert_eq!(final_joined.events[1].event_type, "TemperatureSensor");
        assert_eq!(final_joined.events[2].event_type, "WeatherEvent");

        for event in &final_joined.events {
            assert_in_zone_1(event);
        }

        println!("✅ 3-Stream Join Success!");
        println!(
            " Events: {} + {} + {}",
            final_joined.events[0].event_type,
            final_joined.events[1].event_type,
            final_joined.events[2].event_type
        );
    }
}