sdf-metadata 0.14.0

metadata definition for core sdf
Documentation
use std::collections::BTreeMap;

use anyhow::{anyhow, Result};

use sdf_common::constants::DATAFLOW_STABLE_VERSION;
use sdf_common::version::ApiVersion;
use sdf_parser_core::config::transform::{TransformOperator, TypedState};
use sdf_parser_core::config::types::{MetadataTypeInner, MetadataTypeTagged, MetadataTypesMap};
use sdf_parser_package::pkg::{Function, PackageConfig, PackageWrapperV0_5_0};

use crate::wit::metadata::MetadataType;
use crate::wit::{
    package_interface::{PackageDefinition as PackageWit, PackageImport as PackageImportWit},
    dataflow::DevConfig as DevConfigWit,
    states::StateTyped as StateTypedWit,
    metadata::SdfKeyedState as SdfKeyedStateWit,
    operator::OperatorType as OperatorTypeWit,
};
use super::types::IntoBinding;
use super::{IntoTopicBindings, TryIntoBinding};

impl TryFrom<PackageConfig> for PackageWit {
    type Error = anyhow::Error;
    fn try_from(config: PackageConfig) -> Result<Self> {
        match config {
            PackageConfig::V0_4_0(wrapper) => wrapper.try_into_binding(&ApiVersion::V4),
            PackageConfig::V0_5_0(wrapper) => {
                let mut wit_config: PackageWit = wrapper.try_into_binding(&ApiVersion::V5)?;
                wit_config.api_version = DATAFLOW_STABLE_VERSION.to_string();
                Ok(wit_config)
            }
            PackageConfig::V0_6_0(wrapper) => {
                let mut wit_config: PackageWit = wrapper.try_into_binding(&ApiVersion::V6)?;
                wit_config.api_version = DATAFLOW_STABLE_VERSION.to_string();
                Ok(wit_config)
            }
        }
    }
}

impl TryIntoBinding for PackageWrapperV0_5_0 {
    type Target = PackageWit;

    fn try_into_binding(self, api_version: &ApiVersion) -> Result<Self::Target> {
        let types: MetadataTypesMap = self.types.try_into()?;
        let pkg_types = types
            .iter()
            .map(|(name, ty)| ty.to_owned().into_bindings(name, &types, api_version))
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<MetadataType>>();

        let mut pkg_types_dedup = BTreeMap::new();
        for ty in pkg_types.iter() {
            if let Some(prev_ty) = pkg_types_dedup.insert(&ty.name, ty.clone()) {
                if prev_ty != *ty {
                    return Err(anyhow!(
                        "Type {} is defined multiple times with different definitions",
                        ty.name
                    ));
                }
            }
        }
        let pkg_types = pkg_types_dedup.into_values().collect::<Vec<_>>();

        Ok(PackageWit {
            api_version: DATAFLOW_STABLE_VERSION.to_string(),
            meta: self.meta.into(),
            imports: self
                .imports
                .into_iter()
                .map(|import| import.into())
                .collect::<Vec<PackageImportWit>>(),
            functions: self
                .functions
                .iter()
                .map(|operator| {
                    operator
                        .inner()
                        .to_owned()
                        .try_into()
                        .map(|op| (op, operator.to_owned().into()))
                })
                .collect::<Result<_>>()?,
            types: pkg_types,
            states: self
                .states
                .into_iter()
                .map(|(name, s)| StateTypedWit::try_from_wrapper(name, s, &types))
                .collect::<Result<_>>()?,
            dev: self.dev.map(|d| DevConfigWit {
                converter: d.converter.map(|c| c.into()),
                imports: d.imports.into_iter().map(|i| i.into()).collect(),
                topics: d
                    .topics
                    .into_iter()
                    .map(|(key, v)| (key.clone(), v.into_bindings(&key)))
                    .collect(),
            }),
        })
    }
}

impl From<TransformOperator> for OperatorTypeWit {
    fn from(operator: TransformOperator) -> Self {
        match operator {
            TransformOperator::Map(_) => OperatorTypeWit::Map,
            TransformOperator::Filter(_) => OperatorTypeWit::Filter,
            TransformOperator::FilterMap(_) => OperatorTypeWit::FilterMap,
            TransformOperator::FlatMap(_) => OperatorTypeWit::FlatMap,
        }
    }
}

impl From<Function> for OperatorTypeWit {
    fn from(operator: Function) -> Self {
        match operator {
            Function::Map(_) => OperatorTypeWit::Map,
            Function::Filter(_) => OperatorTypeWit::Filter,
            Function::FilterMap(_) => OperatorTypeWit::FilterMap,
            Function::FlatMap(_) => OperatorTypeWit::FlatMap,
            Function::AssignKey(_) => OperatorTypeWit::AssignKey,
            Function::AssignTimestamp(_) => OperatorTypeWit::AssignTimestamp,
            Function::UpdateState(_) => OperatorTypeWit::UpdateState,
            Function::WindowAggregate(_) => OperatorTypeWit::WindowAggregate,
        }
    }
}

impl StateTypedWit {
    fn try_from_wrapper(
        name: String,
        wrapper: TypedState,
        types: &MetadataTypesMap,
    ) -> Result<StateTypedWit> {
        if let MetadataTypeInner::MetadataTypeTagged(MetadataTypeTagged::KeyedState(key_value)) =
            wrapper.inner_type.ty
        {
            Ok(StateTypedWit {
                name,
                type_: SdfKeyedStateWit::try_from_wrapper(key_value, types)?,
            })
        } else {
            Err(anyhow!(
                "State {} must be a key value type. Found {}",
                name,
                wrapper.inner_type.ty()
            ))
        }
    }
}
#[cfg(test)]
mod test {
    use sdf_parser_package::parse_package;
    use crate::into_wit::pkg_into_wit::PackageWit;

    #[test]
    fn test_pkg_duplicated_types() {
        let config = "
apiVersion: 0.5.0
meta:
  name: pkg-types
  version: 0.1.0
  namespace: example
types:
  my-event:
    type: object
    properties:
      name:
        type: string
      value:
        type: object
        type-name: my-obj
        properties:
          a:
            type: string
          c:
            type: string
  my-obj:
    type: object
    properties:
      a:
        type: string
      b:
        type: string
";

        let pkg = parse_package(config).unwrap();
        let wit_pkg = PackageWit::try_from(pkg).expect_err("Expected error");

        assert_eq!(
            wit_pkg.to_string(),
            "Type my-obj is defined multiple times with different definitions"
        );
    }
}