use super::window::{WindowAssigner, WindowId, WindowIdVec};
use std::time::Duration;
/// Assigner for sliding (overlapping) event-time windows.
///
/// Every window is `size_ms` milliseconds long and a new window starts every
/// `slide_ms` milliseconds. Window starts lie on the grid
/// `offset_ms + k * slide_ms` for integer `k`.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
    /// Length of each window in milliseconds; constructors guarantee `> 0`.
    size_ms: i64,
    /// Distance between consecutive window starts in milliseconds;
    /// constructors guarantee `0 < slide_ms <= size_ms`.
    slide_ms: i64,
    /// `ceil(size_ms / slide_ms)`: the most windows a single timestamp can fall into.
    windows_per_event: usize,
    /// Shift applied to the grid of window starts, in milliseconds (defaults to 0).
    offset_ms: i64,
}

impl SlidingWindowAssigner {
    /// Creates an assigner from `Duration`s, truncated to whole milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if either duration does not fit in an `i64` millisecond count, if
    /// either is zero, if `slide` exceeds `size`, or if the resulting
    /// windows-per-event count does not fit in `usize`.
    #[must_use]
    pub fn new(size: Duration, slide: Duration) -> Self {
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        let slide_ms = i64::try_from(slide.as_millis()).expect("Slide interval must fit in i64");
        let windows_per_event = usize::try_from(Self::checked_window_count(size_ms, slide_ms))
            .expect("Windows per event should fit in usize");
        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Creates an assigner from raw millisecond values.
    ///
    /// Unlike [`SlidingWindowAssigner::new`], a windows-per-event count that does
    /// not fit in `usize` is clamped to `usize::MAX` instead of panicking.
    ///
    /// # Panics
    ///
    /// Panics if `size_ms` or `slide_ms` is not positive, or if `slide_ms`
    /// exceeds `size_ms`.
    #[must_use]
    pub fn from_millis(size_ms: i64, slide_ms: i64) -> Self {
        let windows_per_event =
            usize::try_from(Self::checked_window_count(size_ms, slide_ms)).unwrap_or(usize::MAX);
        Self {
            size_ms,
            slide_ms,
            windows_per_event,
            offset_ms: 0,
        }
    }

    /// Validates the window geometry and returns `ceil(size_ms / slide_ms)`.
    ///
    /// Shared by both constructors so they enforce identical invariants with
    /// identical panic messages.
    fn checked_window_count(size_ms: i64, slide_ms: i64) -> i64 {
        assert!(size_ms > 0, "Window size must be positive");
        assert!(slide_ms > 0, "Slide interval must be positive");
        assert!(
            slide_ms <= size_ms,
            "Slide must not exceed size (use tumbling windows for non-overlapping)"
        );
        // Ceiling division written as `(a - 1) / b + 1`, which is valid because
        // `a >= 1`. The more common `(a + b - 1) / b` overflows `i64` when
        // `size_ms` is close to `i64::MAX`.
        (size_ms - 1) / slide_ms + 1
    }

    /// Returns the assigner with its window-start grid shifted by `offset_ms`.
    #[must_use]
    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
        self.offset_ms = offset_ms;
        self
    }

    /// Window length in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Interval between consecutive window starts in milliseconds.
    #[must_use]
    pub fn slide_ms(&self) -> i64 {
        self.slide_ms
    }

    /// `ceil(size_ms / slide_ms)`, computed once at construction.
    #[must_use]
    pub fn windows_per_event(&self) -> usize {
        self.windows_per_event
    }

    /// Offset of the window-start grid in milliseconds.
    #[must_use]
    pub fn offset_ms(&self) -> i64 {
        self.offset_ms
    }

    /// Returns the latest grid-aligned window start that is `<= timestamp`.
    ///
    /// All offset arithmetic saturates, so extreme timestamp/offset
    /// combinations yield a clamped result rather than an overflow.
    #[inline]
    fn last_window_start(&self, timestamp: i64) -> i64 {
        // Work relative to the offset so the grid is anchored at zero.
        let adjusted = timestamp.saturating_sub(self.offset_ms);
        let base = if adjusted >= 0 {
            (adjusted / self.slide_ms) * self.slide_ms
        } else {
            // Integer division truncates toward zero; biasing by `-(slide - 1)`
            // turns it into floor division for negative values.
            (adjusted.saturating_sub(self.slide_ms).saturating_add(1) / self.slide_ms)
                * self.slide_ms
        };
        base.saturating_add(self.offset_ms)
    }
}
impl WindowAssigner for SlidingWindowAssigner {
    /// Returns every window `[start, end)` that contains `timestamp`, ordered by
    /// ascending start.
    ///
    /// Windows are discovered from the latest start backwards, one slide at a
    /// time, and the list is reversed before returning. A window whose end would
    /// exceed `i64::MAX` is reported with its end clamped to `i64::MAX`.
    #[inline]
    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
        let mut windows = WindowIdVec::new();
        let mut window_start = self.last_window_start(timestamp);
        loop {
            let window_end = match window_start.checked_add(self.size_ms) {
                Some(end) if end > timestamp => end,
                // This window ends at or before the timestamp, and so does
                // every earlier one.
                Some(_) => break,
                // `size_ms > 0`, so overflow means the true end lies beyond
                // `i64::MAX` and therefore after any representable timestamp.
                None => i64::MAX,
            };
            windows.push(WindowId::new(window_start, window_end));
            // Stop once the next earlier start is no longer representable,
            // rather than emitting a window clamped off the slide grid.
            window_start = match window_start.checked_sub(self.slide_ms) {
                Some(earlier) => earlier,
                None => break,
            };
        }
        windows.reverse();
        windows
    }

    /// Returns the largest timestamp that belongs to a window ending at
    /// `window_end`; window ends are exclusive.
    fn max_timestamp(&self, window_end: i64) -> i64 {
        window_end - 1
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A timestamp inside two overlapping windows is assigned to both, in
    /// ascending start order.
    #[test]
    fn test_sliding_assigner_basic() {
        let assigner = SlidingWindowAssigner::from_millis(10_000, 5_000);
        let windows = assigner.assign_windows(7_000);

        let expected = [(0, 10_000), (5_000, 15_000)];
        assert_eq!(windows.len(), expected.len());
        for (idx, &(start, end)) in expected.iter().enumerate() {
            assert_eq!(windows[idx].start, start);
            assert_eq!(windows[idx].end, end);
        }
    }

    /// The windows-per-event count equals size divided by slide for exact
    /// multiples.
    #[test]
    fn test_sliding_assigner_windows_per_event() {
        let cases: [(i64, i64, usize); 2] = [(10_000, 5_000, 2), (15_000, 5_000, 3)];
        for &(size_ms, slide_ms, expected) in cases.iter() {
            let assigner = SlidingWindowAssigner::from_millis(size_ms, slide_ms);
            assert_eq!(assigner.windows_per_event(), expected);
        }
    }
}