Skip to main content

pipe_io/
window.rs

1//! Windowing primitives: [`Clock`], [`SystemClock`], [`WindowPolicy`],
2//! and [`Window<T>`].
3//!
4//! Windowing groups items by wall-clock time. Three policies are
5//! supported:
6//!
7//! * [`WindowPolicy::Tumbling`] - fixed, non-overlapping windows of
8//!   duration `size`.
9//! * [`WindowPolicy::Sliding`] - overlapping windows of width `size`
10//!   that advance by `slide` each tick. Each item belongs to multiple
11//!   active windows, so `T: Clone` is required.
12//! * [`WindowPolicy::Session`] - adaptive windows that close after
13//!   `idle` of no activity.
14//!
15//! Windows close on the *next* item arriving after the close condition
16//! is satisfied (or at end-of-stream via [`crate::Stage::flush`]). With
17//! pure synchronous execution there is no background timer that fires
18//! while items are not flowing; if a session window is `idle` and no
19//! further items arrive, it closes only when [`crate::Pipeline::run`]
20//! reaches end-of-stream and flushes.
21//!
22//! # Example
23//!
24//! ```
25//! use core::time::Duration;
26//! use pipe_io::WindowPolicy;
27//!
28//! let p = WindowPolicy::Tumbling { size: Duration::from_secs(5) };
29//! assert!(matches!(p, WindowPolicy::Tumbling { .. }));
30//! ```
31
32use alloc::collections::VecDeque;
33use alloc::vec::Vec;
34use core::time::Duration;
35use std::time::Instant;
36
37use crate::emit::Emit;
38use crate::source::Infallible;
39use crate::stage::Stage;
40
41/// Source of wall-clock time used by [`crate::PipelineBuilder::window`].
42///
43/// The default impl is [`SystemClock`], which wraps
44/// [`std::time::Instant::now`]. Custom impls let tests advance time
45/// deterministically.
46pub trait Clock: Send {
47    /// Return the current instant.
48    fn now(&self) -> Instant;
49}
50
51/// Default [`Clock`] using [`std::time::Instant::now`].
52#[derive(Debug, Default, Clone, Copy)]
53pub struct SystemClock;
54
55impl Clock for SystemClock {
56    fn now(&self) -> Instant {
57        Instant::now()
58    }
59}
60
61/// Window emission strategy.
62///
63/// Pass to [`crate::PipelineBuilder::window`] to install a windowing
64/// stage. The carrier type changes from `T` to [`Window<T>`] after
65/// the call.
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub enum WindowPolicy {
68    /// Fixed-width, non-overlapping windows. The first window starts
69    /// when the first item arrives; subsequent windows are anchored
70    /// at multiples of `size` from that point.
71    Tumbling {
72        /// Width of each window.
73        size: Duration,
74    },
75    /// Overlapping windows of width `size`. A new window starts every
76    /// `slide` interval. Each item is duplicated into every active
77    /// window, so the item type must be `Clone`.
78    Sliding {
79        /// Width of each window.
80        size: Duration,
81        /// Time between successive window starts.
82        slide: Duration,
83    },
84    /// Adaptive window that stays open as long as new items arrive
85    /// within `idle`. Closes (and emits) on the next item arriving
86    /// after `idle` has elapsed since the last item, or at
87    /// end-of-stream.
88    Session {
89        /// Idle gap after which the window closes.
90        idle: Duration,
91    },
92}
93
94/// A closed window of items with a start and end instant.
95///
96/// `start <= end` always holds. For tumbling and sliding windows
97/// `end - start == size` (modulo end-of-stream flushes). For session
98/// windows `end` is the instant of the last item that arrived inside
99/// the session.
100#[derive(Debug, Clone)]
101pub struct Window<T> {
102    items: Vec<T>,
103    start: Instant,
104    end: Instant,
105}
106
107impl<T> Window<T> {
108    /// Construct a window directly from parts. Primarily for tests
109    /// and custom Stage implementations.
110    #[must_use]
111    pub fn new(items: Vec<T>, start: Instant, end: Instant) -> Self {
112        Self { items, start, end }
113    }
114
115    /// Items in this window, in arrival order.
116    #[must_use]
117    pub fn items(&self) -> &[T] {
118        &self.items
119    }
120
121    /// Number of items in the window.
122    #[must_use]
123    pub fn len(&self) -> usize {
124        self.items.len()
125    }
126
127    /// True if the window holds no items.
128    #[must_use]
129    pub fn is_empty(&self) -> bool {
130        self.items.is_empty()
131    }
132
133    /// Window start instant.
134    #[must_use]
135    pub fn start(&self) -> Instant {
136        self.start
137    }
138
139    /// Window end instant. For tumbling/sliding, this is `start + size`.
140    /// For session, this is the timestamp of the last item.
141    #[must_use]
142    pub fn end(&self) -> Instant {
143        self.end
144    }
145
146    /// Unwrap the inner item vec.
147    #[must_use]
148    pub fn into_inner(self) -> Vec<T> {
149        self.items
150    }
151}
152
153impl<T> IntoIterator for Window<T> {
154    type Item = T;
155    type IntoIter = alloc::vec::IntoIter<T>;
156    fn into_iter(self) -> Self::IntoIter {
157        self.items.into_iter()
158    }
159}
160
161impl<'a, T> IntoIterator for &'a Window<T> {
162    type Item = &'a T;
163    type IntoIter = core::slice::Iter<'a, T>;
164    fn into_iter(self) -> Self::IntoIter {
165        self.items.iter()
166    }
167}
168
169// ---------------------------------------------------------------------
170// Internal stage implementation. Used by PipelineBuilder::window.
171// Requires T: Clone universally because sliding windows duplicate
172// items across overlapping windows. Tumbling/session do not strictly
173// need Clone, but keeping a unified stage type simplifies the builder.
174// Consumers with non-Clone item types can use `.batch()` with a
175// `max_age` trigger as a substitute for tumbling windows.
176// ---------------------------------------------------------------------
177
178struct PendingWindow<T> {
179    start: Instant,
180    end: Instant,
181    items: Vec<T>,
182}
183
184pub(crate) struct WindowStage<T: Clone, C: Clock> {
185    policy: WindowPolicy,
186    clock: C,
187    state: WindowState<T>,
188}
189
190enum WindowState<T> {
191    Tumbling {
192        start: Option<Instant>,
193        items: Vec<T>,
194    },
195    Sliding {
196        windows: VecDeque<PendingWindow<T>>,
197        next_window_start: Option<Instant>,
198    },
199    Session {
200        start: Option<Instant>,
201        last_seen: Option<Instant>,
202        items: Vec<T>,
203    },
204}
205
206impl<T: Clone, C: Clock> WindowStage<T, C> {
207    pub(crate) fn new(policy: WindowPolicy, clock: C) -> Self {
208        let state = match policy {
209            WindowPolicy::Tumbling { .. } => WindowState::Tumbling {
210                start: None,
211                items: Vec::new(),
212            },
213            WindowPolicy::Sliding { .. } => WindowState::Sliding {
214                windows: VecDeque::new(),
215                next_window_start: None,
216            },
217            WindowPolicy::Session { .. } => WindowState::Session {
218                start: None,
219                last_seen: None,
220                items: Vec::new(),
221            },
222        };
223        Self {
224            policy,
225            clock,
226            state,
227        }
228    }
229}
230
231impl<T, C> Stage for WindowStage<T, C>
232where
233    T: Clone + Send + 'static,
234    C: Clock + 'static,
235{
236    type Input = T;
237    type Output = Window<T>;
238    type Error = Infallible;
239
240    fn process(
241        &mut self,
242        item: Self::Input,
243        out: &mut dyn Emit<Item = Self::Output>,
244    ) -> Result<(), Self::Error> {
245        let now = self.clock.now();
246        match (&self.policy, &mut self.state) {
247            (WindowPolicy::Tumbling { size }, WindowState::Tumbling { start, items }) => {
248                if start.is_none() {
249                    *start = Some(now);
250                }
251                // Advance window boundaries past any elapsed sizes.
252                while let Some(s) = *start {
253                    if now.saturating_duration_since(s) >= *size {
254                        let window_items = core::mem::take(items);
255                        let _ = out.emit(Window::new(window_items, s, s + *size));
256                        *start = Some(s + *size);
257                    } else {
258                        break;
259                    }
260                }
261                items.push(item);
262            }
263            (
264                WindowPolicy::Session { idle },
265                WindowState::Session {
266                    start,
267                    last_seen,
268                    items,
269                },
270            ) => {
271                if let Some(ls) = *last_seen {
272                    if now.saturating_duration_since(ls) > *idle {
273                        if let Some(s) = *start {
274                            let window_items = core::mem::take(items);
275                            let _ = out.emit(Window::new(window_items, s, ls));
276                        }
277                        *start = Some(now);
278                    }
279                } else {
280                    *start = Some(now);
281                }
282                *last_seen = Some(now);
283                items.push(item);
284            }
285            (
286                WindowPolicy::Sliding { size, slide },
287                WindowState::Sliding {
288                    windows,
289                    next_window_start,
290                },
291            ) => {
292                if next_window_start.is_none() {
293                    *next_window_start = Some(now);
294                }
295                // Spawn new windows whose start is at or before now.
296                while let Some(s) = *next_window_start {
297                    if s <= now {
298                        windows.push_back(PendingWindow {
299                            start: s,
300                            end: s + *size,
301                            items: Vec::new(),
302                        });
303                        *next_window_start = Some(s + *slide);
304                    } else {
305                        break;
306                    }
307                }
308                // Emit windows that have already ended.
309                while let Some(w) = windows.front() {
310                    if w.end <= now {
311                        let w = windows.pop_front().expect("front exists");
312                        let _ = out.emit(Window::new(w.items, w.start, w.end));
313                    } else {
314                        break;
315                    }
316                }
317                // Add the new item to all currently-active windows.
318                for w in windows.iter_mut() {
319                    if w.start <= now && now < w.end {
320                        w.items.push(item.clone());
321                    }
322                }
323            }
324            _ => unreachable!(
325                "policy/state mismatch is impossible by construction; \
326                 WindowStage::new enforces alignment"
327            ),
328        }
329        Ok(())
330    }
331
332    fn flush(&mut self, out: &mut dyn Emit<Item = Self::Output>) -> Result<(), Self::Error> {
333        let now = self.clock.now();
334        match &mut self.state {
335            WindowState::Tumbling { start, items } => {
336                if !items.is_empty() {
337                    let s = start.unwrap_or(now);
338                    let window_items = core::mem::take(items);
339                    let _ = out.emit(Window::new(window_items, s, now));
340                }
341            }
342            WindowState::Session {
343                start,
344                last_seen,
345                items,
346            } => {
347                if !items.is_empty() {
348                    let s = start.unwrap_or(now);
349                    let e = last_seen.unwrap_or(now);
350                    let window_items = core::mem::take(items);
351                    let _ = out.emit(Window::new(window_items, s, e));
352                }
353            }
354            WindowState::Sliding { windows, .. } => {
355                while let Some(w) = windows.pop_front() {
356                    if !w.items.is_empty() {
357                        let _ = out.emit(Window::new(w.items, w.start, w.end));
358                    }
359                }
360            }
361        }
362        Ok(())
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use super::*;
369    use crate::emit::EmitError;
370    use std::sync::{Arc, Mutex};
371
372    // A deterministic Clock for tests. Each call to `now()` returns the
373    // current set time. Tests advance time explicitly.
374    #[derive(Clone)]
375    struct FakeClock {
376        inner: Arc<Mutex<Instant>>,
377    }
378
379    impl FakeClock {
380        fn new(start: Instant) -> Self {
381            Self {
382                inner: Arc::new(Mutex::new(start)),
383            }
384        }
385
386        fn advance(&self, by: Duration) {
387            let mut g = self.inner.lock().unwrap();
388            *g += by;
389        }
390    }
391
392    impl Clock for FakeClock {
393        fn now(&self) -> Instant {
394            *self.inner.lock().unwrap()
395        }
396    }
397
398    struct Collect<T> {
399        out: Vec<Window<T>>,
400    }
401    impl<T> Emit for Collect<T> {
402        type Item = Window<T>;
403        fn emit(&mut self, w: Window<T>) -> Result<(), EmitError> {
404            self.out.push(w);
405            Ok(())
406        }
407    }
408
409    #[test]
410    fn tumbling_emits_on_boundary() {
411        let t0 = Instant::now();
412        let clock = FakeClock::new(t0);
413        let mut stage = WindowStage::<u32, _>::new(
414            WindowPolicy::Tumbling {
415                size: Duration::from_secs(10),
416            },
417            clock.clone(),
418        );
419        let mut emit = Collect::<u32> { out: Vec::new() };
420
421        // t=0: item 1
422        stage.process(1, &mut emit).unwrap();
423        assert!(emit.out.is_empty());
424
425        // t=5: item 2 (still inside first window)
426        clock.advance(Duration::from_secs(5));
427        stage.process(2, &mut emit).unwrap();
428        assert!(emit.out.is_empty());
429
430        // t=10: item 3 (boundary; emit window [0, 10) with items 1, 2)
431        clock.advance(Duration::from_secs(5));
432        stage.process(3, &mut emit).unwrap();
433        assert_eq!(emit.out.len(), 1);
434        assert_eq!(emit.out[0].items(), &[1, 2]);
435
436        // t=20: item 4 (emit window [10, 20) with item 3)
437        clock.advance(Duration::from_secs(10));
438        stage.process(4, &mut emit).unwrap();
439        assert_eq!(emit.out.len(), 2);
440        assert_eq!(emit.out[1].items(), &[3]);
441
442        // Flush emits the remaining window with item 4.
443        stage.flush(&mut emit).unwrap();
444        assert_eq!(emit.out.len(), 3);
445        assert_eq!(emit.out[2].items(), &[4]);
446    }
447
448    #[test]
449    fn session_closes_after_idle() {
450        let t0 = Instant::now();
451        let clock = FakeClock::new(t0);
452        let mut stage = WindowStage::<u32, _>::new(
453            WindowPolicy::Session {
454                idle: Duration::from_secs(5),
455            },
456            clock.clone(),
457        );
458        let mut emit = Collect::<u32> { out: Vec::new() };
459
460        // t=0: 1
461        stage.process(1, &mut emit).unwrap();
462        // t=2: 2 (within session)
463        clock.advance(Duration::from_secs(2));
464        stage.process(2, &mut emit).unwrap();
465        // t=4: 3 (within session)
466        clock.advance(Duration::from_secs(2));
467        stage.process(3, &mut emit).unwrap();
468        assert!(emit.out.is_empty());
469
470        // t=20: 4 (>5s gap, closes prior session, starts new one)
471        clock.advance(Duration::from_secs(16));
472        stage.process(4, &mut emit).unwrap();
473        assert_eq!(emit.out.len(), 1);
474        assert_eq!(emit.out[0].items(), &[1, 2, 3]);
475
476        // Flush closes the second session.
477        stage.flush(&mut emit).unwrap();
478        assert_eq!(emit.out.len(), 2);
479        assert_eq!(emit.out[1].items(), &[4]);
480    }
481
482    #[test]
483    fn sliding_overlapping_windows() {
484        let t0 = Instant::now();
485        let clock = FakeClock::new(t0);
486        // size=10s, slide=5s: windows [0,10), [5,15), [10,20), ...
487        let mut stage = WindowStage::<u32, _>::new(
488            WindowPolicy::Sliding {
489                size: Duration::from_secs(10),
490                slide: Duration::from_secs(5),
491            },
492            clock.clone(),
493        );
494        let mut emit = Collect::<u32> { out: Vec::new() };
495
496        // t=0: item 1. Spawns window [0,10). Item belongs.
497        stage.process(1, &mut emit).unwrap();
498        // t=3: item 2. Still only window [0,10) active.
499        clock.advance(Duration::from_secs(3));
500        stage.process(2, &mut emit).unwrap();
501        // t=5: item 3. Spawns window [5,15). Item belongs to both [0,10) and [5,15).
502        clock.advance(Duration::from_secs(2));
503        stage.process(3, &mut emit).unwrap();
504        // t=10: item 4. Window [0,10) ends -> emit. Spawn [10,20). Item belongs to [5,15) and [10,20).
505        clock.advance(Duration::from_secs(5));
506        stage.process(4, &mut emit).unwrap();
507        assert_eq!(emit.out.len(), 1);
508        assert_eq!(emit.out[0].items(), &[1, 2, 3]);
509
510        // Flush emits remaining windows: [5,15) with {3,4}, [10,20) with {4}.
511        stage.flush(&mut emit).unwrap();
512        assert_eq!(emit.out.len(), 3);
513        assert_eq!(emit.out[1].items(), &[3, 4]);
514        assert_eq!(emit.out[2].items(), &[4]);
515    }
516
517    #[test]
518    fn tumbling_flush_emits_partial() {
519        let t0 = Instant::now();
520        let clock = FakeClock::new(t0);
521        let mut stage = WindowStage::<u32, _>::new(
522            WindowPolicy::Tumbling {
523                size: Duration::from_secs(10),
524            },
525            clock.clone(),
526        );
527        let mut emit = Collect::<u32> { out: Vec::new() };
528
529        stage.process(1, &mut emit).unwrap();
530        stage.process(2, &mut emit).unwrap();
531        stage.flush(&mut emit).unwrap();
532        assert_eq!(emit.out.len(), 1);
533        assert_eq!(emit.out[0].items(), &[1, 2]);
534    }
535
536    #[test]
537    fn window_into_inner_returns_items() {
538        let t = Instant::now();
539        let w = Window::new(alloc::vec![10u32, 20, 30], t, t);
540        assert_eq!(w.len(), 3);
541        assert!(!w.is_empty());
542        assert_eq!(w.into_inner(), alloc::vec![10, 20, 30]);
543    }
544}