use crate::rete::stream_alpha_node::StreamAlphaNode;
use crate::streaming::event::StreamEvent;
use crate::types::Value;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
/// Upstream source attached to one side of a [`StreamBetaNode`].
///
/// `Alpha` wraps a [`StreamAlphaNode`]; `Beta` wraps another [`StreamBetaNode`],
/// which is how joins over three or more streams are nested (see
/// `StreamBetaNode::from_beta_and_alpha`). Cloning clones the `Arc`, so clones
/// share the same underlying node.
#[derive(Debug, Clone)]
pub enum BetaInput {
    Alpha(Arc<Mutex<StreamAlphaNode>>),
    Beta(Arc<Mutex<StreamBetaNode>>),
}
/// Output of a (possibly nested) stream join.
#[derive(Debug, Clone)]
pub struct MultiStreamJoinResult {
    /// Joined events in left-to-right join order; nested joins append the
    /// right-hand events after the left-hand ones.
    pub events: Vec<StreamEvent>,
    /// Timestamp attached to this result by whoever constructed it.
    pub join_timestamp: SystemTime,
}
impl MultiStreamJoinResult {
pub fn from_two_events(left: StreamEvent, right: StreamEvent, timestamp: SystemTime) -> Self {
Self {
events: vec![left, right],
join_timestamp: timestamp,
}
}
pub fn from_result_and_event(
result: MultiStreamJoinResult,
event: StreamEvent,
timestamp: SystemTime,
) -> Self {
let mut events = result.events;
events.push(event);
Self {
events,
join_timestamp: timestamp,
}
}
pub fn get_event(&self, index: usize) -> Option<&StreamEvent> {
self.events.get(index)
}
pub fn left_event(&self) -> &StreamEvent {
&self.events[0]
}
pub fn right_event(&self) -> &StreamEvent {
&self.events[1]
}
}
/// A single constraint between a field of a left-side event and a field of a
/// right-side event. Field names are keys into `StreamEvent::data`.
#[derive(Debug, Clone)]
pub struct JoinCondition {
    pub left_field: String,
    pub right_field: String,
    /// How the two extracted field values are compared.
    pub operator: JoinOperator,
}
/// Comparison applied to the two field values of a [`JoinCondition`].
#[derive(Debug, Clone, PartialEq)]
pub enum JoinOperator {
    /// Field values must be equal.
    Equal,
}
/// Controls how long buffered results remain eligible for joining.
#[derive(Debug, Clone)]
pub enum JoinStrategy {
    /// Buffered entries whose wall-clock arrival time is older than `duration`
    /// are evicted whenever a new result arrives.
    TimeWindow { duration: Duration },
    /// NOTE(review): the beta node only caps each buffer at its 100 newest
    /// entries for this strategy; no timestamp equality is checked when
    /// joining. Confirm whether exact-timestamp matching is intended.
    ExactTimestamp,
}
/// Join node that pairs results arriving on its left and right inputs.
///
/// Each side keeps a buffer of `(arrival time, result)` entries. A newly pushed
/// result is appended to its own side's buffer and compared against every
/// entry of the opposite buffer. Eviction is governed by `strategy`.
#[derive(Debug)]
pub struct StreamBetaNode {
    pub name: String,
    /// Upstream source for the left side (stored; not read anywhere in this file).
    pub left_input: BetaInput,
    /// Upstream source for the right side (stored; not read anywhere in this file).
    pub right_input: BetaInput,
    /// Every condition must hold for a pair of results to join.
    pub join_conditions: Vec<JoinCondition>,
    pub strategy: JoinStrategy,
    /// Left-side results tagged with `SystemTime::now()` at arrival, oldest first.
    left_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
    /// Right-side results tagged with `SystemTime::now()` at arrival, oldest first.
    right_buffer: Vec<(SystemTime, MultiStreamJoinResult)>,
}
/// A two-event join record.
///
/// NOTE(review): not referenced anywhere in this file; the beta node returns
/// `MultiStreamJoinResult` instead. Confirm whether this type is still used.
#[derive(Debug, Clone)]
pub struct JoinedStreamEvent {
    pub left_event: StreamEvent,
    pub right_event: StreamEvent,
    pub join_timestamp: SystemTime,
}
impl StreamBetaNode {
pub fn new(
name: String,
left_input: BetaInput,
right_input: BetaInput,
join_conditions: Vec<JoinCondition>,
strategy: JoinStrategy,
) -> Self {
Self {
name,
left_input,
right_input,
join_conditions,
strategy,
left_buffer: Vec::new(),
right_buffer: Vec::new(),
}
}
pub fn from_alpha_nodes(
name: String,
left_alpha: Arc<Mutex<StreamAlphaNode>>,
right_alpha: Arc<Mutex<StreamAlphaNode>>,
join_conditions: Vec<JoinCondition>,
strategy: JoinStrategy,
) -> Self {
Self::new(
name,
BetaInput::Alpha(left_alpha),
BetaInput::Alpha(right_alpha),
join_conditions,
strategy,
)
}
pub fn from_beta_and_alpha(
name: String,
left_beta: Arc<Mutex<StreamBetaNode>>,
right_alpha: Arc<Mutex<StreamAlphaNode>>,
join_conditions: Vec<JoinCondition>,
strategy: JoinStrategy,
) -> Self {
Self::new(
name,
BetaInput::Beta(left_beta),
BetaInput::Alpha(right_alpha),
join_conditions,
strategy,
)
}
pub fn process_left_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
let now = SystemTime::now();
let wrapped = MultiStreamJoinResult {
events: vec![event],
join_timestamp: now,
};
self.process_left_result(wrapped)
}
pub fn process_right_event(&mut self, event: StreamEvent) -> Vec<MultiStreamJoinResult> {
let now = SystemTime::now();
let wrapped = MultiStreamJoinResult {
events: vec![event],
join_timestamp: now,
};
self.process_right_result(wrapped)
}
pub fn process_left_result(
&mut self,
result: MultiStreamJoinResult,
) -> Vec<MultiStreamJoinResult> {
let now = SystemTime::now();
self.left_buffer.push((now, result.clone()));
self.cleanup_buffers(now);
self.find_matches(&result, &self.right_buffer, true)
}
pub fn process_right_result(
&mut self,
result: MultiStreamJoinResult,
) -> Vec<MultiStreamJoinResult> {
let now = SystemTime::now();
self.right_buffer.push((now, result.clone()));
self.cleanup_buffers(now);
self.find_matches(&result, &self.left_buffer, false)
}
fn find_matches(
&self,
new_result: &MultiStreamJoinResult,
other_buffer: &[(SystemTime, MultiStreamJoinResult)],
is_left: bool,
) -> Vec<MultiStreamJoinResult> {
let mut matches = Vec::new();
for (timestamp, buffered_result) in other_buffer {
if self.check_join_conditions_multi(new_result, buffered_result, is_left) {
let joined = if is_left {
self.combine_results(new_result.clone(), buffered_result.clone(), *timestamp)
} else {
self.combine_results(buffered_result.clone(), new_result.clone(), *timestamp)
};
matches.push(joined);
}
}
matches
}
fn combine_results(
&self,
left: MultiStreamJoinResult,
right: MultiStreamJoinResult,
timestamp: SystemTime,
) -> MultiStreamJoinResult {
let mut all_events = left.events;
all_events.extend(right.events);
MultiStreamJoinResult {
events: all_events,
join_timestamp: timestamp,
}
}
fn check_join_conditions_multi(
&self,
left_result: &MultiStreamJoinResult,
right_result: &MultiStreamJoinResult,
is_left: bool,
) -> bool {
let left_event = left_result.events.last().unwrap();
let right_event = right_result.events.first().unwrap();
for condition in &self.join_conditions {
let (left_field, right_field) = if is_left {
(&condition.left_field, &condition.right_field)
} else {
(&condition.right_field, &condition.left_field)
};
let left_value = Self::extract_field_value(left_event, left_field);
let right_value = Self::extract_field_value(right_event, right_field);
match condition.operator {
JoinOperator::Equal => {
if left_value != right_value {
return false;
}
}
}
}
true
}
fn extract_field_value(event: &StreamEvent, field: &str) -> Option<String> {
event.data.get(field).and_then(|v| match v {
Value::String(s) => Some(s.clone()),
Value::Integer(i) => Some(i.to_string()),
Value::Number(n) => Some(n.to_string()),
_ => None,
})
}
fn cleanup_buffers(&mut self, now: SystemTime) {
match &self.strategy {
JoinStrategy::TimeWindow { duration } => {
let cutoff = now.checked_sub(*duration).unwrap_or(SystemTime::UNIX_EPOCH);
self.left_buffer.retain(|(ts, _)| *ts >= cutoff);
self.right_buffer.retain(|(ts, _)| *ts >= cutoff);
}
JoinStrategy::ExactTimestamp => {
const MAX_BUFFER_SIZE: usize = 100;
if self.left_buffer.len() > MAX_BUFFER_SIZE {
self.left_buffer
.drain(0..self.left_buffer.len() - MAX_BUFFER_SIZE);
}
if self.right_buffer.len() > MAX_BUFFER_SIZE {
self.right_buffer
.drain(0..self.right_buffer.len() - MAX_BUFFER_SIZE);
}
}
}
}
pub fn get_stats(&self) -> BetaNodeStats {
BetaNodeStats {
left_buffer_size: self.left_buffer.len(),
right_buffer_size: self.right_buffer.len(),
}
}
}
/// Snapshot of a beta node's buffer sizes, as returned by
/// `StreamBetaNode::get_stats`.
#[derive(Debug, Clone)]
pub struct BetaNodeStats {
    /// Number of entries in the left-side join buffer.
    pub left_buffer_size: usize,
    /// Number of entries in the right-side join buffer.
    pub right_buffer_size: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rete::stream_alpha_node::WindowSpec;
    use crate::streaming::event::EventMetadata;
    use crate::streaming::window::WindowType;
    use std::collections::HashMap;

    /// Window length and join span shared by every node in these tests.
    const FIVE_MINUTES: Duration = Duration::from_secs(300);

    /// Builds a shared alpha node over a five-minute sliding window.
    fn sliding_alpha(stream: &'static str, event_type: &str) -> Arc<Mutex<StreamAlphaNode>> {
        let window = WindowSpec {
            duration: FIVE_MINUTES,
            window_type: WindowType::Sliding,
        };
        Arc::new(Mutex::new(StreamAlphaNode::new(
            stream,
            Some(event_type.to_string()),
            Some(window),
        )))
    }

    /// A single `zone_id == zone_id` equality condition.
    fn zone_join() -> Vec<JoinCondition> {
        vec![JoinCondition {
            left_field: "zone_id".to_string(),
            right_field: "zone_id".to_string(),
            operator: JoinOperator::Equal,
        }]
    }

    /// Five-minute time-window join strategy.
    fn five_minute_window() -> JoinStrategy {
        JoinStrategy::TimeWindow {
            duration: FIVE_MINUTES,
        }
    }

    /// Builds an event payload from `(key, value)` pairs.
    fn payload(pairs: Vec<(&str, Value)>) -> HashMap<String, Value> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// The zone every test event belongs to.
    fn zone_1() -> Value {
        Value::String("zone_1".to_string())
    }

    #[test]
    fn test_stream_beta_node_join() {
        let mut beta = StreamBetaNode::from_alpha_nodes(
            "irrigation_join".to_string(),
            sliding_alpha("moisture-sensors", "MoistureSensor"),
            sliding_alpha("temperature-sensors", "TemperatureSensor"),
            zone_join(),
            five_minute_window(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: payload(vec![
                ("zone_id", zone_1()),
                ("moisture_level", Value::Number(25.5)),
            ]),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };
        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: payload(vec![
                ("zone_id", zone_1()),
                ("temperature", Value::Number(35.0)),
            ]),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };

        // Nothing is buffered on the right yet, so the left event can't join.
        assert!(beta.process_left_event(moisture_event).is_empty());

        let right_matches = beta.process_right_event(temp_event);
        assert_eq!(right_matches.len(), 1);

        let joined = &right_matches[0];
        assert_eq!(joined.events.len(), 2);
        for event in &joined.events {
            assert_eq!(event.data.get("zone_id").unwrap(), &zone_1());
        }
    }

    #[test]
    fn test_nested_beta_three_stream_join() {
        let moisture_alpha = sliding_alpha("moisture-sensors", "MoistureSensor");
        let temp_alpha = sliding_alpha("temperature-sensors", "TemperatureSensor");
        let weather_alpha = sliding_alpha("weather-events", "WeatherEvent");

        let beta1 = Arc::new(Mutex::new(StreamBetaNode::from_alpha_nodes(
            "moisture_temp_join".to_string(),
            moisture_alpha,
            temp_alpha,
            zone_join(),
            five_minute_window(),
        )));
        let mut beta2 = StreamBetaNode::from_beta_and_alpha(
            "full_join".to_string(),
            beta1.clone(),
            weather_alpha,
            zone_join(),
            five_minute_window(),
        );

        let moisture_event = StreamEvent {
            id: "m1".to_string(),
            event_type: "MoistureSensor".to_string(),
            data: payload(vec![
                ("zone_id", zone_1()),
                ("moisture_level", Value::Number(20.0)),
            ]),
            metadata: EventMetadata {
                timestamp: 1000,
                source: "sensor-1".to_string(),
                sequence: 1,
                tags: HashMap::new(),
            },
        };
        let temp_event = StreamEvent {
            id: "t1".to_string(),
            event_type: "TemperatureSensor".to_string(),
            data: payload(vec![
                ("zone_id", zone_1()),
                ("temperature", Value::Number(35.0)),
            ]),
            metadata: EventMetadata {
                timestamp: 1100,
                source: "sensor-2".to_string(),
                sequence: 2,
                tags: HashMap::new(),
            },
        };
        let weather_event = StreamEvent {
            id: "w1".to_string(),
            event_type: "WeatherEvent".to_string(),
            data: payload(vec![
                ("zone_id", zone_1()),
                ("condition", Value::String("sunny".to_string())),
            ]),
            metadata: EventMetadata {
                timestamp: 1200,
                source: "weather-1".to_string(),
                sequence: 3,
                tags: HashMap::new(),
            },
        };

        // First stage: moisture + temperature.
        let beta1_result = {
            let mut first_join = beta1.lock().unwrap();
            first_join.process_left_event(moisture_event);
            first_join.process_right_event(temp_event)
        };
        assert_eq!(beta1_result.len(), 1);
        assert_eq!(beta1_result[0].events.len(), 2);

        // Second stage: the two-event result waits for a weather event.
        assert!(beta2.process_left_result(beta1_result[0].clone()).is_empty());

        let beta2_final_result = beta2.process_right_event(weather_event);
        assert_eq!(beta2_final_result.len(), 1);

        let final_joined = &beta2_final_result[0];
        let kinds: Vec<&str> = final_joined
            .events
            .iter()
            .map(|event| event.event_type.as_str())
            .collect();
        assert_eq!(kinds, ["MoistureSensor", "TemperatureSensor", "WeatherEvent"]);
        for event in &final_joined.events {
            assert_eq!(event.data.get("zone_id").unwrap(), &zone_1());
        }

        println!("✅ 3-Stream Join Success!");
        println!(" Events: {} + {} + {}", kinds[0], kinds[1], kinds[2]);
    }
}