Skip to main content

tl_stream/
stream.rs

1// ThinkingLanguage — Stream processing definitions
2
3use crate::window::{WindowEvent, WindowState, WindowType};
4use std::fmt;
5
6/// Definition of a stream processor.
7#[derive(Debug, Clone)]
8pub struct StreamDef {
9    pub name: String,
10    pub window: Option<WindowType>,
11    pub watermark_ms: Option<u64>,
12}
13
14impl fmt::Display for StreamDef {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        match &self.window {
17            Some(w) => write!(f, "<stream {} window={}>", self.name, w),
18            None => write!(f, "<stream {}>", self.name),
19        }
20    }
21}
22
23/// A streaming event with key, value, and timestamp.
24#[derive(Debug, Clone)]
25pub struct StreamEvent {
26    pub key: Option<String>,
27    pub value: String,
28    pub timestamp: u64,
29}
30
31/// Stream runner that processes events through optional windowing.
32pub struct StreamRunner {
33    pub def: StreamDef,
34    window_state: Option<WindowState>,
35    events_processed: u64,
36}
37
38impl StreamRunner {
39    pub fn new(def: StreamDef) -> Self {
40        let window_state = def.window.as_ref().map(|w| WindowState::new(w.clone()));
41        StreamRunner {
42            def,
43            window_state,
44            events_processed: 0,
45        }
46    }
47
48    /// Process a single event. Returns any window outputs.
49    pub fn process_event(&mut self, event: StreamEvent) -> Vec<Vec<WindowEvent>> {
50        self.events_processed += 1;
51        let mut outputs = Vec::new();
52
53        if let Some(ref mut state) = self.window_state {
54            state.add_event(event.value, event.timestamp);
55            if state.should_fire(event.timestamp) {
56                outputs.push(state.fire());
57            }
58        }
59        // If no window, events pass through immediately (handled by caller)
60
61        outputs
62    }
63
64    /// Force-flush any remaining window state.
65    pub fn flush(&mut self) -> Option<Vec<WindowEvent>> {
66        if let Some(ref mut state) = self.window_state
67            && !state.is_empty()
68        {
69            return Some(state.fire());
70        }
71        None
72    }
73
74    /// Number of events processed so far.
75    pub fn events_processed(&self) -> u64 {
76        self.events_processed
77    }
78
79    /// Whether this stream has windowing enabled.
80    pub fn has_window(&self) -> bool {
81        self.window_state.is_some()
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use crate::window::WindowType;
89
90    #[test]
91    fn test_stream_runner_no_window() {
92        let def = StreamDef {
93            name: "passthrough".to_string(),
94            window: None,
95            watermark_ms: None,
96        };
97        let mut runner = StreamRunner::new(def);
98        assert!(!runner.has_window());
99
100        let outputs = runner.process_event(StreamEvent {
101            key: None,
102            value: "hello".to_string(),
103            timestamp: 0,
104        });
105        assert!(outputs.is_empty()); // no window, no buffered output
106        assert_eq!(runner.events_processed(), 1);
107    }
108
109    #[test]
110    fn test_stream_runner_tumbling_window() {
111        let def = StreamDef {
112            name: "windowed".to_string(),
113            window: Some(WindowType::Tumbling { duration_ms: 1000 }),
114            watermark_ms: None,
115        };
116        let mut runner = StreamRunner::new(def);
117        assert!(runner.has_window());
118
119        // Add events within window
120        let out1 = runner.process_event(StreamEvent {
121            key: None,
122            value: "a".to_string(),
123            timestamp: 0,
124        });
125        assert!(out1.is_empty());
126
127        let out2 = runner.process_event(StreamEvent {
128            key: None,
129            value: "b".to_string(),
130            timestamp: 500,
131        });
132        assert!(out2.is_empty());
133
134        // Event at window boundary triggers fire
135        let out3 = runner.process_event(StreamEvent {
136            key: None,
137            value: "c".to_string(),
138            timestamp: 1000,
139        });
140        assert_eq!(out3.len(), 1);
141        assert_eq!(out3[0].len(), 3); // a, b, c
142    }
143
144    #[test]
145    fn test_stream_runner_flush() {
146        let def = StreamDef {
147            name: "flush_test".to_string(),
148            window: Some(WindowType::Tumbling { duration_ms: 5000 }),
149            watermark_ms: None,
150        };
151        let mut runner = StreamRunner::new(def);
152
153        runner.process_event(StreamEvent {
154            key: None,
155            value: "x".to_string(),
156            timestamp: 100,
157        });
158
159        // Flush before window fires
160        let flushed = runner.flush();
161        assert!(flushed.is_some());
162        assert_eq!(flushed.unwrap().len(), 1);
163
164        // Second flush is empty
165        let flushed2 = runner.flush();
166        assert!(flushed2.is_none());
167    }
168
169    #[test]
170    fn test_stream_def_display() {
171        let def = StreamDef {
172            name: "test_stream".to_string(),
173            window: Some(WindowType::Tumbling { duration_ms: 5000 }),
174            watermark_ms: None,
175        };
176        let s = format!("{def}");
177        assert!(s.contains("test_stream"));
178        assert!(s.contains("tumbling"));
179    }
180}