hydro_lang 0.16.0

A Rust framework for correct and performant distributed systems
Documentation
use std::marker::PhantomData;

use dfir_lang::graph::{
    DfirGraph, FlatGraphBuilderOutput, eliminate_extra_unions_tees, partition_graph,
};
use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};

use super::compiled::CompiledFlow;
use super::deploy::{DeployFlow, DeployResult};
use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec};
use super::ir::{HydroRoot, emit};
use crate::location::{Cluster, External, LocationKey, LocationType, Process};
#[cfg(stageleft_runtime)]
#[cfg(feature = "sim")]
use crate::sim::{flow::SimFlow, graph::SimNode};
use crate::staging_util::Invariant;
#[cfg(stageleft_runtime)]
#[cfg(feature = "viz")]
use crate::viz::api::GraphApi;

pub struct BuiltFlow<'a> {
    pub(super) ir: Vec<HydroRoot>,
    pub(super) locations: SlotMap<LocationKey, LocationType>,
    pub(super) location_names: SecondaryMap<LocationKey, String>,

    /// Application name used in telemetry.
    pub(super) flow_name: String,

    pub(super) _phantom: Invariant<'a>,
}

pub(crate) fn build_inner(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, DfirGraph> {
    emit(ir)
        .into_iter()
        .map(|(k, v)| {
            let FlatGraphBuilderOutput { mut flat_graph, .. } =
                v.build().expect("Failed to build DFIR flat graph.");
            eliminate_extra_unions_tees(&mut flat_graph);
            let partitioned_graph =
                partition_graph(flat_graph).expect("Failed to partition (cycle detected).");
            (k, partitioned_graph)
        })
        .collect()
}

impl<'a> BuiltFlow<'a> {
    /// Returns all [`HydroRoot`]s in the IR.
    pub fn ir(&self) -> &[HydroRoot] {
        &self.ir
    }

    /// Serialize the IR as JSON.
    #[cfg(feature = "runtime_support")]
    pub fn ir_json(&self) -> Result<String, serde_json::Error> {
        super::ir::serialize_dedup_shared(|| serde_json::to_string_pretty(&self.ir))
    }

    /// Returns all raw location ID -> location name mappings.
    pub fn location_names(&self) -> &SecondaryMap<LocationKey, String> {
        &self.location_names
    }

    /// Get a GraphApi instance for this built flow
    #[cfg(stageleft_runtime)]
    #[cfg(feature = "viz")]
    pub fn graph_api(&self) -> GraphApi<'_> {
        GraphApi::new(&self.ir, self.location_names())
    }

    /// Render graph to string in the given format.
    #[cfg(feature = "viz")]
    pub fn render_graph(
        &self,
        format: crate::viz::config::GraphType,
        use_short_labels: bool,
        show_metadata: bool,
    ) -> String {
        self.graph_api()
            .render(format, use_short_labels, show_metadata)
    }

    /// Write graph to file.
    #[cfg(feature = "viz")]
    pub fn write_graph_to_file(
        &self,
        format: crate::viz::config::GraphType,
        filename: &str,
        use_short_labels: bool,
        show_metadata: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        self.graph_api()
            .write_to_file(format, filename, use_short_labels, show_metadata)
    }

    /// Generate graph based on CLI config. Returns Some(path) if written.
    #[cfg(feature = "viz")]
    pub fn generate_graph(
        &self,
        config: &crate::viz::config::GraphConfig,
    ) -> Result<Option<String>, Box<dyn std::error::Error>> {
        self.graph_api().generate_graph(config)
    }

    pub fn optimize_with(mut self, f: impl FnOnce(&mut [HydroRoot])) -> Self {
        f(&mut self.ir);
        self
    }

    pub fn with_default_optimize<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        self.into_deploy()
    }

    #[cfg(feature = "sim")]
    /// Creates a simulation for this builder, which can be used to run deterministic simulations
    /// of the Hydro program.
    pub fn sim(self) -> SimFlow<'a> {
        use std::cell::RefCell;
        use std::rc::Rc;

        use slotmap::SparseSecondaryMap;

        use crate::sim::graph::SimNodePort;

        let shared_port_counter = Rc::new(RefCell::new(SimNodePort::default()));

        let mut processes = SparseSecondaryMap::new();
        let mut clusters = SparseSecondaryMap::new();
        let externals = SparseSecondaryMap::new();

        for (key, loc) in self.locations.iter() {
            match loc {
                LocationType::Process => {
                    processes.insert(
                        key,
                        SimNode {
                            shared_port_counter: shared_port_counter.clone(),
                        },
                    );
                }
                LocationType::Cluster => {
                    clusters.insert(
                        key,
                        SimNode {
                            shared_port_counter: shared_port_counter.clone(),
                        },
                    );
                }
                LocationType::External => {
                    panic!("Sim cannot have externals");
                }
            }
        }

        SimFlow {
            ir: self.ir,
            processes,
            clusters,
            externals,
            cluster_max_sizes: SparseSecondaryMap::new(),
            externals_port_registry: Default::default(),
            test_safety_only: false,
            unit_test_fuzz_iterations: 8192,
            _phantom: PhantomData,
        }
    }

    pub fn into_deploy<D: Deploy<'a>>(self) -> DeployFlow<'a, D> {
        let (processes, clusters, externals) = Default::default();
        DeployFlow {
            ir: self.ir,
            locations: self.locations,
            location_names: self.location_names,
            processes,
            clusters,
            externals,
            sidecars: SparseSecondaryMap::new(),
            flow_name: self.flow_name,
            _phantom: PhantomData,
        }
    }

    pub fn with_process<P, D: Deploy<'a>>(
        self,
        process: &Process<P>,
        spec: impl IntoProcessSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_process(process, spec)
    }

    pub fn with_remaining_processes<D: Deploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_remaining_processes(spec)
    }

    pub fn with_external<P, D: Deploy<'a>>(
        self,
        process: &External<P>,
        spec: impl ExternalSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_external(process, spec)
    }

    pub fn with_remaining_externals<D: Deploy<'a>, S: ExternalSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_remaining_externals(spec)
    }

    pub fn with_cluster<C, D: Deploy<'a>>(
        self,
        cluster: &Cluster<C>,
        spec: impl ClusterSpec<'a, D>,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_cluster(cluster, spec)
    }

    pub fn with_remaining_clusters<D: Deploy<'a>, S: ClusterSpec<'a, D> + 'a>(
        self,
        spec: impl Fn() -> S,
    ) -> DeployFlow<'a, D> {
        self.into_deploy().with_remaining_clusters(spec)
    }

    pub fn compile<D: Deploy<'a, InstantiateEnv = ()>>(self) -> CompiledFlow<'a> {
        self.into_deploy::<D>().compile()
    }

    pub fn deploy<D: Deploy<'a>>(self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
        self.into_deploy::<D>().deploy(env)
    }
}