#![allow(missing_docs)]
use crate::streaming::event::StreamEvent;
use crate::streaming::window::WindowType;
use std::collections::VecDeque;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
pub struct StreamAlphaNode {
pub stream_name: String,
pub event_type: Option<String>,
pub window: Option<WindowSpec>,
events: VecDeque<StreamEvent>,
max_events: usize,
last_window_start: u64,
last_session_event_timestamp: Option<u64>,
}
#[derive(Debug, Clone, PartialEq)]
pub struct WindowSpec {
pub duration: Duration,
pub window_type: WindowType,
}
impl StreamAlphaNode {
pub fn new(
stream_name: impl Into<String>,
event_type: Option<String>,
window: Option<WindowSpec>,
) -> Self {
Self {
stream_name: stream_name.into(),
event_type,
window,
events: VecDeque::new(),
max_events: 10_000, last_window_start: 0,
last_session_event_timestamp: None,
}
}
pub fn with_max_events(mut self, max_events: usize) -> Self {
self.max_events = max_events;
self
}
pub fn process_event(&mut self, event: &StreamEvent) -> bool {
if event.metadata.source != self.stream_name {
return false;
}
if let Some(ref expected_type) = self.event_type {
if &event.event_type != expected_type {
return false;
}
}
let matches = if self.window.is_none() {
true
} else {
self.is_in_window(event.metadata.timestamp)
};
if matches {
if let Some(WindowSpec {
window_type: WindowType::Session { timeout },
..
}) = &self.window
{
if let Some(last_time) = self.last_session_event_timestamp {
let gap = event.metadata.timestamp.saturating_sub(last_time);
let timeout_ms = timeout.as_millis() as u64;
if gap > timeout_ms {
self.events.clear();
self.last_session_event_timestamp = None;
}
}
}
self.add_event(event.clone());
self.evict_expired_events();
}
matches
}
fn add_event(&mut self, event: StreamEvent) {
let event_timestamp = event.metadata.timestamp;
self.events.push_back(event);
if let Some(WindowSpec {
window_type: WindowType::Session { .. },
..
}) = &self.window
{
self.last_session_event_timestamp = Some(event_timestamp);
}
while self.events.len() > self.max_events {
self.events.pop_front();
}
}
fn is_in_window(&self, timestamp: u64) -> bool {
match &self.window {
None => true,
Some(spec) => {
let current_time = Self::current_time_ms();
let window_duration_ms = spec.duration.as_millis() as u64;
match spec.window_type {
WindowType::Sliding => {
timestamp >= current_time.saturating_sub(window_duration_ms)
&& timestamp <= current_time
}
WindowType::Tumbling => {
let window_start = (current_time / window_duration_ms) * window_duration_ms;
let window_end = window_start + window_duration_ms;
timestamp >= window_start && timestamp < window_end
}
WindowType::Session { timeout } => {
let timeout_ms = timeout.as_millis() as u64;
match self.last_session_event_timestamp {
None => {
true
}
Some(last_event_time) => {
let gap = timestamp.saturating_sub(last_event_time);
if gap > timeout_ms {
true
} else {
true
}
}
}
}
}
}
}
}
fn evict_expired_events(&mut self) {
if let Some(spec) = &self.window {
let current_time = Self::current_time_ms();
let window_duration_ms = spec.duration.as_millis() as u64;
match spec.window_type {
WindowType::Sliding => {
let cutoff_time = current_time.saturating_sub(window_duration_ms);
while let Some(event) = self.events.front() {
if event.metadata.timestamp < cutoff_time {
self.events.pop_front();
} else {
break;
}
}
}
WindowType::Tumbling => {
let window_start = (current_time / window_duration_ms) * window_duration_ms;
if self.last_window_start != 0 && window_start != self.last_window_start {
self.events.clear();
self.last_window_start = window_start;
} else if self.last_window_start == 0 {
self.last_window_start = window_start;
}
while let Some(event) = self.events.front() {
if event.metadata.timestamp < window_start {
self.events.pop_front();
} else {
break;
}
}
}
WindowType::Session { timeout } => {
let timeout_ms = timeout.as_millis() as u64;
if let Some(last_event_time) = self.last_session_event_timestamp {
let gap_since_last = current_time.saturating_sub(last_event_time);
if gap_since_last > timeout_ms {
self.events.clear();
self.last_session_event_timestamp = None;
}
}
}
}
}
}
pub fn get_events(&self) -> &VecDeque<StreamEvent> {
&self.events
}
pub fn event_count(&self) -> usize {
self.events.len()
}
fn current_time_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
pub fn clear(&mut self) {
self.events.clear();
self.last_window_start = 0;
self.last_session_event_timestamp = None;
}
pub fn window_stats(&self) -> WindowStats {
WindowStats {
event_count: self.events.len(),
oldest_event_timestamp: self.events.front().map(|e| e.metadata.timestamp),
newest_event_timestamp: self.events.back().map(|e| e.metadata.timestamp),
window_duration_ms: self.window.as_ref().map(|w| w.duration.as_millis() as u64),
}
}
}
#[derive(Debug, Clone)]
pub struct WindowStats {
pub event_count: usize,
pub oldest_event_timestamp: Option<u64>,
pub newest_event_timestamp: Option<u64>,
pub window_duration_ms: Option<u64>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::streaming::event::StreamEvent;
use crate::types::Value;
use std::collections::HashMap;
fn create_test_event(stream_name: &str, event_type: &str, timestamp: u64) -> StreamEvent {
let mut data = HashMap::new();
data.insert(
"test_field".to_string(),
Value::String("test_value".to_string()),
);
StreamEvent::with_timestamp(event_type, data, stream_name, timestamp)
}
#[test]
fn test_stream_alpha_node_basic() {
let mut node = StreamAlphaNode::new("user-events", None, None);
let event = create_test_event("user-events", "LoginEvent", 1000);
assert!(node.process_event(&event));
assert_eq!(node.event_count(), 1);
}
#[test]
fn test_stream_name_filtering() {
let mut node = StreamAlphaNode::new("user-events", None, None);
let matching_event = create_test_event("user-events", "LoginEvent", 1000);
let non_matching_event = create_test_event("other-stream", "LoginEvent", 1000);
assert!(node.process_event(&matching_event));
assert!(!node.process_event(&non_matching_event));
assert_eq!(node.event_count(), 1);
}
#[test]
fn test_event_type_filtering() {
let mut node = StreamAlphaNode::new("user-events", Some("LoginEvent".to_string()), None);
let matching_event = create_test_event("user-events", "LoginEvent", 1000);
let non_matching_event = create_test_event("user-events", "LogoutEvent", 1000);
assert!(node.process_event(&matching_event));
assert!(!node.process_event(&non_matching_event));
assert_eq!(node.event_count(), 1);
}
#[test]
fn test_sliding_window() {
let window = WindowSpec {
duration: Duration::from_secs(5),
window_type: WindowType::Sliding,
};
let mut node = StreamAlphaNode::new("sensors", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let recent_event = create_test_event("sensors", "TempReading", current_time - 2000);
assert!(node.process_event(&recent_event));
let old_event = create_test_event("sensors", "TempReading", current_time - 6000);
assert!(!node.process_event(&old_event));
assert_eq!(node.event_count(), 1);
}
#[test]
fn test_tumbling_window() {
let window = WindowSpec {
duration: Duration::from_secs(10),
window_type: WindowType::Tumbling,
};
let mut node = StreamAlphaNode::new("sensors", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let window_start = (current_time / 10_000) * 10_000;
let event1 = create_test_event("sensors", "TempReading", window_start + 1000);
assert!(node.process_event(&event1));
let event2 = create_test_event("sensors", "TempReading", window_start + 5000);
assert!(node.process_event(&event2));
let old_event = create_test_event("sensors", "TempReading", window_start - 5000);
assert!(!node.process_event(&old_event));
assert_eq!(node.event_count(), 2);
}
#[test]
fn test_eviction() {
let window = WindowSpec {
duration: Duration::from_millis(100),
window_type: WindowType::Sliding,
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "TestEvent", current_time - 50);
node.process_event(&event1);
assert_eq!(node.event_count(), 1);
std::thread::sleep(Duration::from_millis(150));
let event2 = create_test_event(
"test-stream",
"TestEvent",
StreamAlphaNode::current_time_ms(),
);
node.process_event(&event2);
assert_eq!(node.event_count(), 1);
}
#[test]
fn test_max_events_limit() {
let mut node = StreamAlphaNode::new("test-stream", None, None).with_max_events(5);
let current_time = StreamAlphaNode::current_time_ms();
for i in 0..10 {
let event = create_test_event("test-stream", "TestEvent", current_time + i);
node.process_event(&event);
}
assert_eq!(node.event_count(), 5);
}
#[test]
fn test_clear() {
let mut node = StreamAlphaNode::new("test-stream", None, None);
let event = create_test_event("test-stream", "TestEvent", 1000);
node.process_event(&event);
assert_eq!(node.event_count(), 1);
node.clear();
assert_eq!(node.event_count(), 0);
}
#[test]
fn test_window_stats() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Sliding,
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "TestEvent", current_time - 10_000);
let event2 = create_test_event("test-stream", "TestEvent", current_time - 5_000);
node.process_event(&event1);
node.process_event(&event2);
let stats = node.window_stats();
assert_eq!(stats.event_count, 2);
assert_eq!(stats.oldest_event_timestamp, Some(current_time - 10_000));
assert_eq!(stats.newest_event_timestamp, Some(current_time - 5_000));
assert_eq!(stats.window_duration_ms, Some(60_000));
}
#[test]
fn test_get_events() {
let mut node = StreamAlphaNode::new("test-stream", None, None);
let event1 = create_test_event("test-stream", "Event1", 1000);
let event2 = create_test_event("test-stream", "Event2", 2000);
node.process_event(&event1);
node.process_event(&event2);
let events = node.get_events();
assert_eq!(events.len(), 2);
assert_eq!(events[0].event_type, "Event1");
assert_eq!(events[1].event_type, "Event2");
}
#[test]
fn test_session_window_basic() {
let window = WindowSpec {
duration: Duration::from_secs(60), window_type: WindowType::Session {
timeout: Duration::from_secs(5),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "Event1", current_time);
assert!(node.process_event(&event1));
assert_eq!(node.event_count(), 1);
let event2 = create_test_event("test-stream", "Event2", current_time + 2000);
assert!(node.process_event(&event2));
assert_eq!(node.event_count(), 2);
let event3 = create_test_event("test-stream", "Event3", current_time + 3000);
assert!(node.process_event(&event3));
assert_eq!(node.event_count(), 3);
}
#[test]
fn test_session_window_timeout_new_session() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_millis(100),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "Event1", current_time);
node.process_event(&event1);
assert_eq!(node.event_count(), 1);
std::thread::sleep(Duration::from_millis(150));
let event2 = create_test_event("test-stream", "Event2", StreamAlphaNode::current_time_ms());
node.process_event(&event2);
assert_eq!(node.event_count(), 1);
assert_eq!(node.get_events()[0].event_type, "Event2");
}
#[test]
fn test_session_window_gap_detection() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_secs(2),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let base_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "S1_Event1", base_time);
let event2 = create_test_event("test-stream", "S1_Event2", base_time + 1000);
node.process_event(&event1);
node.process_event(&event2);
assert_eq!(node.event_count(), 2);
let event3 = create_test_event("test-stream", "S2_Event1", base_time + 5000);
node.process_event(&event3);
assert!(node
.get_events()
.iter()
.any(|e| e.event_type == "S2_Event1"));
}
#[test]
fn test_session_window_eviction_after_timeout() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_millis(200),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let base_time = StreamAlphaNode::current_time_ms();
let event1 = create_test_event("test-stream", "Event1", base_time);
let event2 = create_test_event("test-stream", "Event2", base_time + 50);
node.process_event(&event1);
node.process_event(&event2);
assert_eq!(node.event_count(), 2);
let event3 = create_test_event("test-stream", "Event3", base_time + 300);
node.process_event(&event3);
assert_eq!(node.event_count(), 1);
assert_eq!(node.get_events()[0].event_type, "Event3");
}
#[test]
fn test_session_window_clear_resets_state() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_secs(5),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let current_time = StreamAlphaNode::current_time_ms();
let event = create_test_event("test-stream", "Event1", current_time);
node.process_event(&event);
assert_eq!(node.event_count(), 1);
assert!(node.last_session_event_timestamp.is_some());
node.clear();
assert_eq!(node.event_count(), 0);
assert!(node.last_session_event_timestamp.is_none());
}
#[test]
fn test_session_window_continuous_activity() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_secs(1),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let base_time = StreamAlphaNode::current_time_ms();
for i in 0..5 {
let event =
create_test_event("test-stream", &format!("Event{}", i), base_time + (i * 500));
node.process_event(&event);
}
assert_eq!(node.event_count(), 5);
}
#[test]
fn test_session_window_multiple_sessions() {
let window = WindowSpec {
duration: Duration::from_secs(60),
window_type: WindowType::Session {
timeout: Duration::from_millis(500),
},
};
let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
let base_time = StreamAlphaNode::current_time_ms();
node.process_event(&create_test_event("test-stream", "S1_E1", base_time));
node.process_event(&create_test_event("test-stream", "S1_E2", base_time + 200));
node.process_event(&create_test_event("test-stream", "S2_E1", base_time + 1000));
node.process_event(&create_test_event("test-stream", "S3_E1", base_time + 2000));
assert!(node.event_count() > 0);
}
}