sdf-metadata 0.14.0

metadata definition for core sdf
Documentation
use anyhow::Result;

use crate::{
    metadata::io::topic::KVSchemaType,
    util::{
        sdf_types_map::SdfTypesMap, validation_error::ValidationError,
        validation_failure::ValidationFailure,
    },
    wit::{
        dataflow::PostTransforms,
        operator::{OperatorType, StepInvocation},
    },
};

impl PostTransforms {
    pub fn operators(&self) -> Vec<(StepInvocation, OperatorType)> {
        match self {
            PostTransforms::AssignTimestamp(window) => window.operators(),
            PostTransforms::Partition(partition) => partition.operators(),
        }
    }
    pub fn output_type(&self, input_type: KVSchemaType) -> Result<KVSchemaType, ValidationError> {
        match self {
            PostTransforms::AssignTimestamp(window) => window.output_type(input_type),
            PostTransforms::Partition(partition) => partition.output_type(input_type),
        }
    }

    pub fn validate(
        &self,
        types: &SdfTypesMap,
        expected_input_type: &KVSchemaType,
    ) -> Result<(), ValidationFailure> {
        let mut errors = ValidationFailure::new();

        match self {
            PostTransforms::AssignTimestamp(window) => {
                if let Err(window_error) =
                    window.validate(types, expected_input_type, "transforms block")
                {
                    errors.concat_with_context("Window", &window_error)
                }
            }
            PostTransforms::Partition(partition) => {
                if let Err(partition_error) =
                    partition.validate(types, expected_input_type, "transforms block")
                {
                    errors.concat_with_context("Partition", &partition_error)
                }
            }
        }

        if errors.any() {
            Err(errors)
        } else {
            Ok(())
        }
    }

    #[cfg(feature = "parser")]
    pub fn update_inline_operators(&mut self) -> Result<()> {
        match self {
            PostTransforms::AssignTimestamp(window) => window.update_inline_operators(),
            PostTransforms::Partition(partition) => partition.update_inline_operators(),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::wit::operator::{
        PartitionOperator, TransformOperator, PostTransforms, StepInvocation, Transforms, Window,
    };

    #[test]
    fn test_operators() {
        let post_transforms = PostTransforms::AssignTimestamp(Window {
            partition: Some(PartitionOperator {
                assign_key: StepInvocation {
                    uses: "my-assign".into(),
                    ..Default::default()
                },
                transforms: Transforms {
                    steps: vec![TransformOperator::Map(StepInvocation {
                        uses: "my-map".into(),
                        ..Default::default()
                    })],
                },
                update_state: None,
            }),
            flush: Some(StepInvocation {
                uses: "my-flush".into(),
                ..Default::default()
            }),
            assign_timestamp: StepInvocation {
                uses: "my-assign-timestamp".into(),
                ..Default::default()
            },
            ..Default::default()
        });

        let op = post_transforms.operators();
        assert_eq!(op.len(), 4);
        assert_eq!(op[0].0.uses, "my-assign-timestamp");
        assert_eq!(op[1].0.uses, "my-assign");
        assert_eq!(op[2].0.uses, "my-map");
        assert_eq!(op[3].0.uses, "my-flush");
    }
}