1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use std::cmp::max;
use crate::stream_engine::time::{
SpringDuration, SpringEventDuration, SpringTimestamp, MIN_TIMESTAMP,
};
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct Watermark {
max_rowtime: SpringTimestamp,
allowed_delay: SpringEventDuration,
}
impl Watermark {
pub fn new(allowed_delay: SpringEventDuration) -> Self {
Self {
max_rowtime: MIN_TIMESTAMP + allowed_delay.to_duration(),
allowed_delay,
}
}
pub fn as_timestamp(&self) -> SpringTimestamp {
self.max_rowtime - self.allowed_delay.to_duration()
}
pub fn update(&mut self, rowtime: SpringTimestamp) {
self.max_rowtime = max(rowtime, self.max_rowtime);
}
}