// zenoh-flow 0.5.0-alpha.2
//
// Zenoh-Flow: a Zenoh-based data flow programming framework for computations that span from the cloud to the device.
//
// Copyright (c) 2021 - 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use crate::model::descriptor::link::{CompositeInputDescriptor, CompositeOutputDescriptor};
use crate::model::descriptor::node::{try_load_descriptor_from_file, NodeDescriptor};
use crate::model::descriptor::LinkDescriptor;
use crate::prelude::PortId;
use crate::types::configuration::Merge;
use crate::types::{Configuration, NodeId};
use crate::utils::parse_uri;
use crate::zfresult::{ErrorKind, ZFResult as Result};
use crate::{bail, zferror};
use async_recursion::async_recursion;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Describes a simple operator.
///
/// A simple operator is a leaf of the data flow: it is what a composite operator is reduced to
/// once it has been flattened.
///
/// Example:
///
///
/// ```yaml
/// id : PrintSink
/// uri: file://./target/release/libmy_op.so
/// configuration:
///   by: 10
/// inputs: [Number]
/// outputs: [Multiplied]
/// ```
///
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct OperatorDescriptor {
    /// Identifier of the operator. When the operator is flattened out of a composite operator,
    /// it is rewritten as `<composite id>/<operator id>`.
    pub id: NodeId,
    /// Identifiers of the input ports of the operator.
    pub inputs: Vec<PortId>,
    /// Identifiers of the output ports of the operator.
    pub outputs: Vec<PortId>,
    /// Optional URI of the operator's implementation (a `file://` path to a shared library in
    /// the example above).
    pub uri: Option<String>,
    /// Optional configuration of the operator. During flattening, the configuration inherited
    /// from the enclosing composite operator is merged with it (see `Merge::merge_overwrite`).
    pub configuration: Option<Configuration>,
}

impl std::fmt::Display for OperatorDescriptor {
    /// Renders the descriptor as its identifier followed by its kind (simple operator).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { id, .. } = self;
        write!(f, "{id} - Kind: Operator (Simple)")
    }
}

impl OperatorDescriptor {
    /// Parses an `OperatorDescriptor` out of a YAML document.
    ///
    /// # Errors
    /// An `ErrorKind::ParsingError` error, wrapping the underlying `serde_yaml` error, is
    /// returned if `data` cannot be deserialized.
    pub fn from_yaml(data: &str) -> Result<Self> {
        serde_yaml::from_str::<Self>(data)
            .map_err(|err| zferror!(ErrorKind::ParsingError, err).into())
    }

    /// Parses an `OperatorDescriptor` out of a JSON document.
    ///
    /// # Errors
    /// An `ErrorKind::ParsingError` error, wrapping the underlying `serde_json` error, is
    /// returned if `data` cannot be deserialized.
    pub fn from_json(data: &str) -> Result<Self> {
        serde_json::from_str::<Self>(data)
            .map_err(|err| zferror!(ErrorKind::ParsingError, err).into())
    }

    /// Serializes the `OperatorDescriptor` to a JSON string.
    ///
    /// # Errors
    /// An `ErrorKind::SerializationError` error, wrapping the underlying `serde_json` error, is
    /// returned if the serialization fails.
    pub fn to_json(&self) -> Result<String> {
        let json = serde_json::to_string(self)
            .map_err(|err| zferror!(ErrorKind::SerializationError, err))?;
        Ok(json)
    }

    /// Serializes the `OperatorDescriptor` to a YAML string.
    ///
    /// # Errors
    /// An `ErrorKind::SerializationError` error, wrapping the underlying `serde_yaml` error, is
    /// returned if the serialization fails.
    pub fn to_yaml(&self) -> Result<String> {
        let yaml = serde_yaml::to_string(self)
            .map_err(|err| zferror!(ErrorKind::SerializationError, err))?;
        Ok(yaml)
    }
}

/// Describes a composite operator.
///
/// A composite operator groups several operators (simple or composite) and the links between
/// them, and exposes some of their inputs and outputs as its own.
///
/// Example:
///
///
/// ```yaml
/// id: AMagicAIBasedOperator
/// configuration:
///   parameter1: value1
///   parameter2: value2
/// operators: # Operators composing the flow
///   - id: ComposedAIDownsampling
///     descriptor: [file://|...]/some/path/to/its/descriptor/file.yaml
///     configuration:
///       parameter3: value3
///   - id: MagicAI
///     descriptor: [file://|...]/some/path/to/its/descriptor/file.yaml
///   - id: MagicPostProcessing
///     descriptor: [file://|...]/some/path/to/its/descriptor/file.yaml
/// links:
/// - from:
///     node : ComposedAIDownsampling
///     output : Data
///   to:
///     node : MagicAI
///     input : In
/// - from:
///     node : MagicAI
///     output : Out
///   to:
///     node : MagicPostProcessing
///     input : Data
///
/// inputs:
///   - node: ComposedAIDownsampling
///     input: Data
/// outputs:
///   - node: MagicPostProcessing
///     output: Data
/// ```
///
/// NOTE(review): the flattening code reads an `id` field on every entry of `inputs` and
/// `outputs`, which the example above does not show — confirm whether it is required.
///
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompositeOperatorDescriptor {
    /// Identifier of the composite operator.
    pub id: NodeId,
    /// Inputs exposed by the composite operator. Each entry associates an exposed input with
    /// the `input` of one of the inner operators (`node`).
    pub inputs: Vec<CompositeInputDescriptor>,
    /// Outputs exposed by the composite operator. Each entry associates an exposed output with
    /// the `output` of one of the inner operators (`node`).
    pub outputs: Vec<CompositeOutputDescriptor>,
    /// Nodes composing this operator. The descriptor each of them points to is parsed as a
    /// simple operator first and, failing that, as a composite operator.
    pub operators: Vec<NodeDescriptor>,
    /// Links between the inner operators.
    pub links: Vec<LinkDescriptor>,
    /// Optional configuration. During flattening it is merged with the configuration inherited
    /// from the enclosing flow and handed down to the inner operators.
    pub configuration: Option<Configuration>,
}

impl std::fmt::Display for CompositeOperatorDescriptor {
    /// Renders the descriptor as its identifier followed by its kind (composite operator).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { id, .. } = self;
        write!(f, "{id} - Kind: Operator (Composite)")
    }
}

impl CompositeOperatorDescriptor {
    /// Creates a new `CompositeOperatorDescriptor` from its YAML representation.
    ///
    /// # Errors
    /// An `ErrorKind::ParsingError` error is returned if deserialization fails.
    pub fn from_yaml(data: &str) -> Result<Self> {
        let descriptor = serde_yaml::from_str::<CompositeOperatorDescriptor>(data)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        Ok(descriptor)
    }

    /// Creates a new `CompositeOperatorDescriptor` from its JSON representation.
    ///
    /// # Errors
    /// An `ErrorKind::ParsingError` error is returned if deserialization fails.
    pub fn from_json(data: &str) -> Result<Self> {
        let descriptor = serde_json::from_str::<CompositeOperatorDescriptor>(data)
            .map_err(|e| zferror!(ErrorKind::ParsingError, e))?;
        Ok(descriptor)
    }

    /// Returns the JSON representation of the `CompositeOperatorDescriptor`.
    ///
    /// # Errors
    /// An `ErrorKind::SerializationError` error is returned if serialization fails.
    pub fn to_json(&self) -> Result<String> {
        serde_json::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Returns the YAML representation of the `CompositeOperatorDescriptor`.
    ///
    /// # Errors
    /// An `ErrorKind::SerializationError` error is returned if serialization fails.
    pub fn to_yaml(&self) -> Result<String> {
        serde_yaml::to_string(&self).map_err(|e| zferror!(ErrorKind::SerializationError, e).into())
    }

    /// Flattens the `CompositeOperatorDescriptor` into the simple operators it is made of,
    /// recursively loading and flattening the composite operators it contains.
    ///
    /// The identifier of every flattened operator is prefixed with `composite_id`
    /// (`<composite_id>/<operator id>`) and the links are rewritten accordingly.
    ///
    /// # Arguments
    /// - `composite_id`: identifier under which this composite operator is known in the
    ///   enclosing flow; used as prefix for the identifiers of the flattened operators.
    /// - `links`: links of the enclosing flow. The ones attached to an input or an output of
    ///   this composite operator are redirected to the corresponding inner operator, and the
    ///   (rewritten) links of this composite operator are appended to it.
    /// - `global_configuration`: configuration inherited from the enclosing flow, merged with
    ///   the configuration of this composite operator and handed down to its operators.
    /// - `ancestors`: descriptors of the composite operators currently being flattened, in
    ///   inclusion order. It is used to detect recursive inclusions.
    ///
    /// # Errors
    /// An error is returned if:
    /// - the URI of a descriptor cannot be parsed or its file cannot be loaded,
    /// - a descriptor refers to a builtin operator (`ErrorKind::ConfigurationError`),
    /// - a composite operator would be included again by one of its own descendants
    ///   (`ErrorKind::GenericError`),
    /// - a descriptor is neither a simple nor a composite operator
    ///   (`ErrorKind::ParsingError`).
    #[async_recursion]
    pub(crate) async fn flatten(
        mut self,
        composite_id: NodeId,
        links: &mut Vec<LinkDescriptor>,
        global_configuration: Option<Configuration>,
        ancestors: &mut Vec<String>,
    ) -> Result<Vec<OperatorDescriptor>> {
        log::trace!("[Descriptor] Flattening {}", self.id);
        let mut simple_operators = vec![];
        self.configuration = global_configuration.merge_overwrite(self.configuration);

        for o in self.operators {
            let description = match parse_uri(&o.descriptor)? {
                crate::model::ZFUri::File(path) => try_load_descriptor_from_file(path).await,
                crate::model::ZFUri::Builtin(_) => bail!(
                    ErrorKind::ConfigurationError,
                    "Builtin operators are not yet supported!"
                ),
            }?;

            let NodeDescriptor {
                id: operator_id,
                descriptor,
                configuration,
            } = o;

            let configuration = self.configuration.clone().merge_overwrite(configuration);

            // First attempt: the descriptor describes a simple operator.
            let res_simple = OperatorDescriptor::from_yaml(&description);
            if let Ok(mut simple_operator) = res_simple {
                log::trace!(
                    "[Descriptor] Flattening {} - {} is simple",
                    self.id,
                    simple_operator.id
                );
                let new_id: NodeId = format!("{composite_id}/{operator_id}").into();

                // Outputs of the composite that are provided by this operator:
                // (id exposed by the composite) -> (output of the operator).
                let output_ids: HashMap<_, _> = self
                    .outputs
                    .iter()
                    .filter(|&output| output.node == operator_id)
                    .map(|output| (&output.id, &output.output))
                    .collect();

                // Inputs of the composite that are consumed by this operator:
                // (id exposed by the composite) -> (input of the operator).
                let input_ids: HashMap<_, _> = self
                    .inputs
                    .iter()
                    .filter(|&input| input.node == operator_id)
                    .map(|input| (&input.id, &input.input))
                    .collect();

                // Updating all the links with the old id to the new ID
                for l in &mut self.links {
                    if l.from.node == operator_id {
                        log::trace!("Updating {} to {}", l.from.node, new_id);
                        l.from.node = new_id.clone();
                    }
                    if l.to.node == operator_id {
                        log::trace!("Updating {} to {}", l.to.node, new_id);
                        l.to.node = new_id.clone();
                    }
                }

                // Redirecting the links of the enclosing flow that start from an output of the
                // composite to the operator actually producing that output. The `unwrap` cannot
                // fail: the filter only keeps links whose output is a key of `output_ids`.
                links
                    .iter_mut()
                    .filter(|link| {
                        link.from.node == composite_id
                            && output_ids.keys().contains(&&link.from.output)
                    })
                    .for_each(|link| {
                        link.from.node = new_id.clone();
                        link.from.output = (*output_ids.get(&&link.from.output).unwrap()).clone();
                    });

                // Same for the links of the enclosing flow that end on an input of the
                // composite.
                links
                    .iter_mut()
                    .filter(|link| {
                        link.to.node == composite_id && input_ids.keys().contains(&&link.to.input)
                    })
                    .for_each(|link| {
                        link.to.node = new_id.clone();
                        link.to.input = (*input_ids.get(&&link.to.input).unwrap()).clone();
                    });

                // Updating the new id
                simple_operator.id = new_id;

                simple_operator.configuration = configuration
                    .clone()
                    .merge_overwrite(simple_operator.configuration);

                log::trace!(
                    "[Descriptor] Flattening {} - Pushing simple {}",
                    self.id,
                    simple_operator.id
                );
                // Adding in the list of operators
                simple_operators.push(simple_operator);

                continue;
            }

            // Second attempt: the descriptor describes a composite operator.
            let res_composite = CompositeOperatorDescriptor::from_yaml(&description);
            if let Ok(composite_operator) = res_composite {
                log::trace!(
                    "[Descriptor] Flattening {} - {} is composite",
                    self.id,
                    composite_operator.id
                );
                // `ancestors` is a stack kept in inclusion order, it is NOT sorted: a binary
                // search would give unspecified results, hence the linear scan.
                if let Some(index) = ancestors
                    .iter()
                    .position(|ancestor| ancestor == &descriptor)
                {
                    bail!(
                        ErrorKind::GenericError, // FIXME Dedicated error?
                        "Possible recursion detected, < {} > would be included again after: {:?}",
                        descriptor,
                        &ancestors[index..]
                    );
                }
                ancestors.push(descriptor.clone());

                let mut operators = composite_operator
                    .flatten(operator_id, &mut self.links, configuration, ancestors)
                    .await?;

                // The nested call prefixed the operators with the id of the nested composite;
                // prefix them (and the links pointing at them) with our own id as well.
                for operator in operators.iter_mut() {
                    let new_id: NodeId = format!("{}/{}", composite_id, operator.id).into();
                    self.links
                        .iter_mut()
                        .filter(|link| link.from.node == operator.id || link.to.node == operator.id)
                        .for_each(|link| {
                            if link.from.node == operator.id {
                                link.from.node = new_id.clone();
                            }
                            if link.to.node == operator.id {
                                link.to.node = new_id.clone();
                            }
                        });

                    operator.id = new_id;
                }

                simple_operators.append(&mut operators);

                ancestors.pop();
                continue;
            }

            // If we arrive at that code it means that both attempts to parse the descriptor failed.
            log::error!(
                "Could not parse < {} > as either a Simple or a Composite Operator:",
                operator_id
            );
            log::error!("Simple: {:?}", res_simple.err().unwrap());
            log::error!("Composite: {:?}", res_composite.err().unwrap());

            bail!(
                ErrorKind::ParsingError,
                "Could not parse < {} >",
                operator_id
            );
        }

        // Handing the (rewritten) links of this composite over to the enclosing flow.
        links.append(&mut self.links);

        Ok(simple_operators)
    }
}

// Tests of this module, compiled only for test builds. They live in a separate file which,
// judging by its name, covers the flattening of composite operators.
#[cfg(test)]
#[path = "../tests/flatten-composite.rs"]
mod tests;