mod cast;
mod duration_str;
mod event_time;
mod filter;
mod watermark;
pub use cast::{cast_to_millis_array, CastError};
pub use duration_str::parse_duration_str;
pub use event_time::{EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField};
pub use filter::{filter_batch_by_timestamp, FilterError, ThresholdOp};
pub use watermark::{
AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, PeriodicGenerator,
ProcessingTimeGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
WatermarkTracker, DEFAULT_MAX_FUTURE_SKEW_MS,
};
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{SystemTime, UNIX_EPOCH};
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch,
/// and saturates at `i64::MAX` if the millisecond count does not fit in an
/// `i64`.
#[must_use]
pub fn now_unix_millis() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; clamp instead of truncating.
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        // Clock is set before 1970-01-01: report the epoch itself.
        Err(_) => 0,
    }
}
/// Byte key optionally attached to a timer (see `TimerRegistration::key`).
///
/// Backed by a `SmallVec` with an inline capacity of 16 bytes.
pub type TimerKey = SmallVec<[u8; 16]>;
/// Collection of fired timers returned by `TimerService::poll_timers`.
///
/// Backed by a `SmallVec` with an inline capacity of 8 registrations.
pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
/// A watermark: a thin, copyable wrapper around an `i64` timestamp.
///
/// Watermarks compare, order and hash exactly like the wrapped timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` in a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// Returns the wrapped timestamp.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(ts) = *self;
        ts
    }

    /// Returns `true` when `event_time` lies strictly before this watermark.
    ///
    /// An event whose time equals the watermark is not late.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.timestamp() > event_time
    }

    /// Returns whichever of the two watermarks has the smaller timestamp.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        if other.0 < self.0 {
            other
        } else {
            self
        }
    }

    /// Returns whichever of the two watermarks has the larger timestamp.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        if other.0 > self.0 {
            other
        } else {
            self
        }
    }
}
impl Default for Watermark {
fn default() -> Self {
Self(i64::MIN)
}
}
impl From<i64> for Watermark {
fn from(timestamp: i64) -> Self {
Self(timestamp)
}
}
/// Unwraps a [`Watermark`] back into its raw `i64` timestamp.
impl From<Watermark> for i64 {
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
/// A single timer held by `TimerService`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimerRegistration {
/// Identifier handed out by `TimerService::register_timer`.
pub id: u64,
/// Time at which the timer becomes due; `TimerService::poll_timers` fires it
/// once the polled time is greater than or equal to this value.
pub timestamp: i64,
/// Optional key bytes carried with the timer.
// NOTE(review): presumably identifies the keyed state the timer belongs to;
// nothing in this file interprets it — confirm against callers.
pub key: Option<TimerKey>,
/// Optional operator index carried with the timer.
// NOTE(review): presumably the index of the operator that registered the
// timer; nothing in this file interprets it — confirm against callers.
pub operator_index: Option<usize>,
}
/// Total order used by the timer heap.
///
/// Every comparison is reversed (`other` against `self`) so that a max-heap
/// such as `BinaryHeap` surfaces the timer with the smallest timestamp first,
/// and, among equal timestamps, the one with the smallest id.
///
/// `key` and `operator_index` act as final tie-breakers so that
/// `cmp(a, b) == Ordering::Equal` holds exactly when `a == b` under the
/// derived `PartialEq`, as the `Ord`/`PartialOrd` contract requires. They
/// never influence the order of timers that differ in timestamp or id.
impl Ord for TimerRegistration {
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .timestamp
            .cmp(&self.timestamp)
            .then_with(|| other.id.cmp(&self.id))
            // Only reached for registrations sharing both timestamp and id.
            .then_with(|| other.key.cmp(&self.key))
            .then_with(|| other.operator_index.cmp(&self.operator_index))
    }
}
/// Partial ordering that always agrees with the total order from `Ord`.
impl PartialOrd for TimerRegistration {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate so the two orderings can never drift apart.
        Some(Ord::cmp(self, other))
    }
}
/// Pending-timer count at which `TimerService::register_timer` logs a warning.
const TIMER_WARN_THRESHOLD: usize = 100_000;
/// Priority queue of pending timers that hands out timer ids.
///
/// Implements `Debug` so values can be inspected in logs and test failures.
#[derive(Debug)]
pub struct TimerService {
    /// Pending timers. The heap order comes from `TimerRegistration`'s `Ord`
    /// impl, which puts the timer with the earliest timestamp on top.
    timers: BinaryHeap<TimerRegistration>,
    /// Id that will be assigned to the next registered timer.
    next_timer_id: u64,
}
impl TimerService {
    /// Creates an empty timer service; the first timer it registers gets id `0`.
    #[must_use]
    pub fn new() -> Self {
        // A zero-capacity heap does not allocate, same as `BinaryHeap::new()`.
        Self::with_capacity(0)
    }

    /// Creates an empty timer service with room for at least `capacity`
    /// pending timers before the heap has to grow.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let timers = BinaryHeap::with_capacity(capacity);
        Self {
            timers,
            next_timer_id: 0,
        }
    }

    /// Schedules a timer for `timestamp` and returns its id.
    ///
    /// Ids are handed out sequentially, one per call. `key` and
    /// `operator_index` are stored on the registration unchanged.
    ///
    /// A warning is logged when the number of pending timers is exactly
    /// `TIMER_WARN_THRESHOLD` right after the insertion.
    pub fn register_timer(
        &mut self,
        timestamp: i64,
        key: Option<TimerKey>,
        operator_index: Option<usize>,
    ) -> u64 {
        let id = self.next_timer_id;
        self.next_timer_id = id + 1;

        let registration = TimerRegistration {
            id,
            timestamp,
            key,
            operator_index,
        };
        self.timers.push(registration);

        let pending_now = self.timers.len();
        if pending_now == TIMER_WARN_THRESHOLD {
            tracing::warn!(
                pending = pending_now,
                "Timer heap reached {} pending timers — watermark may be stalled",
                TIMER_WARN_THRESHOLD,
            );
        }

        id
    }

    /// Removes and returns every timer whose timestamp is less than or equal
    /// to `current_time`, in the order the heap yields them.
    ///
    /// Timers scheduled after `current_time` stay pending.
    #[inline]
    pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
        let mut fired = FiredTimersVec::new();
        while let Some(head) = self.timers.peek_mut() {
            if head.timestamp > current_time {
                // The earliest pending timer is not due, so none are.
                break;
            }
            // Pop through the peek handle: no second lookup, no `expect`.
            fired.push(std::collections::binary_heap::PeekMut::pop(head));
        }
        fired
    }

    /// Cancels the timer with the given `id`.
    ///
    /// Returns `true` if a pending timer was removed and `false` if no
    /// pending timer has that id. Scans every pending timer.
    pub fn cancel_timer(&mut self, id: u64) -> bool {
        let before = self.timers.len();
        self.timers.retain(|pending| pending.id != id);
        // `retain` can only shrink the heap, so any change means a removal.
        before != self.timers.len()
    }

    /// Returns the number of timers that have not fired or been cancelled.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.timers.len()
    }

    /// Returns the timestamp of the timer at the top of the heap, or `None`
    /// when no timers are pending.
    #[must_use]
    pub fn next_timer_timestamp(&self) -> Option<i64> {
        let head = self.timers.peek()?;
        Some(head.timestamp)
    }

    /// Drops every pending timer. The id counter is left untouched, so ids
    /// issued afterwards continue from where they were.
    pub fn clear(&mut self) {
        self.timers.clear();
    }
}
impl Default for TimerService {
fn default() -> Self {
Self::new()
}
}
/// Errors describing invalid timestamps, unknown timers and watermark
/// regressions.
// NOTE(review): no variant is constructed in this file; the conditions under
// which callers raise them must be confirmed at the use sites.
#[derive(Debug, thiserror::Error)]
pub enum TimeError {
/// A timestamp was rejected; carries the offending value.
#[error("Invalid timestamp: {0}")]
InvalidTimestamp(i64),
/// No timer with the given id was found; carries the id.
#[error("Timer not found: {0}")]
TimerNotFound(u64),
/// A watermark regression was detected; carries both values for the message.
#[error("Watermark regression: current={current}, new={new}")]
WatermarkRegression {
/// Value reported as `current` in the error message.
current: i64,
/// Value reported as `new` in the error message.
new: i64,
},
}
// Unit tests for `Watermark`, `TimerService` and the re-exported watermark
// generators / tracker.
#[cfg(test)]
mod tests {
use super::*;
// `new` stores the timestamp and `timestamp` reads it back.
#[test]
fn test_watermark_creation() {
let watermark = Watermark::new(1000);
assert_eq!(watermark.timestamp(), 1000);
}
// Only event times strictly below the watermark are late.
#[test]
fn test_watermark_late_detection() {
let watermark = Watermark::new(1000);
assert!(watermark.is_late(999));
assert!(!watermark.is_late(1000));
assert!(!watermark.is_late(1001));
}
// `min` / `max` pick the watermark with the smaller / larger timestamp.
#[test]
fn test_watermark_min_max() {
let w1 = Watermark::new(1000);
let w2 = Watermark::new(2000);
assert_eq!(w1.min(w2), Watermark::new(1000));
assert_eq!(w1.max(w2), Watermark::new(2000));
}
// Derived ordering and equality follow the wrapped timestamp.
#[test]
fn test_watermark_ordering() {
let w1 = Watermark::new(1000);
let w2 = Watermark::new(2000);
assert!(w1 < w2);
assert!(w2 > w1);
assert_eq!(w1, Watermark::new(1000));
}
// Round-trip through `From<i64>` and `From<Watermark> for i64`.
#[test]
fn test_watermark_conversions() {
let wm = Watermark::from(1000i64);
assert_eq!(wm.timestamp(), 1000);
let ts: i64 = wm.into();
assert_eq!(ts, 1000);
}
// The default watermark is `i64::MIN`.
#[test]
fn test_watermark_default() {
let wm = Watermark::default();
assert_eq!(wm.timestamp(), i64::MIN);
}
// A fresh service has no pending timers and no next timestamp.
#[test]
fn test_timer_service_creation() {
let service = TimerService::new();
assert_eq!(service.pending_count(), 0);
assert_eq!(service.next_timer_timestamp(), None);
}
// Each registration is counted and receives a distinct id.
#[test]
fn test_timer_registration() {
let mut service = TimerService::new();
let id1 = service.register_timer(100, None, None);
let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));
assert_eq!(service.pending_count(), 2);
assert_ne!(id1, id2);
}
// Timers fire in timestamp order regardless of registration order, and the
// key registered with a timer is returned when it fires.
#[test]
fn test_timer_poll_order() {
let mut service = TimerService::new();
let id1 = service.register_timer(100, None, None);
let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
let _id3 = service.register_timer(150, None, None);
let fired = service.poll_timers(75);
assert_eq!(fired.len(), 1);
assert_eq!(fired[0].id, id2);
assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));
let fired = service.poll_timers(125);
assert_eq!(fired.len(), 1);
assert_eq!(fired[0].id, id1);
let fired = service.poll_timers(200);
assert_eq!(fired.len(), 1);
assert_eq!(service.pending_count(), 0);
}
// A single poll returns every due timer, earliest first.
#[test]
fn test_timer_poll_multiple() {
let mut service = TimerService::new();
service.register_timer(50, None, None);
service.register_timer(75, None, None);
service.register_timer(100, None, None);
let fired = service.poll_timers(80);
assert_eq!(fired.len(), 2);
assert_eq!(fired[0].timestamp, 50);
assert_eq!(fired[1].timestamp, 75);
}
// Cancelling removes the timer once; a second cancel of the same id fails.
#[test]
fn test_timer_cancel() {
let mut service = TimerService::new();
let id1 = service.register_timer(100, None, None);
let id2 = service.register_timer(200, None, None);
assert!(service.cancel_timer(id1));
assert_eq!(service.pending_count(), 1);
assert!(!service.cancel_timer(id1));
assert!(service.cancel_timer(id2));
assert_eq!(service.pending_count(), 0);
}
// `next_timer_timestamp` tracks the earliest pending timestamp.
#[test]
fn test_timer_next_timestamp() {
let mut service = TimerService::new();
assert_eq!(service.next_timer_timestamp(), None);
service.register_timer(100, None, None);
assert_eq!(service.next_timer_timestamp(), Some(100));
service.register_timer(50, None, None);
assert_eq!(service.next_timer_timestamp(), Some(50));
}
// `clear` drops every pending timer.
#[test]
fn test_timer_clear() {
let mut service = TimerService::new();
service.register_timer(100, None, None);
service.register_timer(200, None, None);
service.register_timer(300, None, None);
service.clear();
assert_eq!(service.pending_count(), 0);
assert_eq!(service.next_timer_timestamp(), None);
}
// Generator built with `new(100)`: events at 1000 and 1200 yield watermarks
// 900 and 1100; the out-of-order event at 800 yields none.
#[test]
fn test_bounded_watermark_generator() {
let mut generator = BoundedOutOfOrdernessGenerator::new(100);
let wm1 = generator.on_event(1000);
assert_eq!(wm1, Some(Watermark::new(900)));
let wm2 = generator.on_event(800);
assert!(wm2.is_none());
let wm3 = generator.on_event(1200);
assert_eq!(wm3, Some(Watermark::new(1100)));
}
// Ascending generator: the watermark equals the event time for increasing
// events; an event below the previous one yields none.
#[test]
fn test_ascending_watermark_generator() {
let mut generator = AscendingTimestampsGenerator::new();
let wm1 = generator.on_event(1000);
assert_eq!(wm1, Some(Watermark::new(1000)));
let wm2 = generator.on_event(2000);
assert_eq!(wm2, Some(Watermark::new(2000)));
let wm3 = generator.on_event(1500);
assert_eq!(wm3, None);
}
// With two sources at 1000 and 500, the tracker reports 500.
#[test]
fn test_watermark_tracker_basic() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 1000);
let wm = tracker.update_source(1, 500);
assert_eq!(wm, Some(Watermark::new(500)));
}
// Marking source 1 idle makes the tracker report source 0's value (5000) and
// flags only source 1 as idle.
#[test]
fn test_watermark_tracker_idle() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 1000);
let wm = tracker.mark_idle(1);
assert_eq!(wm, Some(Watermark::new(5000)));
assert!(tracker.is_idle(1));
assert!(!tracker.is_idle(0));
}
}