1#![allow(missing_docs)]
7
8use crate::streaming::event::StreamEvent;
9use crate::streaming::window::WindowType;
10use std::collections::VecDeque;
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13#[derive(Debug, Clone)]
21pub struct StreamAlphaNode {
22 pub stream_name: String,
24
25 pub event_type: Option<String>,
27
28 pub window: Option<WindowSpec>,
30
31 events: VecDeque<StreamEvent>,
33
34 max_events: usize,
36
37 last_window_start: u64,
39}
40
41#[derive(Debug, Clone, PartialEq)]
43pub struct WindowSpec {
44 pub duration: Duration,
45 pub window_type: WindowType,
46}
47
48impl StreamAlphaNode {
49 pub fn new(
73 stream_name: impl Into<String>,
74 event_type: Option<String>,
75 window: Option<WindowSpec>,
76 ) -> Self {
77 Self {
78 stream_name: stream_name.into(),
79 event_type,
80 window,
81 events: VecDeque::new(),
82 max_events: 10_000, last_window_start: 0,
84 }
85 }
86
87 pub fn with_max_events(mut self, max_events: usize) -> Self {
89 self.max_events = max_events;
90 self
91 }
92
93 pub fn process_event(&mut self, event: &StreamEvent) -> bool {
102 if event.metadata.source != self.stream_name {
104 return false;
105 }
106
107 if let Some(ref expected_type) = self.event_type {
109 if &event.event_type != expected_type {
110 return false;
111 }
112 }
113
114 let matches = if self.window.is_none() {
116 true
117 } else {
118 self.is_in_window(event.metadata.timestamp)
120 };
121
122 if matches {
123 self.add_event(event.clone());
125 self.evict_expired_events();
126 }
127
128 matches
129 }
130
131 fn add_event(&mut self, event: StreamEvent) {
133 self.events.push_back(event);
134
135 while self.events.len() > self.max_events {
137 self.events.pop_front();
138 }
139 }
140
141 fn is_in_window(&self, timestamp: u64) -> bool {
143 match &self.window {
144 None => true,
145 Some(spec) => {
146 let current_time = Self::current_time_ms();
147 let window_duration_ms = spec.duration.as_millis() as u64;
148
149 match spec.window_type {
150 WindowType::Sliding => {
151 timestamp >= current_time.saturating_sub(window_duration_ms)
153 && timestamp <= current_time
154 }
155 WindowType::Tumbling => {
156 let window_start = (current_time / window_duration_ms) * window_duration_ms;
158 let window_end = window_start + window_duration_ms;
159
160 timestamp >= window_start && timestamp < window_end
161 }
162 WindowType::Session { .. } => {
163 timestamp >= current_time.saturating_sub(window_duration_ms)
166 && timestamp <= current_time
167 }
168 }
169 }
170 }
171 }
172
173 fn evict_expired_events(&mut self) {
175 if let Some(spec) = &self.window {
176 let current_time = Self::current_time_ms();
177 let window_duration_ms = spec.duration.as_millis() as u64;
178
179 match spec.window_type {
180 WindowType::Sliding => {
181 let cutoff_time = current_time.saturating_sub(window_duration_ms);
182
183 while let Some(event) = self.events.front() {
185 if event.metadata.timestamp < cutoff_time {
186 self.events.pop_front();
187 } else {
188 break;
189 }
190 }
191 }
192 WindowType::Tumbling => {
193 let window_start = (current_time / window_duration_ms) * window_duration_ms;
194
195 if self.last_window_start != 0 && window_start != self.last_window_start {
197 self.events.clear();
198 self.last_window_start = window_start;
199 } else if self.last_window_start == 0 {
200 self.last_window_start = window_start;
201 }
202
203 while let Some(event) = self.events.front() {
205 if event.metadata.timestamp < window_start {
206 self.events.pop_front();
207 } else {
208 break;
209 }
210 }
211 }
212 WindowType::Session { .. } => {
213 let cutoff_time = current_time.saturating_sub(window_duration_ms);
216
217 while let Some(event) = self.events.front() {
218 if event.metadata.timestamp < cutoff_time {
219 self.events.pop_front();
220 } else {
221 break;
222 }
223 }
224 }
225 }
226 }
227 }
228
229 pub fn get_events(&self) -> &VecDeque<StreamEvent> {
234 &self.events
235 }
236
237 pub fn event_count(&self) -> usize {
239 self.events.len()
240 }
241
242 fn current_time_ms() -> u64 {
244 SystemTime::now()
245 .duration_since(UNIX_EPOCH)
246 .unwrap()
247 .as_millis() as u64
248 }
249
250 pub fn clear(&mut self) {
252 self.events.clear();
253 self.last_window_start = 0;
254 }
255
256 pub fn window_stats(&self) -> WindowStats {
258 WindowStats {
259 event_count: self.events.len(),
260 oldest_event_timestamp: self.events.front().map(|e| e.metadata.timestamp),
261 newest_event_timestamp: self.events.back().map(|e| e.metadata.timestamp),
262 window_duration_ms: self.window.as_ref().map(|w| w.duration.as_millis() as u64),
263 }
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct WindowStats {
270 pub event_count: usize,
271 pub oldest_event_timestamp: Option<u64>,
272 pub newest_event_timestamp: Option<u64>,
273 pub window_duration_ms: Option<u64>,
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use crate::streaming::event::StreamEvent;
280 use crate::types::Value;
281 use std::collections::HashMap;
282
283 fn create_test_event(stream_name: &str, event_type: &str, timestamp: u64) -> StreamEvent {
284 let mut data = HashMap::new();
285 data.insert(
286 "test_field".to_string(),
287 Value::String("test_value".to_string()),
288 );
289
290 StreamEvent::with_timestamp(event_type, data, stream_name, timestamp)
291 }
292
293 #[test]
294 fn test_stream_alpha_node_basic() {
295 let mut node = StreamAlphaNode::new("user-events", None, None);
296 let event = create_test_event("user-events", "LoginEvent", 1000);
297
298 assert!(node.process_event(&event));
299 assert_eq!(node.event_count(), 1);
300 }
301
302 #[test]
303 fn test_stream_name_filtering() {
304 let mut node = StreamAlphaNode::new("user-events", None, None);
305
306 let matching_event = create_test_event("user-events", "LoginEvent", 1000);
307 let non_matching_event = create_test_event("other-stream", "LoginEvent", 1000);
308
309 assert!(node.process_event(&matching_event));
310 assert!(!node.process_event(&non_matching_event));
311 assert_eq!(node.event_count(), 1);
312 }
313
314 #[test]
315 fn test_event_type_filtering() {
316 let mut node = StreamAlphaNode::new("user-events", Some("LoginEvent".to_string()), None);
317
318 let matching_event = create_test_event("user-events", "LoginEvent", 1000);
319 let non_matching_event = create_test_event("user-events", "LogoutEvent", 1000);
320
321 assert!(node.process_event(&matching_event));
322 assert!(!node.process_event(&non_matching_event));
323 assert_eq!(node.event_count(), 1);
324 }
325
326 #[test]
327 fn test_sliding_window() {
328 let window = WindowSpec {
329 duration: Duration::from_secs(5),
330 window_type: WindowType::Sliding,
331 };
332
333 let mut node = StreamAlphaNode::new("sensors", None, Some(window));
334
335 let current_time = StreamAlphaNode::current_time_ms();
336
337 let recent_event = create_test_event("sensors", "TempReading", current_time - 2000);
339 assert!(node.process_event(&recent_event));
340
341 let old_event = create_test_event("sensors", "TempReading", current_time - 6000);
343 assert!(!node.process_event(&old_event));
344
345 assert_eq!(node.event_count(), 1);
346 }
347
348 #[test]
349 fn test_tumbling_window() {
350 let window = WindowSpec {
351 duration: Duration::from_secs(10),
352 window_type: WindowType::Tumbling,
353 };
354
355 let mut node = StreamAlphaNode::new("sensors", None, Some(window));
356
357 let current_time = StreamAlphaNode::current_time_ms();
358 let window_start = (current_time / 10_000) * 10_000;
359
360 let event1 = create_test_event("sensors", "TempReading", window_start + 1000);
362 assert!(node.process_event(&event1));
363
364 let event2 = create_test_event("sensors", "TempReading", window_start + 5000);
366 assert!(node.process_event(&event2));
367
368 let old_event = create_test_event("sensors", "TempReading", window_start - 5000);
370 assert!(!node.process_event(&old_event));
371
372 assert_eq!(node.event_count(), 2);
373 }
374
375 #[test]
376 fn test_eviction() {
377 let window = WindowSpec {
378 duration: Duration::from_millis(100),
379 window_type: WindowType::Sliding,
380 };
381
382 let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
383
384 let current_time = StreamAlphaNode::current_time_ms();
385
386 let event1 = create_test_event("test-stream", "TestEvent", current_time - 50);
388 node.process_event(&event1);
389
390 assert_eq!(node.event_count(), 1);
391
392 std::thread::sleep(Duration::from_millis(150));
394
395 let event2 = create_test_event(
397 "test-stream",
398 "TestEvent",
399 StreamAlphaNode::current_time_ms(),
400 );
401 node.process_event(&event2);
402
403 assert_eq!(node.event_count(), 1);
405 }
406
407 #[test]
408 fn test_max_events_limit() {
409 let mut node = StreamAlphaNode::new("test-stream", None, None).with_max_events(5);
410
411 let current_time = StreamAlphaNode::current_time_ms();
412
413 for i in 0..10 {
415 let event = create_test_event("test-stream", "TestEvent", current_time + i);
416 node.process_event(&event);
417 }
418
419 assert_eq!(node.event_count(), 5);
421 }
422
423 #[test]
424 fn test_clear() {
425 let mut node = StreamAlphaNode::new("test-stream", None, None);
426
427 let event = create_test_event("test-stream", "TestEvent", 1000);
428 node.process_event(&event);
429
430 assert_eq!(node.event_count(), 1);
431
432 node.clear();
433 assert_eq!(node.event_count(), 0);
434 }
435
436 #[test]
437 fn test_window_stats() {
438 let window = WindowSpec {
439 duration: Duration::from_secs(60),
440 window_type: WindowType::Sliding,
441 };
442
443 let mut node = StreamAlphaNode::new("test-stream", None, Some(window));
444
445 let current_time = StreamAlphaNode::current_time_ms();
446 let event1 = create_test_event("test-stream", "TestEvent", current_time - 10_000);
447 let event2 = create_test_event("test-stream", "TestEvent", current_time - 5_000);
448
449 node.process_event(&event1);
450 node.process_event(&event2);
451
452 let stats = node.window_stats();
453 assert_eq!(stats.event_count, 2);
454 assert_eq!(stats.oldest_event_timestamp, Some(current_time - 10_000));
455 assert_eq!(stats.newest_event_timestamp, Some(current_time - 5_000));
456 assert_eq!(stats.window_duration_ms, Some(60_000));
457 }
458
459 #[test]
460 fn test_get_events() {
461 let mut node = StreamAlphaNode::new("test-stream", None, None);
462
463 let event1 = create_test_event("test-stream", "Event1", 1000);
464 let event2 = create_test_event("test-stream", "Event2", 2000);
465
466 node.process_event(&event1);
467 node.process_event(&event2);
468
469 let events = node.get_events();
470 assert_eq!(events.len(), 2);
471 assert_eq!(events[0].event_type, "Event1");
472 assert_eq!(events[1].event_type, "Event2");
473 }
474}