use bytes::{BufMut, Bytes, BytesMut};
use crate::processor::serde::{Serde, SerdeAssociate, SerdeError};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Window {
pub start: i64,
pub end: i64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Windowed<K> {
pub key: K,
pub window: Window,
}
#[derive(Debug, Clone, Copy)]
pub struct TimeWindows {
pub size_ms: i64,
pub advance_ms: i64,
pub grace_ms: i64,
}
impl TimeWindows {
#[must_use]
pub fn of_size(size_ms: i64) -> Self {
assert!(size_ms > 0, "window size must be > 0");
Self {
size_ms,
advance_ms: size_ms,
grace_ms: 0,
}
}
#[must_use]
pub fn advance_by(mut self, advance_ms: i64) -> Self {
assert!(
advance_ms > 0 && advance_ms <= self.size_ms,
"0 < advance <= size"
);
self.advance_ms = advance_ms;
self
}
#[must_use]
pub fn grace(mut self, grace_ms: i64) -> Self {
assert!(grace_ms >= 0, "grace must be >= 0");
self.grace_ms = grace_ms;
self
}
#[must_use]
pub fn windows_for(&self, t: i64) -> Vec<i64> {
let mut start = (std::cmp::max(0, t - self.size_ms + self.advance_ms) / self.advance_ms)
* self.advance_ms;
let mut out = Vec::new();
while start <= t {
out.push(start);
start += self.advance_ms;
}
out
}
}
#[derive(Debug, Clone, Copy)]
pub struct JoinWindows {
pub before_ms: i64,
pub after_ms: i64,
pub grace_ms: i64,
}
impl JoinWindows {
#[must_use]
pub fn of(time_difference_ms: i64) -> Self {
assert!(time_difference_ms >= 0, "time difference must be >= 0");
Self {
before_ms: time_difference_ms,
after_ms: time_difference_ms,
grace_ms: 0,
}
}
#[must_use]
pub fn before(mut self, before_ms: i64) -> Self {
assert!(before_ms >= 0, "before must be >= 0");
self.before_ms = before_ms;
self
}
#[must_use]
pub fn after(mut self, after_ms: i64) -> Self {
assert!(after_ms >= 0, "after must be >= 0");
self.after_ms = after_ms;
self
}
#[must_use]
pub fn grace(mut self, grace_ms: i64) -> Self {
assert!(grace_ms >= 0, "grace must be >= 0");
self.grace_ms = grace_ms;
self
}
#[must_use]
pub fn size(&self) -> i64 {
self.before_ms + self.after_ms
}
}
#[derive(Debug, Clone, Copy)]
pub struct SessionWindows {
pub gap_ms: i64,
pub grace_ms: i64,
}
impl SessionWindows {
#[must_use]
pub fn of_inactivity_gap(gap_ms: i64) -> Self {
assert!(gap_ms > 0, "session gap must be > 0");
Self {
gap_ms,
grace_ms: 0,
}
}
#[must_use]
pub fn grace(mut self, grace_ms: i64) -> Self {
assert!(grace_ms >= 0, "grace must be >= 0");
self.grace_ms = grace_ms;
self
}
}
#[derive(Debug, Clone, Copy)]
pub struct SessionWindowedSerde<KS> {
inner: KS,
}
impl<KS> SessionWindowedSerde<KS> {
#[must_use]
pub fn new(inner: KS) -> Self {
Self { inner }
}
}
impl<K, KS> Serde<Windowed<K>> for SessionWindowedSerde<KS>
where
K: Send + Sync + 'static,
KS: Serde<K>,
{
fn serialize(&self, value: &Windowed<K>) -> Bytes {
let kb = self.inner.serialize(&value.key);
let mut b = BytesMut::with_capacity(kb.len() + 16);
b.extend_from_slice(&kb);
b.put_i64(value.window.end);
b.put_i64(value.window.start);
b.freeze()
}
fn deserialize(&self, bytes: &[u8]) -> Result<Windowed<K>, SerdeError> {
if bytes.len() < 16 {
return Err(SerdeError(format!(
"session key too short: {}",
bytes.len()
)));
}
let split = bytes.len() - 16;
let key = self.inner.deserialize(&bytes[..split])?;
let end = i64::from_be_bytes(bytes[split..split + 8].try_into().expect("8 bytes"));
let start = i64::from_be_bytes(bytes[split + 8..].try_into().expect("8 bytes"));
Ok(Windowed {
key,
window: Window { start, end },
})
}
}
impl<KS: SerdeAssociate> SerdeAssociate for SessionWindowedSerde<KS> {
type Target = Windowed<KS::Target>;
}
#[derive(Debug, Clone, Copy)]
pub struct TimeWindowedSerde<KS> {
inner: KS,
size_ms: i64,
}
impl<KS> TimeWindowedSerde<KS> {
#[must_use]
pub fn new(inner: KS, size_ms: i64) -> Self {
Self { inner, size_ms }
}
}
impl<K, KS> Serde<Windowed<K>> for TimeWindowedSerde<KS>
where
K: Send + Sync + 'static,
KS: Serde<K>,
{
fn serialize(&self, value: &Windowed<K>) -> Bytes {
let kb = self.inner.serialize(&value.key);
let mut b = BytesMut::with_capacity(kb.len() + 8);
b.extend_from_slice(&kb);
b.put_i64(value.window.start);
b.freeze()
}
fn deserialize(&self, bytes: &[u8]) -> Result<Windowed<K>, SerdeError> {
if bytes.len() < 8 {
return Err(SerdeError(format!(
"windowed key too short: {}",
bytes.len()
)));
}
let split = bytes.len() - 8;
let key = self.inner.deserialize(&bytes[..split])?;
let start = i64::from_be_bytes(bytes[split..].try_into().expect("8 bytes"));
Ok(Windowed {
key,
window: Window {
start,
end: start + self.size_ms,
},
})
}
}
impl<KS: SerdeAssociate> SerdeAssociate for TimeWindowedSerde<KS> {
type Target = Windowed<KS::Target>;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn windows_for_tumbling_one_window() {
let w = TimeWindows::of_size(10);
assert_eq!(w.windows_for(0), vec![0]);
assert_eq!(w.windows_for(9), vec![0]);
assert_eq!(w.windows_for(10), vec![10]);
assert_eq!(w.windows_for(25), vec![20]);
}
#[test]
fn windows_for_hopping_overlaps() {
let w = TimeWindows::of_size(10).advance_by(5);
assert_eq!(w.windows_for(12), vec![5, 10]); assert_eq!(w.windows_for(0), vec![0]);
}
#[test]
fn join_windows_before_after_size() {
let w = JoinWindows::of(10);
assert_eq!((w.before_ms, w.after_ms, w.grace_ms), (10, 10, 0));
assert_eq!(w.size(), 20);
let a = JoinWindows::of(10).before(3).after(7).grace(5);
assert_eq!((a.before_ms, a.after_ms, a.grace_ms), (3, 7, 5));
assert_eq!(a.size(), 10);
}
#[test]
fn time_windowed_serde_round_trips_output_format() {
use crate::processor::serde::{Serde, StringSerde};
let s = TimeWindowedSerde::new(StringSerde, 10);
let wk = Windowed {
key: "k".to_string(),
window: Window { start: 20, end: 30 },
};
let b = s.serialize(&wk);
assert_eq!(b.len(), 9); assert_eq!(&b[1..9], &20i64.to_be_bytes());
let back = s.deserialize(&b).unwrap();
assert_eq!(back.key, "k");
assert_eq!(back.window, Window { start: 20, end: 30 }); }
#[test]
fn session_windows_gap_and_grace() {
let w = SessionWindows::of_inactivity_gap(60_000);
assert_eq!((w.gap_ms, w.grace_ms), (60_000, 0));
let g = SessionWindows::of_inactivity_gap(60_000).grace(5);
assert_eq!((g.gap_ms, g.grace_ms), (60_000, 5));
}
#[test]
fn session_windowed_serde_round_trips_end_then_start() {
use crate::processor::serde::{Serde, StringSerde};
let s = SessionWindowedSerde::new(StringSerde);
let wk = Windowed {
key: "k".to_string(),
window: Window { start: 5, end: 9 },
};
let b = s.serialize(&wk);
assert_eq!(b.len(), 17); assert_eq!(&b[1..9], &9i64.to_be_bytes()); assert_eq!(&b[9..17], &5i64.to_be_bytes()); let back = s.deserialize(&b).unwrap();
assert_eq!(back.key, "k");
assert_eq!(back.window, Window { start: 5, end: 9 });
}
}