hydroflow_plus_cli_integration 0.8.0

Library for working with hydro_deploy and hydroflow_plus
Documentation
use std::cell::RefCell;
use std::rc::Rc;

use hydroflow_plus::location::{
    Cluster, ClusterSpec, Deploy, HfSendManyToMany, HfSendManyToOne, HfSendOneToMany,
    HfSendOneToOne, Location, ProcessSpec,
};
use hydroflow_plus::util::cli::{
    ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, HydroCLI,
};
use stageleft::{q, Quoted, RuntimeData};

use super::HydroflowPlusMeta;

pub struct CLIRuntime {}

impl<'a> Deploy<'a> for CLIRuntime {
    type ClusterId = u32;
    type Process = CLIRuntimeNode<'a>;
    type Cluster = CLIRuntimeCluster<'a>;
    type Meta = ();
    type GraphId = usize;
    type ProcessPort = String;
    type ClusterPort = String;
}

#[derive(Clone)]
pub struct CLIRuntimeNode<'a> {
    id: usize,
    next_port: Rc<RefCell<usize>>,
    cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
}

impl<'a> Location for CLIRuntimeNode<'a> {
    type Port = String;
    type Meta = ();

    fn id(&self) -> usize {
        self.id
    }

    fn next_port(&self) -> String {
        let next_send_port = *self.next_port.borrow();
        *self.next_port.borrow_mut() += 1;
        format!("port_{}", next_send_port)
    }

    fn update_meta(&mut self, _meta: &Self::Meta) {}
}

#[derive(Clone)]
pub struct CLIRuntimeCluster<'a> {
    id: usize,
    next_port: Rc<RefCell<usize>>,
    cli: RuntimeData<&'a HydroCLI<HydroflowPlusMeta>>,
}

impl<'a> Location for CLIRuntimeCluster<'a> {
    type Port = String;
    type Meta = ();

    fn id(&self) -> usize {
        self.id
    }

    fn next_port(&self) -> String {
        let next_send_port = *self.next_port.borrow();
        *self.next_port.borrow_mut() += 1;
        format!("port_{}", next_send_port)
    }

    fn update_meta(&mut self, _meta: &Self::Meta) {}
}

impl<'a> Cluster<'a> for CLIRuntimeCluster<'a> {
    type Id = u32;

    fn ids(&self) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
        let cli = self.cli;
        let self_id = self.id;
        q!(cli.meta.clusters.get(&self_id).unwrap())
    }

    fn self_id(&self) -> impl Quoted<'a, u32> + Copy + 'a {
        let cli = self.cli;
        q!(cli
            .meta
            .cluster_id
            .expect("Tried to read Cluster ID on a non-cluster node"))
    }
}

impl<'a> HfSendOneToOne<CLIRuntimeNode<'a>> for CLIRuntimeNode<'a> {
    fn connect(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {}

    fn gen_sink_statement(&self, port: &String) -> syn::Expr {
        let self_cli = self.cli;
        let port = port.as_str();
        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDirect>()
                .into_sink()
        })
        .splice()
    }

    fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> syn::Expr {
        let self_cli = other.cli;
        let port = port.as_str();
        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDirect>()
                .into_source()
        })
        .splice()
    }
}

impl<'a> HfSendManyToOne<CLIRuntimeNode<'a>, u32> for CLIRuntimeCluster<'a> {
    fn connect(&self, _other: &CLIRuntimeNode, _source_port: &String, _recipient_port: &String) {}

    fn gen_sink_statement(&self, port: &String) -> syn::Expr {
        let self_cli = self.cli;
        let port = port.as_str();
        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDirect>()
                .into_sink()
        })
        .splice()
    }

    fn gen_source_statement(other: &CLIRuntimeNode<'a>, port: &String) -> syn::Expr {
        let self_cli = other.cli;
        let port = port.as_str();
        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
                .into_source()
        })
        .splice()
    }
}

impl<'a> HfSendOneToMany<CLIRuntimeCluster<'a>, u32> for CLIRuntimeNode<'a> {
    fn connect(&self, _other: &CLIRuntimeCluster, _source_port: &String, _recipient_port: &String) {
    }

    fn gen_sink_statement(&self, port: &String) -> syn::Expr {
        let self_cli = self.cli;
        let port = port.as_str();

        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
                .into_sink()
        })
        .splice()
    }

    fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> syn::Expr {
        let self_cli = other.cli;
        let port = port.as_str();

        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDirect>()
                .into_source()
        })
        .splice()
    }
}

impl<'a> HfSendManyToMany<CLIRuntimeCluster<'a>, u32> for CLIRuntimeCluster<'a> {
    fn connect(&self, _other: &CLIRuntimeCluster, _source_port: &String, _recipient_port: &String) {
    }

    fn gen_sink_statement(&self, port: &String) -> syn::Expr {
        let self_cli = self.cli;
        let port = port.as_str();

        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedDemux<ConnectedDirect>>()
                .into_sink()
        })
        .splice()
    }

    fn gen_source_statement(other: &CLIRuntimeCluster<'a>, port: &String) -> syn::Expr {
        let self_cli = other.cli;
        let port = port.as_str();

        q!({
            self_cli
                .port(port)
                .connect_local_blocking::<ConnectedTagged<ConnectedDirect>>()
                .into_source()
        })
        .splice()
    }
}

impl<'cli> ProcessSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<HydroflowPlusMeta>> {
    fn build(&self, id: usize, _meta: &mut ()) -> CLIRuntimeNode<'cli> {
        CLIRuntimeNode {
            id,
            next_port: Rc::new(RefCell::new(0)),
            cli: *self,
        }
    }
}

impl<'cli> ClusterSpec<'cli, CLIRuntime> for RuntimeData<&'cli HydroCLI<HydroflowPlusMeta>> {
    fn build(&self, id: usize, _meta: &mut ()) -> CLIRuntimeCluster<'cli> {
        CLIRuntimeCluster {
            id,
            next_port: Rc::new(RefCell::new(0)),
            cli: *self,
        }
    }
}