use super::Event;
use smallvec::SmallVec;
use std::time::Duration;
/// Policy describing when a windowed computation hands results downstream.
///
/// The predicate methods on this type are the authoritative description of
/// each variant; the per-variant notes below only restate what those
/// predicates return.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum EmitStrategy {
    /// The default strategy. Reported as retraction-generating.
    #[default]
    OnWatermark,
    /// Timer-driven strategy carrying the interval between emissions.
    /// Reported as emitting intermediate results.
    Periodic(Duration),
    /// Reported as emitting on update, emitting intermediate results and
    /// generating retractions.
    OnUpdate,
    /// Reported as append-only compatible and as suppressing intermediate
    /// results.
    OnWindowClose,
    /// Reported as requiring changelog output and generating retractions.
    Changelog,
    /// Reported as append-only compatible, suppressing intermediate results
    /// and dropping late data.
    Final,
}

impl EmitStrategy {
    /// Returns `true` when the strategy carries a periodic interval, i.e. it
    /// is [`EmitStrategy::Periodic`].
    #[must_use]
    pub fn needs_periodic_timer(&self) -> bool {
        self.periodic_interval().is_some()
    }

    /// Returns the interval of a [`EmitStrategy::Periodic`] strategy, or
    /// `None` for every other variant.
    #[must_use]
    pub fn periodic_interval(&self) -> Option<Duration> {
        if let Self::Periodic(interval) = self {
            Some(*interval)
        } else {
            None
        }
    }

    /// Returns `true` only for [`EmitStrategy::OnUpdate`].
    #[must_use]
    pub fn emits_on_update(&self) -> bool {
        *self == Self::OnUpdate
    }

    /// Returns `true` for the strategies that emit intermediate results:
    /// `OnUpdate` and `Periodic`.
    #[must_use]
    pub fn emits_intermediate(&self) -> bool {
        match self {
            Self::OnUpdate | Self::Periodic(_) => true,
            Self::OnWatermark | Self::OnWindowClose | Self::Changelog | Self::Final => false,
        }
    }

    /// Returns `true` only for [`EmitStrategy::Changelog`].
    #[must_use]
    pub fn requires_changelog(&self) -> bool {
        *self == Self::Changelog
    }

    /// Returns `true` for the strategies usable with append-only output:
    /// `OnWindowClose` and `Final`.
    #[must_use]
    pub fn is_append_only_compatible(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns `true` for the strategies that can produce retractions:
    /// `OnWatermark`, `OnUpdate` and `Changelog`.
    #[must_use]
    pub fn generates_retractions(&self) -> bool {
        match self {
            Self::OnWatermark | Self::OnUpdate | Self::Changelog => true,
            Self::Periodic(_) | Self::OnWindowClose | Self::Final => false,
        }
    }

    /// Returns `true` for the strategies that hold back intermediate
    /// results: `OnWindowClose` and `Final`.
    #[must_use]
    pub fn suppresses_intermediate(&self) -> bool {
        match self {
            Self::OnWindowClose | Self::Final => true,
            Self::OnWatermark | Self::Periodic(_) | Self::OnUpdate | Self::Changelog => false,
        }
    }

    /// Returns `true` only for [`EmitStrategy::Final`].
    #[must_use]
    pub fn drops_late_data(&self) -> bool {
        *self == Self::Final
    }
}
/// Identifies a window by its `start` and `end` bounds.
///
/// NOTE(review): the bounds are presumably millisecond timestamps, judging
/// by `duration_ms` — confirm against callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WindowId {
    /// Lower bound of the window.
    pub start: i64,
    /// Upper bound of the window.
    pub end: i64,
}

impl WindowId {
    /// Builds a window id from its two bounds; no validation is performed.
    #[must_use]
    pub fn new(start: i64, end: i64) -> Self {
        Self { start, end }
    }

    /// Returns the span of the window, computed as `end - start`.
    #[must_use]
    pub fn duration_ms(&self) -> i64 {
        let Self { start, end } = *self;
        end - start
    }

    /// Converts this id into a `TimerKey` built from the 16-byte encoding
    /// produced by [`WindowId::to_key_inline`].
    #[inline]
    #[must_use]
    pub fn to_key(&self) -> super::TimerKey {
        let encoded = self.to_key_inline();
        super::TimerKey::from(encoded)
    }

    /// Encodes this id as 16 bytes: `start` in big-endian order followed by
    /// `end` in big-endian order.
    #[inline]
    #[must_use]
    pub fn to_key_inline(&self) -> [u8; 16] {
        let mut key = [0u8; 16];
        let (head, tail) = key.split_at_mut(8);
        head.copy_from_slice(&self.start.to_be_bytes());
        tail.copy_from_slice(&self.end.to_be_bytes());
        key
    }

    /// Decodes an id from the layout written by [`WindowId::to_key_inline`].
    ///
    /// Returns `None` unless `key` is exactly 16 bytes long.
    #[must_use]
    pub fn from_key(key: &[u8]) -> Option<Self> {
        // The slice-to-array conversion fails for any length other than 16.
        let bytes: &[u8; 16] = key.try_into().ok()?;
        let (head, tail) = bytes.split_at(8);
        Some(Self {
            start: i64::from_be_bytes(head.try_into().ok()?),
            end: i64::from_be_bytes(tail.try_into().ok()?),
        })
    }
}
/// List of [`WindowId`]s backed by a `SmallVec` with inline room for four
/// entries.
pub type WindowIdVec = SmallVec<[WindowId; 4]>;
/// Kind of change carried by a changelog entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOperation {
    /// A new row; weight `+1`, wire tag `0`.
    Insert,
    /// A removed row; weight `-1`, wire tag `1`.
    Delete,
    /// The old image of an updated row; weight `-1`, wire tag `2`.
    UpdateBefore,
    /// The new image of an updated row; weight `+1`, wire tag `3`.
    UpdateAfter,
}

impl CdcOperation {
    /// Returns `+1` for insert-like operations (`Insert`, `UpdateAfter`) and
    /// `-1` for delete-like ones (`Delete`, `UpdateBefore`).
    #[must_use]
    pub fn weight(&self) -> i32 {
        if self.is_insert() {
            1
        } else {
            -1
        }
    }

    /// Returns `true` for `Insert` and `UpdateAfter`.
    #[must_use]
    pub fn is_insert(&self) -> bool {
        match self {
            Self::Insert | Self::UpdateAfter => true,
            Self::Delete | Self::UpdateBefore => false,
        }
    }

    /// Returns `true` for `Delete` and `UpdateBefore`.
    #[must_use]
    pub fn is_delete(&self) -> bool {
        match self {
            Self::Delete | Self::UpdateBefore => true,
            Self::Insert | Self::UpdateAfter => false,
        }
    }

    /// Encodes the operation as a single-byte tag in the range `0..=3`.
    #[inline]
    #[must_use]
    pub fn to_u8(self) -> u8 {
        match self {
            Self::Insert => 0,
            Self::Delete => 1,
            Self::UpdateBefore => 2,
            Self::UpdateAfter => 3,
        }
    }

    /// Decodes a tag written by [`CdcOperation::to_u8`]; returns `None` for
    /// any value above `3`.
    #[inline]
    #[must_use]
    pub fn from_u8(value: u8) -> Option<Self> {
        // Indexed by tag, so the order here must mirror `to_u8`.
        const BY_TAG: [CdcOperation; 4] = [
            CdcOperation::Insert,
            CdcOperation::Delete,
            CdcOperation::UpdateBefore,
            CdcOperation::UpdateAfter,
        ];
        BY_TAG.get(usize::from(value)).copied()
    }
}
/// A single changelog entry: an event tagged with the operation that
/// produced it.
#[derive(Debug, Clone)]
pub struct ChangelogRecord {
    /// Kind of change this record represents.
    pub operation: CdcOperation,
    /// Signed weight of the change; every constructor sets it to `+1` for
    /// insert-like operations and `-1` for delete-like ones.
    pub weight: i32,
    /// Timestamp supplied by the caller when the record was created.
    pub emit_timestamp: i64,
    /// The event payload.
    pub event: Event,
}

impl ChangelogRecord {
    /// Creates an `Insert` record (weight `+1`) for `event`.
    #[must_use]
    pub fn insert(event: Event, emit_timestamp: i64) -> Self {
        Self::new(CdcOperation::Insert, event, emit_timestamp)
    }

    /// Creates a `Delete` record (weight `-1`) for `event`.
    #[must_use]
    pub fn delete(event: Event, emit_timestamp: i64) -> Self {
        Self::new(CdcOperation::Delete, event, emit_timestamp)
    }

    /// Creates the pair of records describing an update.
    ///
    /// The first element is the `UpdateBefore` record (weight `-1`) wrapping
    /// `old_event`; the second is the `UpdateAfter` record (weight `+1`)
    /// wrapping `new_event`. Both share `emit_timestamp`.
    #[must_use]
    pub fn update(old_event: Event, new_event: Event, emit_timestamp: i64) -> (Self, Self) {
        (
            Self::new(CdcOperation::UpdateBefore, old_event, emit_timestamp),
            Self::new(CdcOperation::UpdateAfter, new_event, emit_timestamp),
        )
    }

    /// Creates a record for an arbitrary `operation`, taking the weight from
    /// `CdcOperation::weight`.
    #[must_use]
    pub fn new(operation: CdcOperation, event: Event, emit_timestamp: i64) -> Self {
        let weight = operation.weight();
        Self {
            operation,
            weight,
            emit_timestamp,
            event,
        }
    }

    /// Returns `true` when the underlying operation is insert-like.
    #[must_use]
    pub fn is_insert(&self) -> bool {
        let op = self.operation;
        op.is_insert()
    }

    /// Returns `true` when the underlying operation is delete-like.
    #[must_use]
    pub fn is_delete(&self) -> bool {
        let op = self.operation;
        op.is_delete()
    }
}
/// Maps a timestamp to the window(s) it belongs to.
///
/// Implementors must be `Send`.
pub trait WindowAssigner: Send {
/// Returns every window that `timestamp` is assigned to.
fn assign_windows(&self, timestamp: i64) -> WindowIdVec;
/// Returns the largest timestamp that belongs to a window ending at
/// `window_end`.
///
/// The default is `window_end - 1`, i.e. the window end is treated as
/// exclusive.
fn max_timestamp(&self, window_end: i64) -> i64 {
window_end - 1
}
}
/// Assigns timestamps to fixed-size, non-overlapping windows.
///
/// Window boundaries lie at `offset_ms + k * size_ms` for integer `k`, so
/// every timestamp falls into exactly one window `[start, start + size_ms)`.
#[derive(Debug, Clone)]
pub struct TumblingWindowAssigner {
    /// Window length in milliseconds; the constructors guarantee it is
    /// strictly positive.
    size_ms: i64,
    /// Shift applied to the window grid, in milliseconds (`0` unless set via
    /// `with_offset_ms`).
    offset_ms: i64,
}

impl TumblingWindowAssigner {
    /// Creates an assigner whose windows are `size` long, with no offset.
    ///
    /// The duration is truncated to whole milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if `size` in milliseconds does not fit in an `i64`, or if it
    /// truncates to zero milliseconds.
    #[must_use]
    pub fn new(size: Duration) -> Self {
        let size_ms = i64::try_from(size.as_millis()).expect("Window size must fit in i64");
        // Share the positivity check with `from_millis` instead of repeating it.
        Self::from_millis(size_ms)
    }

    /// Creates an assigner whose windows are `size_ms` milliseconds long,
    /// with no offset.
    ///
    /// # Panics
    ///
    /// Panics if `size_ms` is zero or negative.
    #[must_use]
    pub fn from_millis(size_ms: i64) -> Self {
        assert!(size_ms > 0, "Window size must be positive");
        Self {
            size_ms,
            offset_ms: 0,
        }
    }

    /// Returns the assigner with its window grid shifted by `offset_ms`
    /// milliseconds. The offset is stored as given, without validation.
    #[must_use]
    pub fn with_offset_ms(mut self, offset_ms: i64) -> Self {
        self.offset_ms = offset_ms;
        self
    }

    /// Returns the window length in milliseconds.
    #[must_use]
    pub fn size_ms(&self) -> i64 {
        self.size_ms
    }

    /// Returns the grid offset in milliseconds.
    #[must_use]
    pub fn offset_ms(&self) -> i64 {
        self.offset_ms
    }

    /// Returns the single window containing `timestamp`.
    ///
    /// The window start is the largest grid point `offset_ms + k * size_ms`
    /// that is `<= timestamp`; this holds for negative timestamps as well.
    ///
    /// The arithmetic uses plain `i64` operators, so it overflows when
    /// `timestamp - offset_ms`, the window start, or the window end is not
    /// representable as an `i64`.
    #[inline]
    #[must_use]
    pub fn assign(&self, timestamp: i64) -> WindowId {
        let adjusted = timestamp - self.offset_ms;
        // `rem_euclid` is always in `0..size_ms`, so subtracting it floors
        // `adjusted` to the grid without the `adjusted - size_ms + 1`
        // intermediate, which overflowed for timestamps close to `i64::MIN`.
        let window_start = adjusted - adjusted.rem_euclid(self.size_ms) + self.offset_ms;
        WindowId::new(window_start, window_start + self.size_ms)
    }
}
impl WindowAssigner for TumblingWindowAssigner {
    /// Returns a one-element list holding the window produced by
    /// `TumblingWindowAssigner::assign` for `timestamp`.
    #[inline]
    fn assign_windows(&self, timestamp: i64) -> WindowIdVec {
        let assigned = self.assign(timestamp);
        let mut out = WindowIdVec::new();
        out.push(assigned);
        out
    }
}
#[cfg(test)]
mod tests;