Skip to main content

tl_stream/
window.rs

1// ThinkingLanguage — Window types for stream processing
2
3use std::fmt;
4
/// Windowing strategy for a stream; every interval is expressed in milliseconds.
///
/// The type only carries plain integers, so it is cheap to copy and can be
/// compared or hashed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowType {
    /// Fixed-size, non-overlapping window lasting `duration_ms` milliseconds.
    Tumbling { duration_ms: u64 },
    /// Overlapping window: `window_ms` is the window size and `slide_ms` the
    /// slide interval.
    Sliding { window_ms: u64, slide_ms: u64 },
    /// Session window based on an activity gap of `gap_ms` milliseconds.
    Session { gap_ms: u64 },
}
15
16impl fmt::Display for WindowType {
17    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18        match self {
19            WindowType::Tumbling { duration_ms } => write!(f, "tumbling({duration_ms}ms)"),
20            WindowType::Sliding {
21                window_ms,
22                slide_ms,
23            } => {
24                write!(f, "sliding({window_ms}ms, {slide_ms}ms)")
25            }
26            WindowType::Session { gap_ms } => write!(f, "session({gap_ms}ms)"),
27        }
28    }
29}
30
31/// State for a window, buffering events and determining when to fire.
32#[derive(Debug)]
33pub struct WindowState {
34    pub window_type: WindowType,
35    pub events: Vec<WindowEvent>,
36    pub window_start: u64,
37    pub last_event_time: u64,
38}
39
/// An event in a window buffer.
///
/// Events compare equal when both their payload and timestamp match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WindowEvent {
    /// Payload carried by the event.
    pub value: String,
    /// Time associated with the event, as supplied by whoever buffered it.
    pub timestamp: u64,
}
46
47impl WindowState {
48    pub fn new(window_type: WindowType) -> Self {
49        WindowState {
50            window_type,
51            events: Vec::new(),
52            window_start: 0,
53            last_event_time: 0,
54        }
55    }
56
57    /// Add an event to the window buffer.
58    pub fn add_event(&mut self, value: String, timestamp: u64) {
59        if self.events.is_empty() && self.window_start == 0 && self.last_event_time == 0 {
60            // First event ever — initialize window start
61            self.window_start = timestamp;
62        }
63        self.last_event_time = timestamp;
64        self.events.push(WindowEvent { value, timestamp });
65    }
66
67    /// Check if the window should fire (emit results).
68    pub fn should_fire(&self, current_time: u64) -> bool {
69        if self.events.is_empty() {
70            return false;
71        }
72        match &self.window_type {
73            WindowType::Tumbling { duration_ms } => current_time >= self.window_start + duration_ms,
74            WindowType::Sliding { slide_ms, .. } => current_time >= self.window_start + slide_ms,
75            WindowType::Session { gap_ms } => current_time >= self.last_event_time + gap_ms,
76        }
77    }
78
79    /// Fire the window: drain events and reset.
80    /// Returns the events that were in the window.
81    pub fn fire(&mut self) -> Vec<WindowEvent> {
82        let events = std::mem::take(&mut self.events);
83        match &self.window_type {
84            WindowType::Tumbling { duration_ms } => {
85                self.window_start += duration_ms;
86            }
87            WindowType::Sliding { slide_ms, .. } => {
88                // For sliding windows, keep events within the window
89                self.window_start += slide_ms;
90            }
91            WindowType::Session { .. } => {
92                self.window_start = 0;
93                self.last_event_time = 0;
94            }
95        }
96        events
97    }
98
99    /// Number of buffered events.
100    pub fn len(&self) -> usize {
101        self.events.len()
102    }
103
104    /// Whether the buffer is empty.
105    pub fn is_empty(&self) -> bool {
106        self.events.is_empty()
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a state of the given type pre-loaded with `(value, timestamp)` pairs.
    fn state_with(window_type: WindowType, events: &[(&str, u64)]) -> WindowState {
        let mut state = WindowState::new(window_type);
        for &(value, timestamp) in events {
            state.add_event(value.to_string(), timestamp);
        }
        state
    }

    #[test]
    fn test_tumbling_window_fires_after_duration() {
        let mut window = state_with(
            WindowType::Tumbling { duration_ms: 1000 },
            &[("a", 0), ("b", 500)],
        );

        // Strictly before the boundary nothing fires; at or after it does.
        for too_early in [500, 999] {
            assert!(!window.should_fire(too_early));
        }
        for due in [1000, 1500] {
            assert!(window.should_fire(due));
        }

        let fired = window.fire();
        let values: Vec<&str> = fired.iter().map(|event| event.value.as_str()).collect();
        assert_eq!(values, ["a", "b"]);
        assert!(window.is_empty());
    }

    #[test]
    fn test_tumbling_window_resets_after_fire() {
        let mut window = state_with(WindowType::Tumbling { duration_ms: 1000 }, &[("a", 0)]);
        assert!(window.should_fire(1000));
        window.fire();

        // The next window spans 1000..2000, so an event at 1200 fires at 2000.
        window.add_event("b".to_string(), 1200);
        assert!(!window.should_fire(1500));
        assert!(window.should_fire(2000));
    }

    #[test]
    fn test_sliding_window_fires_on_slide() {
        let window = state_with(
            WindowType::Sliding {
                window_ms: 1000,
                slide_ms: 500,
            },
            &[("a", 0), ("b", 300)],
        );

        assert!(!window.should_fire(400));
        assert!(window.should_fire(500));
    }

    #[test]
    fn test_session_window_fires_on_gap() {
        let mut window = state_with(
            WindowType::Session { gap_ms: 500 },
            &[("a", 0), ("b", 200)],
        );

        // The gap is measured from the last event (200), so 700 is the boundary.
        assert!(!window.should_fire(600));
        for due in [700, 800] {
            assert!(window.should_fire(due));
        }

        assert_eq!(window.fire().len(), 2);
    }

    #[test]
    fn test_empty_window_never_fires() {
        let window = state_with(WindowType::Tumbling { duration_ms: 1000 }, &[]);
        assert!(!window.should_fire(5000));
    }

    #[test]
    fn test_window_type_display() {
        let cases = [
            (WindowType::Tumbling { duration_ms: 5000 }, "tumbling(5000ms)"),
            (
                WindowType::Sliding {
                    window_ms: 10000,
                    slide_ms: 1000,
                },
                "sliding(10000ms, 1000ms)",
            ),
            (WindowType::Session { gap_ms: 30000 }, "session(30000ms)"),
        ];
        for (window_type, expected) in cases {
            assert_eq!(window_type.to_string(), expected);
        }
    }
}