hydroflow_plus_deploy 0.9.0

Library for working with hydro_deploy and hydroflow_plus
Documentation
use std::cell::RefCell;
use std::rc::Rc;

use hydroflow_plus::deploy::{ClusterSpec, Deploy, Node, ProcessSpec};
use hydroflow_plus::lang::graph::HydroflowGraph;
use hydroflow_plus::util::deploy::{
    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use stageleft::{q, Quoted, RuntimeData};

use super::HydroflowPlusMeta;

pub struct DeployRuntime {}

impl<'a> Deploy<'a> for DeployRuntime {
    type InstantiateEnv = ();
    type CompileEnv = RuntimeData<&'a DeployPorts<HydroflowPlusMeta>>;
    type Process = DeployRuntimeNode;
    type Cluster = DeployRuntimeCluster;
    type Meta = ();
    type GraphId = usize;
    type ProcessPort = String;
    type ClusterPort = String;

    fn has_trivial_node() -> bool {
        true
    }

    fn trivial_process(_id: usize) -> Self::Process {
        DeployRuntimeNode {
            next_port: Rc::new(RefCell::new(0)),
        }
    }

    fn trivail_cluster(_id: usize) -> Self::Cluster {
        DeployRuntimeCluster {
            next_port: Rc::new(RefCell::new(0)),
        }
    }

    fn allocate_process_port(process: &Self::Process) -> Self::ProcessPort {
        process.next_port()
    }

    fn allocate_cluster_port(cluster: &Self::Cluster) -> Self::ClusterPort {
        cluster.next_port()
    }

    fn o2o_sink_source(
        env: &Self::CompileEnv,
        _p1: &Self::Process,
        p1_port: &Self::ProcessPort,
        _p2: &Self::Process,
        p2_port: &Self::ProcessPort,
    ) -> (syn::Expr, syn::Expr) {
        let env = *env;
        (
            {
                let port = p1_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDirect>()
                        .into_sink()
                })
                .splice()
            },
            {
                let port = p2_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDirect>()
                        .into_source()
                })
                .splice()
            },
        )
    }

    fn o2o_connect(
        _p1: &Self::Process,
        _p1_port: &Self::ProcessPort,
        _p2: &Self::Process,
        _p2_port: &Self::ProcessPort,
    ) {
        panic!()
    }

    fn o2m_sink_source(
        env: &Self::CompileEnv,
        _p1: &Self::Process,
        p1_port: &Self::ProcessPort,
        _c2: &Self::Cluster,
        c2_port: &Self::ClusterPort,
    ) -> (syn::Expr, syn::Expr) {
        let env = *env;
        (
            {
                let port = p1_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
                        .into_sink()
                })
                .splice()
            },
            {
                let port = c2_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDirect>()
                        .into_source()
                })
                .splice()
            },
        )
    }

    fn o2m_connect(
        _p1: &Self::Process,
        _p1_port: &Self::ProcessPort,
        _c2: &Self::Cluster,
        _c2_port: &Self::ClusterPort,
    ) {
        panic!()
    }

    fn m2o_sink_source(
        env: &Self::CompileEnv,
        _c1: &Self::Cluster,
        c1_port: &Self::ClusterPort,
        _p2: &Self::Process,
        p2_port: &Self::ProcessPort,
    ) -> (syn::Expr, syn::Expr) {
        let env = *env;
        (
            {
                let port = c1_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDirect>()
                        .into_sink()
                })
                .splice()
            },
            {
                let port = p2_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
                        .into_source()
                })
                .splice()
            },
        )
    }

    fn m2o_connect(
        _c1: &Self::Cluster,
        _c1_port: &Self::ClusterPort,
        _p2: &Self::Process,
        _p2_port: &Self::ProcessPort,
    ) {
        panic!()
    }

    fn m2m_sink_source(
        env: &Self::CompileEnv,
        _c1: &Self::Cluster,
        c1_port: &Self::ClusterPort,
        _c2: &Self::Cluster,
        c2_port: &Self::ClusterPort,
    ) -> (syn::Expr, syn::Expr) {
        let env = *env;
        (
            {
                let port = c1_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
                        .into_sink()
                })
                .splice()
            },
            {
                let port = c2_port.as_str();

                q!({
                    env.port(port)
                        .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
                        .into_source()
                })
                .splice()
            },
        )
    }

    fn m2m_connect(
        _c1: &Self::Cluster,
        _c1_port: &Self::ClusterPort,
        _c2: &Self::Cluster,
        _c2_port: &Self::ClusterPort,
    ) {
        panic!()
    }

    fn cluster_ids(
        env: &Self::CompileEnv,
        of_cluster: usize,
    ) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
        let cli = *env;
        q!(cli.meta.clusters.get(&of_cluster).unwrap())
    }

    fn cluster_self_id(env: &Self::CompileEnv) -> impl Quoted<'a, u32> + Copy + 'a {
        let cli = *env;
        q!(cli
            .meta
            .cluster_id
            .expect("Tried to read Cluster ID on a non-cluster node"))
    }
}

#[derive(Clone)]
pub struct DeployRuntimeNode {
    next_port: Rc<RefCell<usize>>,
}

impl Node for DeployRuntimeNode {
    type Port = String;
    type Meta = ();
    type InstantiateEnv = ();

    fn next_port(&self) -> String {
        let next_send_port = *self.next_port.borrow();
        *self.next_port.borrow_mut() += 1;
        format!("port_{}", next_send_port)
    }

    fn update_meta(&mut self, _meta: &Self::Meta) {}

    fn instantiate(
        &self,
        _env: &mut Self::InstantiateEnv,
        _meta: &mut Self::Meta,
        _graph: HydroflowGraph,
        _extra_stmts: Vec<syn::Stmt>,
    ) {
        panic!(".deploy() cannot be called on a DeployRuntimeNode");
    }
}

#[derive(Clone)]
pub struct DeployRuntimeCluster {
    next_port: Rc<RefCell<usize>>,
}

impl Node for DeployRuntimeCluster {
    type Port = String;
    type Meta = ();
    type InstantiateEnv = ();

    fn next_port(&self) -> String {
        let next_send_port = *self.next_port.borrow();
        *self.next_port.borrow_mut() += 1;
        format!("port_{}", next_send_port)
    }

    fn update_meta(&mut self, _meta: &Self::Meta) {}

    fn instantiate(
        &self,
        _env: &mut Self::InstantiateEnv,
        _meta: &mut Self::Meta,
        _graph: HydroflowGraph,
        _extra_stmts: Vec<syn::Stmt>,
    ) {
        panic!(".deploy() cannot be called on a DeployRuntimeCluster");
    }
}

impl<'a> ProcessSpec<'a, DeployRuntime> for () {
    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeNode {
        DeployRuntimeNode {
            next_port: Rc::new(RefCell::new(0)),
        }
    }
}

impl<'cli> ClusterSpec<'cli, DeployRuntime> for () {
    fn build(self, _id: usize, _name_hint: &str) -> DeployRuntimeCluster {
        DeployRuntimeCluster {
            next_port: Rc::new(RefCell::new(0)),
        }
    }
}