// jellyflow-runtime 0.1.0
//
// Headless store, rules, schema, profile, and change pipeline for Jellyflow.
use crate::profile::ApplyPipelineError;
use jellyflow_core::core::Graph;
use jellyflow_core::ops::GraphTransaction;

use super::super::{NodeGraphStore, dispatch_profile::DispatchProfile};
use super::gate::DispatchTransactionGate;

/// One-shot driver that takes a [`GraphTransaction`] through gating,
/// before-dispatch middleware, and application to a scratch copy of the
/// store's graph.
///
/// The pipeline is consumed by `run`, so a new instance is built for every
/// dispatched transaction.
pub(super) struct DispatchPipeline<'store, 'profile> {
    /// Store whose middleware is invoked and whose `graph` is cloned as the
    /// scratch target; held mutably for the lifetime of the pipeline.
    store: &'store mut NodeGraphStore,
    /// Profile whose `apply_to_graph` applies the transaction to the scratch graph.
    dispatch_profile: DispatchProfile<'profile>,
}

/// Outcome of a successful `DispatchPipeline::run`.
pub(super) enum DispatchPipelineResult {
    /// The transaction was empty after gating (either before or after the
    /// before-dispatch middleware ran); nothing was applied. Carries the
    /// gated, empty transaction.
    Empty(GraphTransaction),
    /// The transaction was applied to a scratch clone of the store's graph.
    Commit {
        /// The scratch graph after the transaction was applied to it.
        graph: Box<Graph>,
        /// The transaction returned by the dispatch profile, passed through
        /// the gate once more.
        committed: GraphTransaction,
    },
}

impl<'store, 'profile> DispatchPipeline<'store, 'profile> {
    pub(super) fn new(
        store: &'store mut NodeGraphStore,
        dispatch_profile: DispatchProfile<'profile>,
    ) -> Self {
        Self {
            store,
            dispatch_profile,
        }
    }

    pub(super) fn run(
        mut self,
        tx: &GraphTransaction,
    ) -> Result<DispatchPipelineResult, ApplyPipelineError> {
        let mut tx = DispatchTransactionGate::normalize_and_validate(tx.clone())?;
        if tx.is_empty() {
            return Ok(DispatchPipelineResult::Empty(tx));
        }

        self.store.run_before_dispatch_middleware(&mut tx)?;
        tx = DispatchTransactionGate::normalize_and_validate(tx)?;
        if tx.is_empty() {
            return Ok(DispatchPipelineResult::Empty(tx));
        }

        let (graph, committed) = self.apply_to_scratch(&tx)?;
        let committed = DispatchTransactionGate::normalize_and_validate(committed)?;
        Ok(DispatchPipelineResult::Commit {
            graph: Box::new(graph),
            committed,
        })
    }

    fn apply_to_scratch(
        &mut self,
        tx: &GraphTransaction,
    ) -> Result<(Graph, GraphTransaction), ApplyPipelineError> {
        let mut scratch = self.store.graph.clone();
        let committed = self
            .dispatch_profile
            .apply_to_graph(self.store, &mut scratch, tx)?;
        Ok((scratch, committed))
    }
}