1use alloc::collections::VecDeque;
33use alloc::vec::Vec;
34use core::time::Duration;
35use std::time::Instant;
36
37use crate::emit::Emit;
38use crate::source::Infallible;
39use crate::stage::Stage;
40
/// Source of "current time" used by the windowing stage.
///
/// Going through a trait instead of calling [`Instant::now`] directly lets the
/// stage be driven by any time source, including a manually advanced one.
/// Implementors must be `Send`.
pub trait Clock: Send {
    /// Returns the instant that should be treated as the present moment.
    fn now(&self) -> Instant;
}
50
51#[derive(Debug, Default, Clone, Copy)]
53pub struct SystemClock;
54
55impl Clock for SystemClock {
56 fn now(&self) -> Instant {
57 Instant::now()
58 }
59}
60
/// Strategy used to group a stream of items into time windows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowPolicy {
    /// Back-to-back, non-overlapping windows of a fixed length.
    Tumbling {
        /// Length of every window.
        size: Duration,
    },
    /// Fixed-length windows that are opened at a regular interval, so an item
    /// is copied into every window whose span contains its arrival time.
    Sliding {
        /// Length of every window.
        size: Duration,
        /// Distance between the start of one window and the start of the next.
        slide: Duration,
    },
    /// Activity-based windows: a window is closed once the gap between two
    /// consecutive items is strictly greater than `idle`.
    Session {
        /// Largest gap between consecutive items that keeps a session open.
        idle: Duration,
    },
}
93
/// A closed batch of items together with the time bounds it was emitted with.
#[derive(Debug, Clone)]
pub struct Window<T> {
    /// Items collected into this window, in arrival order.
    items: Vec<T>,
    /// Lower time bound recorded for the window.
    start: Instant,
    /// Upper time bound recorded for the window.
    end: Instant,
}
106
107impl<T> Window<T> {
108 #[must_use]
111 pub fn new(items: Vec<T>, start: Instant, end: Instant) -> Self {
112 Self { items, start, end }
113 }
114
115 #[must_use]
117 pub fn items(&self) -> &[T] {
118 &self.items
119 }
120
121 #[must_use]
123 pub fn len(&self) -> usize {
124 self.items.len()
125 }
126
127 #[must_use]
129 pub fn is_empty(&self) -> bool {
130 self.items.is_empty()
131 }
132
133 #[must_use]
135 pub fn start(&self) -> Instant {
136 self.start
137 }
138
139 #[must_use]
142 pub fn end(&self) -> Instant {
143 self.end
144 }
145
146 #[must_use]
148 pub fn into_inner(self) -> Vec<T> {
149 self.items
150 }
151}
152
153impl<T> IntoIterator for Window<T> {
154 type Item = T;
155 type IntoIter = alloc::vec::IntoIter<T>;
156 fn into_iter(self) -> Self::IntoIter {
157 self.items.into_iter()
158 }
159}
160
161impl<'a, T> IntoIterator for &'a Window<T> {
162 type Item = &'a T;
163 type IntoIter = core::slice::Iter<'a, T>;
164 fn into_iter(self) -> Self::IntoIter {
165 self.items.iter()
166 }
167}
168
/// A window that has been opened but not yet emitted.
///
/// Used by the sliding policy, where several windows are open at once and
/// each keeps its own copy of the items that fall inside its span.
struct PendingWindow<T> {
    /// Inclusive lower bound of the span covered by this window.
    start: Instant,
    /// Exclusive upper bound of the span covered by this window.
    end: Instant,
    /// Items assigned to this window so far.
    items: Vec<T>,
}
183
184pub(crate) struct WindowStage<T: Clone, C: Clock> {
185 policy: WindowPolicy,
186 clock: C,
187 state: WindowState<T>,
188}
189
190enum WindowState<T> {
191 Tumbling {
192 start: Option<Instant>,
193 items: Vec<T>,
194 },
195 Sliding {
196 windows: VecDeque<PendingWindow<T>>,
197 next_window_start: Option<Instant>,
198 },
199 Session {
200 start: Option<Instant>,
201 last_seen: Option<Instant>,
202 items: Vec<T>,
203 },
204}
205
206impl<T: Clone, C: Clock> WindowStage<T, C> {
207 pub(crate) fn new(policy: WindowPolicy, clock: C) -> Self {
208 let state = match policy {
209 WindowPolicy::Tumbling { .. } => WindowState::Tumbling {
210 start: None,
211 items: Vec::new(),
212 },
213 WindowPolicy::Sliding { .. } => WindowState::Sliding {
214 windows: VecDeque::new(),
215 next_window_start: None,
216 },
217 WindowPolicy::Session { .. } => WindowState::Session {
218 start: None,
219 last_seen: None,
220 items: Vec::new(),
221 },
222 };
223 Self {
224 policy,
225 clock,
226 state,
227 }
228 }
229}
230
231impl<T, C> Stage for WindowStage<T, C>
232where
233 T: Clone + Send + 'static,
234 C: Clock + 'static,
235{
236 type Input = T;
237 type Output = Window<T>;
238 type Error = Infallible;
239
240 fn process(
241 &mut self,
242 item: Self::Input,
243 out: &mut dyn Emit<Item = Self::Output>,
244 ) -> Result<(), Self::Error> {
245 let now = self.clock.now();
246 match (&self.policy, &mut self.state) {
247 (WindowPolicy::Tumbling { size }, WindowState::Tumbling { start, items }) => {
248 if start.is_none() {
249 *start = Some(now);
250 }
251 while let Some(s) = *start {
253 if now.saturating_duration_since(s) >= *size {
254 let window_items = core::mem::take(items);
255 let _ = out.emit(Window::new(window_items, s, s + *size));
256 *start = Some(s + *size);
257 } else {
258 break;
259 }
260 }
261 items.push(item);
262 }
263 (
264 WindowPolicy::Session { idle },
265 WindowState::Session {
266 start,
267 last_seen,
268 items,
269 },
270 ) => {
271 if let Some(ls) = *last_seen {
272 if now.saturating_duration_since(ls) > *idle {
273 if let Some(s) = *start {
274 let window_items = core::mem::take(items);
275 let _ = out.emit(Window::new(window_items, s, ls));
276 }
277 *start = Some(now);
278 }
279 } else {
280 *start = Some(now);
281 }
282 *last_seen = Some(now);
283 items.push(item);
284 }
285 (
286 WindowPolicy::Sliding { size, slide },
287 WindowState::Sliding {
288 windows,
289 next_window_start,
290 },
291 ) => {
292 if next_window_start.is_none() {
293 *next_window_start = Some(now);
294 }
295 while let Some(s) = *next_window_start {
297 if s <= now {
298 windows.push_back(PendingWindow {
299 start: s,
300 end: s + *size,
301 items: Vec::new(),
302 });
303 *next_window_start = Some(s + *slide);
304 } else {
305 break;
306 }
307 }
308 while let Some(w) = windows.front() {
310 if w.end <= now {
311 let w = windows.pop_front().expect("front exists");
312 let _ = out.emit(Window::new(w.items, w.start, w.end));
313 } else {
314 break;
315 }
316 }
317 for w in windows.iter_mut() {
319 if w.start <= now && now < w.end {
320 w.items.push(item.clone());
321 }
322 }
323 }
324 _ => unreachable!(
325 "policy/state mismatch is impossible by construction; \
326 WindowStage::new enforces alignment"
327 ),
328 }
329 Ok(())
330 }
331
332 fn flush(&mut self, out: &mut dyn Emit<Item = Self::Output>) -> Result<(), Self::Error> {
333 let now = self.clock.now();
334 match &mut self.state {
335 WindowState::Tumbling { start, items } => {
336 if !items.is_empty() {
337 let s = start.unwrap_or(now);
338 let window_items = core::mem::take(items);
339 let _ = out.emit(Window::new(window_items, s, now));
340 }
341 }
342 WindowState::Session {
343 start,
344 last_seen,
345 items,
346 } => {
347 if !items.is_empty() {
348 let s = start.unwrap_or(now);
349 let e = last_seen.unwrap_or(now);
350 let window_items = core::mem::take(items);
351 let _ = out.emit(Window::new(window_items, s, e));
352 }
353 }
354 WindowState::Sliding { windows, .. } => {
355 while let Some(w) = windows.pop_front() {
356 if !w.items.is_empty() {
357 let _ = out.emit(Window::new(w.items, w.start, w.end));
358 }
359 }
360 }
361 }
362 Ok(())
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use crate::emit::EmitError;
    use std::sync::{Arc, Mutex};

    /// Manually advanced clock; clones share the same underlying instant, so
    /// the test can move time forward while the stage owns its own handle.
    #[derive(Clone)]
    struct FakeClock {
        inner: Arc<Mutex<Instant>>,
    }

    impl FakeClock {
        fn new(start: Instant) -> Self {
            let inner = Arc::new(Mutex::new(start));
            Self { inner }
        }

        fn advance(&self, by: Duration) {
            let mut current = self.inner.lock().unwrap();
            *current = *current + by;
        }
    }

    impl Clock for FakeClock {
        fn now(&self) -> Instant {
            let current = self.inner.lock().unwrap();
            *current
        }
    }

    /// Emit sink that records every window it receives, in order.
    struct Collect<T> {
        out: Vec<Window<T>>,
    }

    impl<T> Emit for Collect<T> {
        type Item = Window<T>;

        fn emit(&mut self, w: Window<T>) -> Result<(), EmitError> {
            self.out.push(w);
            Ok(())
        }
    }

    /// Shorthand for a whole-second duration.
    fn secs(n: u64) -> Duration {
        Duration::from_secs(n)
    }

    /// Builds a fake clock, a stage driven by it, and an empty collector.
    fn harness(policy: WindowPolicy) -> (FakeClock, WindowStage<u32, FakeClock>, Collect<u32>) {
        let clock = FakeClock::new(Instant::now());
        let stage = WindowStage::<u32, _>::new(policy, clock.clone());
        let emit = Collect::<u32> { out: Vec::new() };
        (clock, stage, emit)
    }

    #[test]
    fn tumbling_emits_on_boundary() {
        let (clock, mut stage, mut emit) = harness(WindowPolicy::Tumbling { size: secs(10) });

        // t = 0s: the first item opens the window.
        stage.process(1, &mut emit).unwrap();
        assert!(emit.out.is_empty());

        // t = 5s: still inside the first window.
        clock.advance(secs(5));
        stage.process(2, &mut emit).unwrap();
        assert!(emit.out.is_empty());

        // t = 10s: the boundary is reached, so the first window closes.
        clock.advance(secs(5));
        stage.process(3, &mut emit).unwrap();
        assert_eq!(emit.out.len(), 1);
        assert_eq!(emit.out[0].items(), &[1, 2]);

        // t = 20s: the second window closes.
        clock.advance(secs(10));
        stage.process(4, &mut emit).unwrap();
        assert_eq!(emit.out.len(), 2);
        assert_eq!(emit.out[1].items(), &[3]);

        // Flushing emits the partially filled third window.
        stage.flush(&mut emit).unwrap();
        assert_eq!(emit.out.len(), 3);
        assert_eq!(emit.out[2].items(), &[4]);
    }

    #[test]
    fn session_closes_after_idle() {
        let (clock, mut stage, mut emit) = harness(WindowPolicy::Session { idle: secs(5) });

        // Three items, each within the idle limit of the previous one.
        stage.process(1, &mut emit).unwrap();
        clock.advance(secs(2));
        stage.process(2, &mut emit).unwrap();
        clock.advance(secs(2));
        stage.process(3, &mut emit).unwrap();
        assert!(emit.out.is_empty());

        // A 16s gap exceeds the idle limit and closes the session.
        clock.advance(secs(16));
        stage.process(4, &mut emit).unwrap();
        assert_eq!(emit.out.len(), 1);
        assert_eq!(emit.out[0].items(), &[1, 2, 3]);

        // Flushing emits the session that item 4 started.
        stage.flush(&mut emit).unwrap();
        assert_eq!(emit.out.len(), 2);
        assert_eq!(emit.out[1].items(), &[4]);
    }

    #[test]
    fn sliding_overlapping_windows() {
        let (clock, mut stage, mut emit) = harness(WindowPolicy::Sliding {
            size: secs(10),
            slide: secs(5),
        });

        // t = 0s, 3s, 5s, 10s.
        stage.process(1, &mut emit).unwrap();
        clock.advance(secs(3));
        stage.process(2, &mut emit).unwrap();
        clock.advance(secs(2));
        stage.process(3, &mut emit).unwrap();
        clock.advance(secs(5));
        stage.process(4, &mut emit).unwrap();

        // Only the window covering [0s, 10s) has closed so far.
        assert_eq!(emit.out.len(), 1);
        assert_eq!(emit.out[0].items(), &[1, 2, 3]);

        // Flushing emits the two windows that are still open.
        stage.flush(&mut emit).unwrap();
        assert_eq!(emit.out.len(), 3);
        assert_eq!(emit.out[1].items(), &[3, 4]);
        assert_eq!(emit.out[2].items(), &[4]);
    }

    #[test]
    fn tumbling_flush_emits_partial() {
        let (_clock, mut stage, mut emit) = harness(WindowPolicy::Tumbling { size: secs(10) });

        stage.process(1, &mut emit).unwrap();
        stage.process(2, &mut emit).unwrap();
        stage.flush(&mut emit).unwrap();
        assert_eq!(emit.out.len(), 1);
        assert_eq!(emit.out[0].items(), &[1, 2]);
    }

    #[test]
    fn window_into_inner_returns_items() {
        let at = Instant::now();
        let window = Window::new(alloc::vec![10u32, 20, 30], at, at);
        assert_eq!(window.len(), 3);
        assert!(!window.is_empty());
        assert_eq!(window.into_inner(), alloc::vec![10, 20, 30]);
    }
}