use std::time::{Duration, Instant};
use super::Watermark;
/// Common interface for the watermark generators defined in this module.
///
/// Implementors must be `Send`. Every implementation in this file reports a
/// watermark (`Some`) only when the value it holds strictly increases.
pub trait WatermarkGenerator: Send {
/// Feeds one event timestamp to the generator. Returns `Some` when this
/// event caused a new watermark to be produced, `None` otherwise.
fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
/// Hook for time-driven emission, independent of incoming events.
/// NOTE(review): the caller and its cadence are not visible here.
fn on_periodic(&mut self) -> Option<Watermark>;
/// Returns the watermark value the generator currently holds.
fn current_watermark(&self) -> i64;
/// Asks the generator to move its watermark to `timestamp` from outside the
/// event path. Returns `Some` when the watermark was produced/advanced.
fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark>;
/// Whether the generator works in the processing-time domain.
/// The default implementation answers `false`.
fn is_processing_time(&self) -> bool {
false
}
}
/// Default future-skew tolerance in milliseconds: 5 minutes (5 * 60 * 1000).
/// Used as the initial `max_future_skew_ms` of the generators in this file
/// that carry such a field.
pub const DEFAULT_MAX_FUTURE_SKEW_MS: i64 = 5 * 60 * 1000;
/// Reports whether `timestamp` lies more than `skew_ms` beyond the current
/// reading of `super::now_unix_millis()`.
///
/// A non-positive `skew_ms` disables the check, and a non-positive clock
/// reading is never used to reject a timestamp; both cases yield `false`.
#[inline]
fn is_grossly_future(timestamp: i64, skew_ms: i64) -> bool {
    if skew_ms > 0 {
        let wall_clock = super::now_unix_millis();
        // Saturate so an enormous skew cannot wrap the upper bound around.
        let latest_plausible = wall_clock.saturating_add(skew_ms);
        wall_clock > 0 && timestamp > latest_plausible
    } else {
        false
    }
}
/// Watermark generator whose watermark trails the highest accepted event
/// timestamp by a fixed bound.
///
/// Derives `Debug` so it can be logged and inspected like the other public
/// generator and tracker types in this module.
#[derive(Debug)]
pub struct BoundedOutOfOrdernessGenerator {
    /// Bound subtracted from the highest accepted event timestamp to obtain
    /// the watermark.
    max_out_of_orderness: i64,
    /// Highest event timestamp accepted so far (`i64::MIN` before any event);
    /// also raised when the watermark is advanced externally.
    current_max_timestamp: i64,
    /// Watermark currently held (`i64::MIN` until one is produced).
    current_watermark: i64,
    /// Tolerance handed to the future-timestamp check; timestamps further
    /// ahead of the wall clock than this are ignored.
    max_future_skew_ms: i64,
}
impl BoundedOutOfOrdernessGenerator {
#[must_use]
pub fn new(max_out_of_orderness: i64) -> Self {
Self {
max_out_of_orderness,
current_max_timestamp: i64::MIN,
current_watermark: i64::MIN,
max_future_skew_ms: DEFAULT_MAX_FUTURE_SKEW_MS,
}
}
#[must_use]
pub fn with_max_future_skew(mut self, skew_ms: i64) -> Self {
self.max_future_skew_ms = skew_ms;
self
}
#[must_use]
#[allow(clippy::cast_possible_truncation)] pub fn from_duration(max_out_of_orderness: Duration) -> Self {
Self::new(max_out_of_orderness.as_millis() as i64)
}
#[must_use]
pub fn max_out_of_orderness(&self) -> i64 {
self.max_out_of_orderness
}
}
impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
    /// Accepts an event timestamp. When it exceeds the highest timestamp seen
    /// so far, the candidate watermark becomes
    /// `timestamp - max_out_of_orderness` (saturating).
    ///
    /// Timestamps rejected by `is_grossly_future` are ignored entirely.
    /// `Some` is returned only when the watermark strictly increases.
    #[inline]
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
        let too_far_ahead = is_grossly_future(timestamp, self.max_future_skew_ms);
        if too_far_ahead || timestamp <= self.current_max_timestamp {
            return None;
        }
        self.current_max_timestamp = timestamp;

        let candidate = timestamp.saturating_sub(self.max_out_of_orderness);
        if candidate <= self.current_watermark {
            return None;
        }
        self.current_watermark = candidate;
        Some(Watermark::new(candidate))
    }

    /// This generator produces nothing on the periodic hook.
    #[inline]
    fn on_periodic(&mut self) -> Option<Watermark> {
        None
    }

    /// Returns the watermark currently held.
    #[inline]
    fn current_watermark(&self) -> i64 {
        self.current_watermark
    }

    /// Moves the watermark directly to `timestamp` when that is a strict
    /// increase and the timestamp is not rejected by `is_grossly_future`.
    ///
    /// The tracked maximum timestamp is lifted to at least
    /// `timestamp + max_out_of_orderness` (saturating), so later events only
    /// move the watermark once they imply a value above `timestamp`.
    #[inline]
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        if is_grossly_future(timestamp, self.max_future_skew_ms)
            || timestamp <= self.current_watermark
        {
            return None;
        }
        self.current_watermark = timestamp;

        let implied_max = timestamp.saturating_add(self.max_out_of_orderness);
        self.current_max_timestamp = self.current_max_timestamp.max(implied_max);
        Some(Watermark::new(timestamp))
    }
}
/// Watermark generator that uses each accepted event timestamp directly as
/// the watermark, with no out-of-orderness allowance.
#[derive(Debug)]
pub struct AscendingTimestampsGenerator {
/// Watermark currently held (`i64::MIN` until one is produced).
current_watermark: i64,
/// Tolerance handed to the future-timestamp check.
max_future_skew_ms: i64,
}
/// `Default` yields exactly what `AscendingTimestampsGenerator::new()` builds.
impl Default for AscendingTimestampsGenerator {
fn default() -> Self {
Self::new()
}
}
impl AscendingTimestampsGenerator {
    /// Creates a generator with no watermark yet (`i64::MIN`) and the default
    /// future-skew tolerance, [`DEFAULT_MAX_FUTURE_SKEW_MS`].
    #[must_use]
    pub fn new() -> Self {
        let current_watermark = i64::MIN;
        let max_future_skew_ms = DEFAULT_MAX_FUTURE_SKEW_MS;
        Self {
            current_watermark,
            max_future_skew_ms,
        }
    }

    /// Returns the generator with its future-skew tolerance replaced by
    /// `skew_ms`; every other field is carried over unchanged.
    #[must_use]
    pub fn with_max_future_skew(self, skew_ms: i64) -> Self {
        Self {
            max_future_skew_ms: skew_ms,
            ..self
        }
    }
}
impl WatermarkGenerator for AscendingTimestampsGenerator {
    /// Adopts `timestamp` as the new watermark when it is strictly greater
    /// than the current one and is not rejected by `is_grossly_future`.
    #[inline]
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
        let advances = !is_grossly_future(timestamp, self.max_future_skew_ms)
            && timestamp > self.current_watermark;
        advances.then(|| {
            self.current_watermark = timestamp;
            Watermark::new(timestamp)
        })
    }

    /// This generator produces nothing on the periodic hook.
    #[inline]
    fn on_periodic(&mut self) -> Option<Watermark> {
        None
    }

    /// Returns the watermark currently held.
    #[inline]
    fn current_watermark(&self) -> i64 {
        self.current_watermark
    }

    /// Applies the same rule as `on_event`: a strictly greater timestamp that
    /// passes the future-skew check becomes the watermark.
    #[inline]
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        if is_grossly_future(timestamp, self.max_future_skew_ms) {
            return None;
        }
        if timestamp <= self.current_watermark {
            return None;
        }
        self.current_watermark = timestamp;
        Some(Watermark::new(timestamp))
    }
}
/// Wrapper around another generator `G` that can additionally emit the inner
/// generator's watermark from the periodic hook once `period` has elapsed.
pub struct PeriodicGenerator<G: WatermarkGenerator> {
/// The wrapped generator; events and explicit advances are delegated to it.
inner: G,
/// Minimum time since `last_emit_time` before the periodic hook does anything.
period: Duration,
/// When a watermark was last emitted or a periodic check last ran.
last_emit_time: Instant,
/// Value of the most recently emitted watermark (`i64::MIN` before any).
last_emitted_watermark: i64,
}
impl<G: WatermarkGenerator> PeriodicGenerator<G> {
    /// Wraps `inner`, starting the period clock at the moment of construction
    /// with no watermark emitted yet (`i64::MIN`).
    #[must_use]
    pub fn new(inner: G, period: Duration) -> Self {
        let created_at = Instant::now();
        Self {
            inner,
            period,
            last_emit_time: created_at,
            last_emitted_watermark: i64::MIN,
        }
    }

    /// Shared access to the wrapped generator.
    #[must_use]
    pub fn inner(&self) -> &G {
        let wrapped = &self.inner;
        wrapped
    }

    /// Exclusive access to the wrapped generator.
    pub fn inner_mut(&mut self) -> &mut G {
        let wrapped = &mut self.inner;
        wrapped
    }
}
impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
    /// Delegates the event to the wrapped generator and passes its result
    /// through. When a watermark is produced, it is recorded as the last
    /// emitted value and the period clock restarts.
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
        let wm = self.inner.on_event(timestamp);
        if let Some(ref w) = wm {
            self.last_emitted_watermark = w.timestamp();
            self.last_emit_time = Instant::now();
        }
        wm
    }

    /// Once at least `period` has elapsed since the last emission or check,
    /// ticks the wrapped generator and emits its current watermark if that
    /// is greater than the last value emitted by this wrapper.
    ///
    /// The wrapped generator's own `on_periodic` is invoked first so that
    /// generators which only advance on the periodic hook (for example a
    /// processing-time generator) make progress when wrapped. For inner
    /// generators whose `on_periodic` is a no-op this changes nothing.
    /// The period clock restarts whether or not a watermark is emitted.
    fn on_periodic(&mut self) -> Option<Watermark> {
        if self.last_emit_time.elapsed() < self.period {
            return None;
        }

        // The inner result is intentionally dropped: the decision below is
        // made from the inner generator's resulting state, which keeps the
        // wrapper's own monotonic bookkeeping authoritative.
        let _ = self.inner.on_periodic();
        self.last_emit_time = Instant::now();

        let current = self.inner.current_watermark();
        if current > self.last_emitted_watermark {
            self.last_emitted_watermark = current;
            Some(Watermark::new(current))
        } else {
            None
        }
    }

    /// Returns the wrapped generator's current watermark.
    fn current_watermark(&self) -> i64 {
        self.inner.current_watermark()
    }

    /// Delegates the explicit advance to the wrapped generator. A produced
    /// watermark is recorded and restarts the period clock, as in `on_event`.
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        let wm = self.inner.advance_watermark(timestamp);
        if let Some(ref w) = wm {
            self.last_emitted_watermark = w.timestamp();
            self.last_emit_time = Instant::now();
        }
        wm
    }

    /// Reports the time domain of the wrapped generator.
    fn is_processing_time(&self) -> bool {
        self.inner.is_processing_time()
    }
}
/// Watermark generator driven by a caller-supplied function that decides,
/// per event timestamp, whether a watermark should be proposed.
pub struct PunctuatedGenerator<F>
where
F: Fn(i64) -> Option<Watermark> + Send,
{
/// Called with every event timestamp; `Some` proposes a watermark.
predicate: F,
/// Watermark currently held (`i64::MIN` until one is produced).
current_watermark: i64,
}
impl<F> PunctuatedGenerator<F>
where
    F: Fn(i64) -> Option<Watermark> + Send,
{
    /// Creates a generator around `predicate`, with no watermark produced
    /// yet (`i64::MIN`).
    #[must_use]
    pub fn new(predicate: F) -> Self {
        let current_watermark = i64::MIN;
        Self {
            predicate,
            current_watermark,
        }
    }
}
impl<F> WatermarkGenerator for PunctuatedGenerator<F>
where
    F: Fn(i64) -> Option<Watermark> + Send,
{
    /// Runs the predicate on `timestamp`. A proposed watermark is accepted
    /// and returned only when it is strictly greater than the current one.
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
        let proposed = (self.predicate)(timestamp)?;
        if proposed.timestamp() <= self.current_watermark {
            return None;
        }
        self.current_watermark = proposed.timestamp();
        Some(proposed)
    }

    /// This generator produces nothing on the periodic hook.
    fn on_periodic(&mut self) -> Option<Watermark> {
        None
    }

    /// Returns the watermark currently held.
    fn current_watermark(&self) -> i64 {
        self.current_watermark
    }

    /// Sets the watermark to `timestamp` when that is a strict increase.
    /// The predicate is not consulted on this path.
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        let moves_forward = timestamp > self.current_watermark;
        moves_forward.then(|| {
            self.current_watermark = timestamp;
            Watermark::new(timestamp)
        })
    }
}
/// Combines the watermarks of several sources into a single watermark,
/// with per-source idle tracking. All per-source vectors are indexed by
/// source id and are created with the same length.
#[derive(Debug)]
pub struct WatermarkTracker {
/// Latest watermark recorded for each source (`i64::MIN` before any).
source_watermarks: Vec<i64>,
/// Last combined watermark computed (`i64::MIN` before any).
combined_watermark: i64,
/// `true` for sources currently marked idle.
idle_sources: Vec<bool>,
/// Instant of each source's most recent `update_source` call
/// (initially the construction time).
last_activity: Vec<Instant>,
/// Optional per-source inactivity limit; `None` means the source is never
/// marked idle by `check_idle_sources`.
idle_timeout: Vec<Option<Duration>>,
}
impl WatermarkTracker {
    /// Creates a tracker for `num_sources` sources, none of which has an
    /// idle timeout.
    #[must_use]
    pub fn new(num_sources: usize) -> Self {
        Self::with_uniform_timeout(num_sources, None)
    }

    /// Creates a tracker for `num_sources` sources that all share the same
    /// `idle_timeout`.
    #[must_use]
    pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
        Self::with_uniform_timeout(num_sources, Some(idle_timeout))
    }

    /// Shared constructor: every source starts active, without a watermark
    /// (`i64::MIN`), last seen "now", and with the given timeout setting.
    fn with_uniform_timeout(num_sources: usize, timeout: Option<Duration>) -> Self {
        Self {
            source_watermarks: vec![i64::MIN; num_sources],
            combined_watermark: i64::MIN,
            idle_sources: vec![false; num_sources],
            last_activity: vec![Instant::now(); num_sources],
            idle_timeout: vec![timeout; num_sources],
        }
    }

    /// Sets or clears the idle timeout of one source. Unknown source ids are
    /// ignored.
    pub fn set_idle_timeout(&mut self, source_id: usize, timeout: Option<Duration>) {
        if source_id < self.idle_timeout.len() {
            self.idle_timeout[source_id] = timeout;
        }
    }

    /// Records a watermark reported by `source_id`.
    ///
    /// The source is marked active and its activity time refreshed even when
    /// the watermark does not advance. Returns the new combined watermark
    /// when it moved forward; `None` otherwise or for an unknown source id.
    pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
        let previous = *self.source_watermarks.get(source_id)?;

        self.idle_sources[source_id] = false;
        self.last_activity[source_id] = Instant::now();

        if watermark <= previous {
            return None;
        }
        self.source_watermarks[source_id] = watermark;
        self.update_combined()
    }

    /// Marks a source idle so it no longer holds back the combined
    /// watermark. Returns the combined watermark if that moved it forward;
    /// `None` otherwise or for an unknown source id.
    pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
        let flag = self.idle_sources.get_mut(source_id)?;
        *flag = true;
        self.update_combined()
    }

    /// Marks idle every active source whose configured timeout has elapsed
    /// since its last activity, then recomputes the combined watermark if at
    /// least one source changed state.
    pub fn check_idle_sources(&mut self) -> Option<Watermark> {
        let mut newly_idle = false;
        let per_source = self
            .idle_sources
            .iter_mut()
            .zip(&self.last_activity)
            .zip(&self.idle_timeout);
        for ((idle, last_seen), timeout) in per_source {
            // Sources without a timeout are never auto-idled.
            if let Some(limit) = *timeout {
                if !*idle && last_seen.elapsed() >= limit {
                    *idle = true;
                    newly_idle = true;
                }
            }
        }

        if newly_idle {
            self.update_combined()
        } else {
            None
        }
    }

    /// Returns the combined watermark, or `None` while it is still at its
    /// initial value `i64::MIN`.
    #[must_use]
    pub fn current_watermark(&self) -> Option<Watermark> {
        let has_value = self.combined_watermark != i64::MIN;
        has_value.then(|| Watermark::new(self.combined_watermark))
    }

    /// Returns the latest watermark recorded for `source_id`, or `None` for
    /// an unknown id.
    #[must_use]
    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
        match self.source_watermarks.get(source_id) {
            Some(&recorded) => Some(recorded),
            None => None,
        }
    }

    /// Whether `source_id` is currently marked idle; unknown ids are
    /// reported as not idle.
    #[must_use]
    pub fn is_idle(&self, source_id: usize) -> bool {
        matches!(self.idle_sources.get(source_id), Some(true))
    }

    /// Number of sources tracked.
    #[must_use]
    pub fn num_sources(&self) -> usize {
        self.source_watermarks.len()
    }

    /// Number of sources not currently marked idle.
    #[must_use]
    pub fn active_source_count(&self) -> usize {
        self.idle_sources.iter().filter(|idle| !**idle).count()
    }

    /// Recomputes the combined watermark.
    ///
    /// The candidate is the minimum watermark across active sources, or, when
    /// every source is idle, the maximum across all sources (`i64::MIN` when
    /// there are none). It is adopted and returned only when it is strictly
    /// greater than the current combined value and is not `i64::MAX`.
    fn update_combined(&mut self) -> Option<Watermark> {
        let lowest_active = self
            .source_watermarks
            .iter()
            .zip(&self.idle_sources)
            .filter(|&(_, &idle)| !idle)
            .map(|(&wm, _)| wm)
            .min();

        let candidate = match lowest_active {
            Some(lowest) => lowest,
            None => self
                .source_watermarks
                .iter()
                .copied()
                .max()
                .unwrap_or(i64::MIN),
        };

        if candidate <= self.combined_watermark || candidate == i64::MAX {
            return None;
        }
        self.combined_watermark = candidate;
        Some(Watermark::new(candidate))
    }
}
/// Generator that accepts watermarks supplied by the source itself and keeps
/// a `BoundedOutOfOrdernessGenerator` as an event-driven fallback.
pub struct SourceProvidedGenerator {
/// Highest watermark received from the source (`i64::MIN` before any).
source_watermark: i64,
/// Event-driven generator that is fed every event timestamp.
fallback: BoundedOutOfOrdernessGenerator,
/// When `true`, a received source watermark takes precedence over the
/// fallback's watermarks.
prefer_source: bool,
}
impl SourceProvidedGenerator {
    /// Creates a generator with no source watermark yet (`i64::MIN`) and a
    /// bounded-out-of-orderness fallback built from `fallback_lateness`.
    #[must_use]
    pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
        let fallback = BoundedOutOfOrdernessGenerator::new(fallback_lateness);
        Self {
            source_watermark: i64::MIN,
            fallback,
            prefer_source,
        }
    }

    /// Records a watermark supplied by the source.
    ///
    /// Values that do not exceed the stored source watermark are ignored.
    /// An accepted value is emitted when the source is preferred, or when it
    /// is greater than the fallback generator's current watermark.
    pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
        if watermark <= self.source_watermark {
            return None;
        }
        self.source_watermark = watermark;

        let should_emit = self.prefer_source || watermark > self.fallback.current_watermark();
        should_emit.then(|| Watermark::new(watermark))
    }
}
impl WatermarkGenerator for SourceProvidedGenerator {
    /// Feeds the event to the fallback generator and decides whether its
    /// watermark (if any) may be emitted.
    ///
    /// * Source preferred: fallback watermarks are emitted only until the
    ///   first source watermark has been received; afterwards events emit
    ///   nothing.
    /// * Source not preferred: a fallback watermark is emitted only when it
    ///   is strictly greater than the stored source watermark. Without this
    ///   guard an event could emit a value below a source watermark that was
    ///   already emitted, i.e. a regression that also disagrees with
    ///   `current_watermark()`.
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
        // Always feed the fallback so its state stays current, even when its
        // result ends up suppressed.
        let fallback_wm = self.fallback.on_event(timestamp);

        if self.prefer_source {
            if self.source_watermark > i64::MIN {
                None
            } else {
                fallback_wm
            }
        } else {
            fallback_wm.filter(|wm| wm.timestamp() > self.source_watermark)
        }
    }

    /// This generator produces nothing on the periodic hook.
    fn on_periodic(&mut self) -> Option<Watermark> {
        None
    }

    /// Returns the source watermark when the source is preferred and has
    /// reported one; otherwise the greater of the fallback's watermark and
    /// the source watermark.
    fn current_watermark(&self) -> i64 {
        if self.prefer_source && self.source_watermark > i64::MIN {
            self.source_watermark
        } else {
            self.fallback.current_watermark().max(self.source_watermark)
        }
    }

    /// Treats an explicit advance exactly like a watermark supplied by the
    /// source.
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        self.on_source_watermark(timestamp)
    }
}
/// Watermark generator for the processing-time domain: it ignores event
/// timestamps and takes its watermark from the wall clock on the periodic
/// hook.
///
/// Derives `Debug` so it can be logged and inspected like the other public
/// generator and tracker types in this module.
#[derive(Debug)]
pub struct ProcessingTimeGenerator {
    /// Watermark currently held (`i64::MIN` until one is produced).
    current_watermark: i64,
}
impl ProcessingTimeGenerator {
    /// Creates a generator that has not produced a watermark yet
    /// (`i64::MIN`).
    #[must_use]
    pub fn new() -> Self {
        let current_watermark = i64::MIN;
        Self { current_watermark }
    }
}
/// `Default` yields exactly what `ProcessingTimeGenerator::new()` builds.
impl Default for ProcessingTimeGenerator {
fn default() -> Self {
Self::new()
}
}
impl WatermarkGenerator for ProcessingTimeGenerator {
    /// Event timestamps are ignored; no watermark is ever produced here.
    #[inline]
    fn on_event(&mut self, _timestamp: i64) -> Option<Watermark> {
        None
    }

    /// Reads `super::now_unix_millis()` and adopts it as the watermark when
    /// it is strictly greater than the current one.
    #[inline]
    fn on_periodic(&mut self) -> Option<Watermark> {
        let wall_clock = super::now_unix_millis();
        if wall_clock <= self.current_watermark {
            return None;
        }
        self.current_watermark = wall_clock;
        Some(Watermark::new(wall_clock))
    }

    /// Returns the watermark currently held.
    #[inline]
    fn current_watermark(&self) -> i64 {
        self.current_watermark
    }

    /// Sets the watermark to `timestamp` when that is a strict increase.
    #[inline]
    fn advance_watermark(&mut self, timestamp: i64) -> Option<Watermark> {
        let moves_forward = timestamp > self.current_watermark;
        moves_forward.then(|| {
            self.current_watermark = timestamp;
            Watermark::new(timestamp)
        })
    }

    /// Always `true`: this generator belongs to the processing-time domain.
    #[inline]
    fn is_processing_time(&self) -> bool {
        true
    }
}
// Unit tests for the watermark generators and the multi-source tracker.
#[cfg(test)]
mod tests {
use super::*;
// First event: watermark is the timestamp minus the out-of-orderness bound.
#[test]
fn test_bounded_generator_first_event() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
let wm = gen.on_event(1000);
assert_eq!(wm, Some(Watermark::new(900)));
assert_eq!(gen.current_watermark(), 900);
}
// `is_processing_time` is true only for the processing-time generator and
// is forwarded unchanged by `PeriodicGenerator`.
#[test]
fn processing_time_domain_is_reported_for_late_filter_skip() {
assert!(ProcessingTimeGenerator::new().is_processing_time());
assert!(!BoundedOutOfOrdernessGenerator::new(100).is_processing_time());
let p = Duration::from_millis(1);
assert!(PeriodicGenerator::new(ProcessingTimeGenerator::new(), p).is_processing_time());
assert!(
!PeriodicGenerator::new(BoundedOutOfOrdernessGenerator::new(100), p)
.is_processing_time()
);
}
// An event older than the maximum seen neither emits nor moves the watermark.
#[test]
fn test_bounded_generator_out_of_order() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
gen.on_event(1000);
let wm = gen.on_event(800);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 900); }
// A newer event advances the watermark by the same bound.
#[test]
fn test_bounded_generator_advancement() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
gen.on_event(1000);
let wm = gen.on_event(1200);
assert_eq!(wm, Some(Watermark::new(1100)));
}
// `from_duration` converts the duration to whole milliseconds.
#[test]
fn test_bounded_generator_from_duration() {
let gen = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
assert_eq!(gen.max_out_of_orderness(), 5000);
}
// The bounded generator never emits from the periodic hook.
#[test]
fn test_bounded_generator_no_periodic() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
assert_eq!(gen.on_periodic(), None);
}
// Ascending generator: each increasing timestamp becomes the watermark.
#[test]
fn test_ascending_generator_advances_on_each_event() {
let mut gen = AscendingTimestampsGenerator::new();
let wm1 = gen.on_event(1000);
assert_eq!(wm1, Some(Watermark::new(1000)));
let wm2 = gen.on_event(2000);
assert_eq!(wm2, Some(Watermark::new(2000)));
}
// Ascending generator: a smaller timestamp is ignored.
#[test]
fn test_ascending_generator_ignores_backwards() {
let mut gen = AscendingTimestampsGenerator::new();
gen.on_event(2000);
let wm = gen.on_event(1000);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 2000);
}
// The periodic wrapper passes the inner generator's event watermark through.
#[test]
fn test_periodic_generator_passes_through() {
let inner = BoundedOutOfOrdernessGenerator::new(100);
let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
let wm = gen.on_event(1000);
assert_eq!(wm, Some(Watermark::new(900)));
}
// `inner()` exposes the wrapped generator.
#[test]
fn test_periodic_generator_inner_access() {
let inner = BoundedOutOfOrdernessGenerator::new(100);
let gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
assert_eq!(gen.inner().max_out_of_orderness(), 100);
}
// Punctuated generator emits only when the predicate proposes a watermark
// (here: timestamps that are multiples of 1000).
#[test]
fn test_punctuated_generator_predicate() {
let mut gen = PunctuatedGenerator::new(|ts| {
if ts % 1000 == 0 {
Some(Watermark::new(ts))
} else {
None
}
});
assert_eq!(gen.on_event(500), None);
assert_eq!(gen.on_event(999), None);
assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
assert_eq!(gen.on_event(1500), None);
assert_eq!(gen.on_event(2000), Some(Watermark::new(2000)));
}
// A proposed watermark below the current one is dropped.
#[test]
fn test_punctuated_generator_no_regression() {
let mut gen = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
gen.on_event(2000);
let wm = gen.on_event(1000);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 2000);
}
// With one source, the combined watermark equals that source's watermark.
#[test]
fn test_tracker_single_source() {
let mut tracker = WatermarkTracker::new(1);
let wm = tracker.update_source(0, 1000);
assert_eq!(wm, Some(Watermark::new(1000)));
assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
}
// The combined watermark only appears once every source has reported, and
// equals the minimum.
#[test]
fn test_tracker_multiple_sources() {
let mut tracker = WatermarkTracker::new(3);
tracker.update_source(0, 1000);
tracker.update_source(1, 2000);
let wm = tracker.update_source(2, 500);
assert_eq!(wm, Some(Watermark::new(500))); }
// The combined watermark follows the slowest source as it advances.
#[test]
fn test_tracker_min_watermark() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 3000);
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
tracker.update_source(1, 4000);
assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
}
// Marking the slow source idle releases the combined watermark.
#[test]
fn test_tracker_idle_source() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 1000);
let wm = tracker.mark_idle(1);
assert_eq!(wm, Some(Watermark::new(5000)));
}
// With a zero timeout every source is auto-idled; the combined watermark
// jumps to the maximum and a later, lower update must not pull it back.
#[test]
fn check_idle_sources_advances_then_reactivation_is_monotone() {
let mut tracker = WatermarkTracker::with_idle_timeout(2, Duration::ZERO);
tracker.update_source(0, 5_000); tracker.update_source(1, 1_000); assert_eq!(tracker.current_watermark(), Some(Watermark::new(1_000)));
let advanced = tracker.check_idle_sources();
assert_eq!(advanced, Some(Watermark::new(5_000)));
assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
let res = tracker.update_source(1, 1_500);
assert_eq!(res, None, "stale reactivation must not emit a regress");
assert_eq!(tracker.current_watermark(), Some(Watermark::new(5_000)));
tracker.update_source(0, 9_000);
tracker.update_source(1, 8_000);
assert_eq!(tracker.current_watermark(), Some(Watermark::new(8_000)));
}
// When all sources are idle, the combined watermark is the maximum seen.
#[test]
fn test_tracker_all_idle() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 5000);
tracker.update_source(1, 3000);
tracker.mark_idle(0);
let wm = tracker.mark_idle(1);
assert_eq!(wm, Some(Watermark::new(5000)));
}
// Per-source lookup returns the recorded value, or `None` for unknown ids.
#[test]
fn test_tracker_source_watermark() {
let mut tracker = WatermarkTracker::new(2);
tracker.update_source(0, 1000);
tracker.update_source(1, 2000);
assert_eq!(tracker.source_watermark(0), Some(1000));
assert_eq!(tracker.source_watermark(1), Some(2000));
assert_eq!(tracker.source_watermark(5), None); }
// The active count drops on `mark_idle` and recovers on `update_source`.
#[test]
fn test_tracker_active_source_count() {
let mut tracker = WatermarkTracker::new(3);
assert_eq!(tracker.active_source_count(), 3);
tracker.mark_idle(0);
assert_eq!(tracker.active_source_count(), 2);
tracker.mark_idle(2);
assert_eq!(tracker.active_source_count(), 1);
tracker.update_source(0, 1000);
assert_eq!(tracker.active_source_count(), 2);
}
// Out-of-range source ids are ignored rather than panicking.
#[test]
fn test_tracker_invalid_source() {
let mut tracker = WatermarkTracker::new(2);
let wm = tracker.update_source(5, 1000); assert_eq!(wm, None);
let wm = tracker.mark_idle(5);
assert_eq!(wm, None);
}
// Without a source watermark, events drive the fallback generator.
#[test]
fn test_source_provided_fallback() {
let mut gen = SourceProvidedGenerator::new(100, false);
let wm = gen.on_event(1000);
assert_eq!(wm, Some(Watermark::new(900))); }
// With the source preferred, an explicit source watermark is emitted as is.
#[test]
fn test_source_provided_explicit_watermark() {
let mut gen = SourceProvidedGenerator::new(100, true);
let wm = gen.on_source_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
}
// `advance_watermark` on the bounded generator: forward moves only.
#[test]
fn test_advance_watermark_bounded_generator() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(800);
assert_eq!(wm, Some(Watermark::new(800)));
assert_eq!(gen.current_watermark(), 800);
let wm = gen.advance_watermark(600);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 800);
}
// After an explicit advance to 1200 (bound 100), an event at 1250 would
// imply 1150 and must not emit; an event at 1400 implies 1300 and does.
#[test]
fn test_advance_watermark_maintains_invariant() {
let mut gen = BoundedOutOfOrdernessGenerator::new(100);
gen.on_event(1000);
gen.advance_watermark(1200);
assert_eq!(gen.current_watermark(), 1200);
let wm = gen.on_event(1250);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 1200);
let wm = gen.on_event(1400);
assert_eq!(wm, Some(Watermark::new(1300)));
}
// `advance_watermark` on the ascending generator: forward moves only.
#[test]
fn test_advance_watermark_ascending_generator() {
let mut gen = AscendingTimestampsGenerator::new();
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(300);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(1000);
assert_eq!(wm, Some(Watermark::new(1000)));
}
// `advance_watermark` through the periodic wrapper is delegated to the inner generator.
#[test]
fn test_advance_watermark_periodic_generator() {
let inner = BoundedOutOfOrdernessGenerator::new(100);
let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(100));
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(300);
assert_eq!(wm, None);
}
// `advance_watermark` on the punctuated generator bypasses the predicate
// (500 is not a multiple of 1000 yet is accepted).
#[test]
fn test_advance_watermark_punctuated_generator() {
let mut gen = PunctuatedGenerator::new(|ts| {
if ts % 1000 == 0 {
Some(Watermark::new(ts))
} else {
None
}
});
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(200);
assert_eq!(wm, None);
}
// `advance_watermark` on the source-provided generator acts like a source watermark.
#[test]
fn test_advance_watermark_source_provided_generator() {
let mut gen = SourceProvidedGenerator::new(100, true);
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(300);
assert_eq!(wm, None);
}
// The processing-time generator ignores events entirely.
#[test]
fn test_processing_time_generator_ignores_events() {
let mut gen = ProcessingTimeGenerator::new();
assert_eq!(gen.on_event(1000), None);
assert_eq!(gen.on_event(2000), None);
assert_eq!(gen.current_watermark(), i64::MIN);
}
// The periodic hook yields a wall-clock watermark; the lower bound checked
// is 1_577_836_800_000 ms (2020-01-01T00:00:00Z).
#[test]
fn test_processing_time_generator_periodic() {
let mut gen = ProcessingTimeGenerator::new();
let wm = gen.on_periodic();
assert!(wm.is_some());
let ts = wm.unwrap().timestamp();
assert!(ts > 1_577_836_800_000, "timestamp too old: {ts}");
}
// `advance_watermark` on the processing-time generator: forward moves only.
#[test]
fn test_processing_time_generator_advance_watermark() {
let mut gen = ProcessingTimeGenerator::new();
let wm = gen.advance_watermark(500);
assert_eq!(wm, Some(Watermark::new(500)));
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(300);
assert_eq!(wm, None);
assert_eq!(gen.current_watermark(), 500);
let wm = gen.advance_watermark(1000);
assert_eq!(wm, Some(Watermark::new(1000)));
}
// `Default` starts without a watermark, like `new`.
#[test]
fn test_processing_time_generator_default() {
let gen = ProcessingTimeGenerator::default();
assert_eq!(gen.current_watermark(), i64::MIN);
}
// An event two hours ahead of the wall clock exceeds the default skew and
// is ignored; a current-time event is then still accepted.
#[test]
fn future_skew_event_does_not_advance_watermark() {
let mut gen = BoundedOutOfOrdernessGenerator::new(0);
let now = crate::time::now_unix_millis();
assert_eq!(gen.on_event(now + 2 * 60 * 60 * 1000), None);
assert_eq!(gen.current_watermark(), i64::MIN);
assert_eq!(gen.on_event(now), Some(Watermark::new(now)));
}
}