Skip to main content

jellyflow_runtime/runtime/store/dispatch/
mod.rs

1//! Transaction dispatch, undo, redo, middleware, and commit publication.
2
3mod gate;
4mod pipeline;
5
6use crate::io::NodeGraphViewState;
7use crate::profile::{ApplyPipelineError, GraphProfile, apply_transaction_with_profile};
8use crate::runtime::commit::NodeGraphPatch;
9use crate::runtime::events::{NodeGraphStoreSnapshot, ViewChange};
10use crate::runtime::middleware::NodeGraphStoreMiddleware;
11use jellyflow_core::core::Graph;
12use jellyflow_core::ops::GraphTransaction;
13
14use super::dispatch_profile::DispatchProfile;
15use super::snapshot::StoreSnapshotParts;
16use super::{DispatchError, DispatchOutcome, NodeGraphStore};
17
18use self::pipeline::{DispatchPipeline, DispatchPipelineResult};
19
20pub(super) struct PreparedGraphCommit {
21    patch: NodeGraphPatch,
22    sanitized_view_state: Option<SanitizedViewState>,
23}
24
25struct SanitizedViewState {
26    before: NodeGraphViewState,
27    after: NodeGraphViewState,
28    changes: Vec<ViewChange>,
29}
30
31impl NodeGraphStore {
32    /// Applies a transaction and records it in history.
33    ///
34    /// This mirrors the UI loop contract: the store applies edits to a scratch graph first and only
35    /// commits on success (so rejected profile validations do not partially mutate state).
36    pub fn dispatch_transaction(
37        &mut self,
38        tx: &GraphTransaction,
39    ) -> Result<DispatchOutcome, DispatchError> {
40        self.dispatch_transaction_impl(tx, DispatchProfile::StoreProfile)
41            .map_err(DispatchError::Apply)
42    }
43
44    /// Dispatches a transaction using an externally-owned profile pipeline.
45    ///
46    /// This is intended for UI integration where the profile is owned by the presenter layer.
47    pub fn dispatch_transaction_with_profile(
48        &mut self,
49        tx: &GraphTransaction,
50        profile: &mut dyn GraphProfile,
51    ) -> Result<DispatchOutcome, ApplyPipelineError> {
52        self.dispatch_transaction_impl(tx, DispatchProfile::External(profile))
53    }
54
55    fn dispatch_transaction_impl(
56        &mut self,
57        tx: &GraphTransaction,
58        dispatch_profile: DispatchProfile<'_>,
59    ) -> Result<DispatchOutcome, ApplyPipelineError> {
60        match DispatchPipeline::new(self, dispatch_profile).run(tx)? {
61            DispatchPipelineResult::Empty(committed) => {
62                Ok(DispatchOutcome::from_committed(committed))
63            }
64            DispatchPipelineResult::Commit { graph, committed } => {
65                Ok(self.commit_dispatch(*graph, committed))
66            }
67        }
68    }
69
70    fn commit_dispatch(&mut self, graph: Graph, committed: GraphTransaction) -> DispatchOutcome {
71        let committed_for_history = committed.clone();
72        let prepared = self.prepare_committed_graph_patch(graph, committed);
73        self.history.record(committed_for_history);
74        self.complete_committed_patch(prepared)
75    }
76
77    fn run_before_dispatch_middleware(
78        &mut self,
79        tx: &mut GraphTransaction,
80    ) -> Result<(), ApplyPipelineError> {
81        if let Some(result) = self.with_dispatch_middleware_snapshot(|middleware, snapshot| {
82            middleware.before_dispatch(snapshot, tx)
83        }) {
84            result?;
85        }
86        Ok(())
87    }
88
89    pub(super) fn run_after_dispatch_middleware(&mut self, patch: &NodeGraphPatch) {
90        self.with_dispatch_middleware_snapshot(|middleware, snapshot| {
91            middleware.after_dispatch(snapshot, patch);
92        });
93    }
94
95    fn with_dispatch_middleware_snapshot<R>(
96        &mut self,
97        f: impl FnOnce(&mut dyn NodeGraphStoreMiddleware, NodeGraphStoreSnapshot<'_>) -> R,
98    ) -> Option<R> {
99        let snapshot_parts = StoreSnapshotParts::from_store_fields(
100            &self.graph,
101            self.graph_revision,
102            self.layout_facts_revision,
103            &self.view_state,
104            &self.interaction,
105            &self.runtime_tuning,
106            &self.history,
107        );
108        self.middleware
109            .as_deref_mut()
110            .map(|middleware| f(middleware, snapshot_parts.snapshot()))
111    }
112
113    pub(super) fn apply_to_graph(
114        &mut self,
115        graph: &mut Graph,
116        tx: &GraphTransaction,
117    ) -> Result<GraphTransaction, ApplyPipelineError> {
118        if let Some(profile) = self.profile.as_deref_mut() {
119            apply_transaction_with_profile(graph, profile, tx)
120        } else {
121            tx.apply_to(graph)?;
122            Ok(tx.clone())
123        }
124    }
125
126    fn install_committed_graph_state(&mut self, graph: Graph, committed: &GraphTransaction) {
127        self.graph = graph;
128        self.bump_graph_revision();
129        self.view_state.sanitize_for_graph(&self.graph);
130        self.lookups.apply_transaction(&self.graph, committed);
131        self.bump_layout_facts_revision();
132    }
133
134    pub(super) fn prepare_committed_graph_patch(
135        &mut self,
136        graph: Graph,
137        committed: GraphTransaction,
138    ) -> PreparedGraphCommit {
139        let view_before = self.view_state.clone();
140        self.install_committed_graph_state(graph, &committed);
141        let sanitized_view_state = self.committed_view_state_change(view_before);
142        PreparedGraphCommit {
143            patch: NodeGraphPatch::new(committed),
144            sanitized_view_state,
145        }
146    }
147
148    pub(super) fn complete_committed_patch(
149        &mut self,
150        prepared: PreparedGraphCommit,
151    ) -> DispatchOutcome {
152        let PreparedGraphCommit {
153            patch,
154            sanitized_view_state,
155        } = prepared;
156        self.run_after_dispatch_middleware(&patch);
157        self.publish_graph_commit(&patch);
158        if let Some(sanitized) = sanitized_view_state {
159            self.publish_view_changed(&sanitized.before, &sanitized.after, &sanitized.changes);
160        }
161        DispatchOutcome::new(patch)
162    }
163
164    fn committed_view_state_change(
165        &self,
166        before: NodeGraphViewState,
167    ) -> Option<SanitizedViewState> {
168        let after = self.view_state.clone();
169        if before == after {
170            return None;
171        }
172
173        let mut changes = Vec::new();
174        if before.pan != after.pan || (before.zoom - after.zoom).abs() > 1.0e-6 {
175            changes.push(ViewChange::viewport(after.pan, after.zoom));
176        }
177        if before.selected_nodes != after.selected_nodes
178            || before.selected_edges != after.selected_edges
179            || before.selected_groups != after.selected_groups
180        {
181            changes.push(ViewChange::selection(
182                after.selected_nodes.clone(),
183                after.selected_edges.clone(),
184                after.selected_groups.clone(),
185            ));
186        }
187
188        Some(SanitizedViewState {
189            before,
190            after,
191            changes,
192        })
193    }
194}