flowcore 1.0.0

Structures shared between runtime and clients
Documentation
use std::fmt;

use log::debug;
use serde_derive::{Deserialize, Serialize};

use crate::errors::{Result, ResultExt};
use crate::model::datatype::DataType;
use crate::model::io::IO;
use crate::model::name::Name;
use crate::model::route::HasRoute;
use crate::model::route::Route;
use crate::model::validation::Validate;

/// `Connection` defines a connection from the output of one function or flow to the input(s)
/// of another function or flow. It may optionally be named for legibility/debugging.
///
/// Only `name`, `from` and `to` take part in (de)serialization. The remaining fields are
/// marked `#[serde(skip)]`: they start out with their `Default` value and are filled in
/// later via `connect` and `set_external_input`.
///
/// Unknown fields in a serialized definition are rejected (`deny_unknown_fields`).
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Connection {
    /// Optional name given to a connection for legibility and debugging.
    /// Defaults to empty when absent and is omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    name: Name,
    /// `from` defines the origin of the connection
    /// (deserialized using `super::route::route_string`)
    #[serde(deserialize_with = "super::route::route_string")]
    from: Route,
    /// `to` defines the destination(s) of this connection. The serialized form may be
    /// either a single route or an array of routes
    /// (deserialized using `super::route::route_or_route_array`)
    #[serde(deserialize_with = "super::route::route_or_route_array")]
    to: Vec<Route>,
    /// `from_io` is used during the compilation process and refers to a found output for the
    /// connection. Never (de)serialized; set by `connect`.
    // TODO make these references, not clones
    #[serde(skip)]
    from_io: IO,
    /// `to_io` is used during the compilation process and refers to a found input for the
    /// connection. Never (de)serialized; set by `connect`.
    #[serde(skip)]
    to_io: IO,
    /// `level` defines at what level in the flow hierarchy this connection belongs. It is used
    /// when collapsing connections to reduce work and avoid infinite recursion.
    /// Never (de)serialized; set by `connect`.
    #[serde(skip)]
    level: usize,
    /// Whether the collapsed connection was routed via a higher-level flow,
    /// making the destination input external to its flow.
    /// Never (de)serialized; `false` until `set_external_input` is called.
    #[serde(skip)]
    external_input: bool,
}

/// `Direction` defines whether a `Connection` is coming from an IO or going to an IO
#[derive(Debug)]
// The variant names below are written fully in upper case, which is what this clippy lint
// flags. The lint is silenced here rather than renaming the public variants.
#[allow(clippy::upper_case_acronyms)]
pub enum Direction {
    /// The `Connection` is `FROM` this IO to another
    FROM,
    /// The `Connection` is `TO` this IO from another
    TO,
}

impl fmt::Display for Connection {
    /// Format the connection as `<from_io route> --> <to_io route>`.
    ///
    /// An end whose IO reports `flow_io()` is tagged with `(f)`: as a prefix on the source
    /// side and as a suffix on the destination side. A source that is not a flow IO gets
    /// three spaces of padding instead, so that the routes of consecutive connections line up.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let source_is_flow = self.from_io.flow_io();
        let destination_is_flow = self.to_io.flow_io();

        if source_is_flow && destination_is_flow {
            return write!(
                f,
                "(f){} --> {}(f)",
                self.from_io.route(),
                self.to_io.route()
            );
        }

        if source_is_flow {
            return write!(f, "(f){} --> {}", self.from_io.route(), self.to_io.route());
        }

        if destination_is_flow {
            return write!(
                f,
                "   {} --> {}(f)",
                self.from_io.route(),
                self.to_io.route()
            );
        }

        write!(f, "   {} --> {}", self.from_io.route(), self.to_io.route())
    }
}

impl Validate for Connection {
    // TODO maybe validate some of the combinations here that are checked in build_connections here?
    /// Validate the connection's `name`, its `from` route and then each `to` route, in that
    /// order, returning the first error encountered.
    fn validate(&self) -> Result<()> {
        self.name.validate()?;
        self.from.validate()?;
        // Stops at, and returns, the first destination route that fails validation
        self.to.iter().try_for_each(|route| route.validate())
    }
}

impl Connection {
    /// Create a new unnamed `Connection` with `from_route` as the source `Route` and
    /// `to_route` as its single destination `Route`.
    ///
    /// All other fields take their `Default` value.
    pub fn new<R>(from_route: R, to_route: R) -> Self
    where
        R: Into<Route>,
    {
        Connection {
            from: from_route.into(),
            to: vec![to_route.into()],
            ..Default::default()
        }
    }

    /// Create a new `Connection` called `name`, with `from_route` as the source `Route` and
    /// `to_route` as its single destination `Route`.
    ///
    /// All other fields take their `Default` value.
    pub fn new_named<N, R>(name: N, from_route: R, to_route: R) -> Self
    where
        N: Into<Name>,
        R: Into<Route>,
    {
        Connection {
            name: name.into(),
            from: from_route.into(),
            to: vec![to_route.into()],
            ..Default::default()
        }
    }

    /// Return the name
    #[must_use]
    pub fn name(&self) -> &Name {
        &self.name
    }

    /// Set the connection name
    pub fn set_name(&mut self, name: Name) {
        self.name = name;
    }

    /// Return the `from` Route
    #[must_use]
    pub fn from(&self) -> &Route {
        &self.from
    }

    /// Set the `from` Route
    pub fn set_from<R: Into<Route>>(&mut self, from: R) {
        self.from = from.into();
    }

    /// Return the `to` Routes
    #[must_use]
    pub fn to(&self) -> &Vec<Route> {
        &self.to
    }

    /// Set the `to` Routes, replacing any existing destinations
    pub fn set_to(&mut self, to: Vec<Route>) {
        self.to = to;
    }

    /// Connect the `from_io` to the `to_io` inside a flow at level `level`, if they are compatible
    ///
    /// If this connection's `from` route starts with the route of `from_io`, then the remainder
    /// is treated as a subroute selecting part of that output, and is passed to
    /// `DataType::compatible_types` along with the datatypes of both IOs. Otherwise an empty
    /// subroute is used.
    ///
    /// On success `from_io`, `to_io` and `level` are stored in this connection. On failure
    /// the connection is left unmodified.
    ///
    /// # Errors
    ///
    /// Will return `Err` if the source and destinations do not have any compatible types
    pub fn connect(&mut self, from_io: IO, to_io: IO, level: usize) -> Result<()> {
        let from_str = self.from.to_string();
        let io_route = from_io.route().to_string();
        // What remains of `from` after the IO's own route keeps its leading separator,
        // e.g. "/flow/get/string/1" with an IO at "/flow/get/string" leaves "/1"
        let from_io_subroute = from_str.strip_prefix(&io_route).unwrap_or("");
        DataType::compatible_types(
            from_io.datatypes(),
            to_io.datatypes(),
            &Route::from(from_io_subroute),
        )
        .chain_err(|| {
            // The subroute is appended directly to the IO route: it already starts with its
            // own separator when non-empty, so adding another one would print "route//1",
            // or a dangling "route/" when there is no subroute at all
            format!(
                "Incompatible source and destination types:\n\
            Source:      '{}{from_io_subroute}' of types {:?}\n\
            Destination: '{}' of types {:?}",
                from_io.route(),
                from_io.datatypes(),
                to_io.route(),
                to_io.datatypes()
            )
        })?;
        debug!(
            "Connection built from '{}' to '{}'",
            from_io.route(),
            to_io.route()
        );
        self.from_io = from_io;
        self.to_io = to_io;
        self.level = level;

        Ok(())
    }

    /// Return a reference to the `from_io`
    #[must_use]
    pub fn from_io(&self) -> &IO {
        &self.from_io
    }

    /// Return a mutable reference to the `from_io`
    pub fn from_io_mut(&mut self) -> &mut IO {
        &mut self.from_io
    }

    /// Return a reference to the `to_io`
    #[must_use]
    pub fn to_io(&self) -> &IO {
        &self.to_io
    }

    /// Return a mutable reference to the `to_io`
    pub fn to_io_mut(&mut self) -> &mut IO {
        &mut self.to_io
    }

    /// Get at what level in the flow hierarchy this connection exists (source),
    /// as recorded by the last successful call to `connect`
    #[must_use]
    pub fn level(&self) -> usize {
        self.level
    }

    /// Whether the destination input is external to its flow (connection was
    /// routed via a higher-level flow during collapsing)
    #[must_use]
    pub fn external_input(&self) -> bool {
        self.external_input
    }

    /// Mark the destination input as external to its flow.
    /// There is no corresponding method in this `impl` to clear the mark again.
    pub fn set_external_input(&mut self) {
        self.external_input = true;
    }
}

/// Unit tests for deserialization, display, validation and type-checked connecting
/// of `Connection`
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use url::Url;

    use crate::deserializers::deserializer::get;
    use crate::errors::Result;
    use crate::model::datatype::DataType;
    use crate::model::io::IO;
    use crate::model::validation::Validate;

    use super::Connection;

    /// Deserialize a `Connection` from TOML `content`. The deserializer is obtained for a
    /// fake `.toml` file URL, which is also passed along to `deserialize`.
    fn toml_from_str(content: &str) -> Result<Connection> {
        let url = Url::parse("file:///fake.toml").expect("Could not parse URL");
        let deserializer = get::<Connection>(&url).expect("Could not get deserializer");
        deserializer.deserialize(content, Some(&url))
    }

    #[test]
    fn single_destination_deserialization() {
        let input_str = r#"
        from = "source"
        to = "destination"
        "#;

        let connection: Result<Connection> = toml_from_str(input_str);
        assert!(connection.is_ok(), "Could not deserialize Connection");
    }

    #[test]
    fn multiple_destination_deserialization() {
        let input_str = r#"
        from = "source"
        to = ["destination", "destination2"]
        "#;

        let connection: Result<Connection> = toml_from_str(input_str);
        assert!(connection.is_ok(), "Could not deserialize Connection");
    }

    #[test]
    fn display_connection() {
        // Smoke test only: checks that formatting a connection does not panic
        let connection1 = Connection::new("input/number", "process_1");
        println!("Connection: {connection1}");
    }

    #[test]
    fn validate_connection() {
        let connection1 = Connection::new("input/number", "process_1");
        assert!(connection1.validate().is_ok());
    }

    #[test]
    fn deserialize_extra_field_fails() {
        // `from` and `to` are present and valid, so the only thing wrong with this input is
        // the unknown field `foo`. Without them the deserialization would fail because of
        // the missing required fields, and the test would pass even if unknown fields
        // were silently accepted.
        let input_str = r#"
        name = "input"
        from = "source"
        to = "destination"
        foo = "extra token"
        "#;

        let connection: Result<Connection> = toml_from_str(input_str);
        assert!(
            connection.is_err(),
            "Deserialized invalid connection TOML without error, but should not have."
        );
    }

    #[test]
    fn connect_with_subroute_compatible() {
        // Subroute /1 selects a string from array/string, compatible with string input
        let mut conn = Connection::new("/flow/get/string/1", "/flow/process/input");
        let from_io = IO::new(vec![DataType::from("array/string")], "/flow/get/string");
        let to_io = IO::new(vec![DataType::from("string")], "/flow/process/input");
        assert!(
            conn.connect(from_io, to_io, 0).is_ok(),
            "Subroute selection from array/string to string should be compatible"
        );
    }

    #[test]
    fn connect_without_subroute_compatible() {
        let mut conn = Connection::new("/flow/get/strings", "/flow/process/input");
        let from_io = IO::new(vec![DataType::from("array/string")], "/flow/get/strings");
        let to_io = IO::new(vec![DataType::from("array/string")], "/flow/process/input");
        assert!(
            conn.connect(from_io, to_io, 0).is_ok(),
            "Direct array/string to array/string should be compatible"
        );
    }

    #[test]
    fn connect_incompatible_types() {
        let mut conn = Connection::new("/flow/get/string", "/flow/process/value");
        let from_io = IO::new(vec![DataType::from("string")], "/flow/get/string");
        let to_io = IO::new(vec![DataType::from("number")], "/flow/process/value");
        assert!(
            conn.connect(from_io, to_io, 0).is_err(),
            "string to number should be incompatible"
        );
    }

    #[test]
    fn subroute_narrows_deep_array_to_compatible() {
        // output is array/array/array/number (order 3)
        // Without subroute: order difference 3→0 = 3, rejected (>2)
        // With subroute /0: selects array/array/number (order 2), difference 2→0 = 2, accepted
        let mut conn = Connection::new("/flow/func/output/0", "/flow/process/value");
        let from_io = IO::new(
            vec![DataType::from("array/array/array/number")],
            "/flow/func/output",
        );
        let to_io = IO::new(vec![DataType::from("number")], "/flow/process/value");
        assert!(
            conn.connect(from_io, to_io, 0).is_ok(),
            "Subroute /0 on array/array/array/number should narrow to array/array/number, compatible with number (diff 2)"
        );
    }

    #[test]
    fn without_subroute_deep_array_is_incompatible() {
        // Same types but no subroute — order difference 3, should be rejected
        let mut conn = Connection::new("/flow/func/output", "/flow/process/value");
        let from_io = IO::new(
            vec![DataType::from("array/array/array/number")],
            "/flow/func/output",
        );
        let to_io = IO::new(vec![DataType::from("number")], "/flow/process/value");
        assert!(
            conn.connect(from_io, to_io, 0).is_err(),
            "array/array/array/number to number without subroute should be incompatible (diff 3)"
        );
    }
}