// sdf-metadata 0.14.0
//
// Metadata definition for core SDF.
// Documentation
use crate::{
    util::{validation_error::ValidationError, validation_failure::ValidationFailure},
    wit::operator::{WindowKind, WindowProperties},
};

/// Convenience accessors and validation for [`WindowProperties`].
impl WindowProperties {
    /// Borrows the window kind stored in these properties.
    pub fn window_kind(&self) -> &WindowKind {
        &self.kind
    }

    /// Returns the `offset` field of whichever window variant is configured.
    pub fn offset(&self) -> u64 {
        match &self.kind {
            WindowKind::Sliding(win) => win.offset,
            WindowKind::Tumbling(win) => win.offset,
        }
    }

    /// Returns the watermark grace period, or `0` when none is set.
    pub fn grace_period(&self) -> u64 {
        self.watermark_config.grace_period.unwrap_or_default()
    }

    /// Returns the interval at which new windows start.
    ///
    /// For a tumbling window this is its `duration`; for a sliding window it
    /// is its `slide`.
    pub fn new_window_interval(&self) -> u64 {
        match &self.kind {
            WindowKind::Sliding(win) => win.slide,
            WindowKind::Tumbling(win) => win.duration,
        }
    }

    /// Checks the watermark idleness against the window duration.
    ///
    /// # Errors
    ///
    /// Returns the collected [`ValidationFailure`] when an idleness is
    /// configured and it is strictly smaller than the window duration.
    /// A missing idleness is always accepted.
    pub fn validate(&self) -> Result<(), ValidationFailure> {
        let mut failure = ValidationFailure::new();

        // Both window variants carry a `duration`; pick the active one.
        let duration = match &self.kind {
            WindowKind::Sliding(win) => win.duration,
            WindowKind::Tumbling(win) => win.duration,
        };

        match self.watermark_config.idleness {
            Some(idle) if idle < duration => {
                let message = format!(
                    "idleness {} should be larger than window duration {}",
                    idle, duration
                );
                failure.push(&ValidationError::new(&message));
            }
            // Either no idleness is configured or it is not below the duration.
            _ => {}
        }

        if failure.any() {
            Err(failure)
        } else {
            Ok(())
        }
    }
}