1use crate::window::{WindowEvent, WindowState, WindowType};
4use std::fmt;
5
6#[derive(Debug, Clone)]
8pub struct StreamDef {
9 pub name: String,
10 pub window: Option<WindowType>,
11 pub watermark_ms: Option<u64>,
12}
13
14impl fmt::Display for StreamDef {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 match &self.window {
17 Some(w) => write!(f, "<stream {} window={}>", self.name, w),
18 None => write!(f, "<stream {}>", self.name),
19 }
20 }
21}
22
/// A single record fed into a [`StreamRunner`].
#[derive(Clone, Debug)]
pub struct StreamEvent {
    /// Optional key attached to the event. The runner in this file does not
    /// read it.
    pub key: Option<String>,
    /// Payload handed to the window state when the stream is windowed.
    pub value: String,
    /// Event time, passed to the window state alongside the payload.
    // NOTE(review): presumably milliseconds, to match `duration_ms` on the
    // window type — confirm.
    pub timestamp: u64,
}
30
31pub struct StreamRunner {
33 pub def: StreamDef,
34 window_state: Option<WindowState>,
35 events_processed: u64,
36}
37
38impl StreamRunner {
39 pub fn new(def: StreamDef) -> Self {
40 let window_state = def.window.as_ref().map(|w| WindowState::new(w.clone()));
41 StreamRunner {
42 def,
43 window_state,
44 events_processed: 0,
45 }
46 }
47
48 pub fn process_event(&mut self, event: StreamEvent) -> Vec<Vec<WindowEvent>> {
50 self.events_processed += 1;
51 let mut outputs = Vec::new();
52
53 if let Some(ref mut state) = self.window_state {
54 state.add_event(event.value, event.timestamp);
55 if state.should_fire(event.timestamp) {
56 outputs.push(state.fire());
57 }
58 }
59 outputs
62 }
63
64 pub fn flush(&mut self) -> Option<Vec<WindowEvent>> {
66 if let Some(ref mut state) = self.window_state
67 && !state.is_empty()
68 {
69 return Some(state.fire());
70 }
71 None
72 }
73
74 pub fn events_processed(&self) -> u64 {
76 self.events_processed
77 }
78
79 pub fn has_window(&self) -> bool {
81 self.window_state.is_some()
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::window::WindowType;

    /// Builds a stream definition with the given name and window and no watermark.
    fn stream(name: &str, window: Option<WindowType>) -> StreamDef {
        StreamDef {
            name: name.to_string(),
            window,
            watermark_ms: None,
        }
    }

    /// Builds a keyless event carrying `value` at `timestamp`.
    fn event(value: &str, timestamp: u64) -> StreamEvent {
        StreamEvent {
            key: None,
            value: value.to_string(),
            timestamp,
        }
    }

    #[test]
    fn test_stream_runner_no_window() {
        let mut runner = StreamRunner::new(stream("passthrough", None));
        assert!(!runner.has_window());

        // Without a window nothing is emitted, but the event is still counted.
        let outputs = runner.process_event(event("hello", 0));
        assert!(outputs.is_empty());
        assert_eq!(runner.events_processed(), 1);
    }

    #[test]
    fn test_stream_runner_tumbling_window() {
        let window = WindowType::Tumbling { duration_ms: 1000 };
        let mut runner = StreamRunner::new(stream("windowed", Some(window)));
        assert!(runner.has_window());

        // Events before the window boundary are buffered without firing.
        for (value, timestamp) in [("a", 0), ("b", 500)] {
            assert!(runner.process_event(event(value, timestamp)).is_empty());
        }

        // The event at the boundary triggers the window and is included in it.
        let fired = runner.process_event(event("c", 1000));
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].len(), 3);
    }

    #[test]
    fn test_stream_runner_flush() {
        let window = WindowType::Tumbling { duration_ms: 5000 };
        let mut runner = StreamRunner::new(stream("flush_test", Some(window)));

        runner.process_event(event("x", 100));

        // The first flush drains the single buffered event.
        assert_eq!(runner.flush().map(|events| events.len()), Some(1));

        // A second flush has nothing left to emit.
        assert!(runner.flush().is_none());
    }

    #[test]
    fn test_stream_def_display() {
        let window = WindowType::Tumbling { duration_ms: 5000 };
        let rendered = stream("test_stream", Some(window)).to_string();
        assert!(rendered.contains("test_stream"));
        assert!(rendered.contains("tumbling"));
    }
}