sdf_metadata/metadata/operator/
window_properties.rs

1use crate::{
2    util::{validation_error::ValidationError, validation_failure::ValidationFailure},
3    wit::operator::{WindowKind, WindowProperties},
4};
5
6impl WindowProperties {
7    pub fn window_kind(&self) -> &WindowKind {
8        &self.kind
9    }
10
11    pub fn offset(&self) -> u64 {
12        match &self.kind {
13            WindowKind::Tumbling(tumbling) => tumbling.offset,
14            WindowKind::Sliding(sliding) => sliding.offset,
15        }
16    }
17
18    pub fn grace_period(&self) -> u64 {
19        self.watermark_config.grace_period.unwrap_or(0)
20    }
21
22    /// Returns the time interval for new windows
23    pub fn new_window_interval(&self) -> u64 {
24        match &self.kind {
25            WindowKind::Tumbling(tumbling) => tumbling.duration,
26            WindowKind::Sliding(sliding) => sliding.slide,
27        }
28    }
29
30    pub fn validate(&self) -> Result<(), ValidationFailure> {
31        // idleness should be larger than window duration
32        let mut errors = ValidationFailure::new();
33        if let Some(idleness) = self.watermark_config.idleness {
34            let window_duration = match &self.kind {
35                WindowKind::Tumbling(tumbling) => tumbling.duration,
36                WindowKind::Sliding(sliding) => sliding.duration,
37            };
38            if idleness < window_duration {
39                errors.push(&ValidationError::new(&format!(
40                    "idleness {} should be larger than window duration {}",
41                    idleness, window_duration
42                )));
43            }
44        }
45
46        if errors.any() {
47            return Err(errors);
48        }
49        Ok(())
50    }
51}