jellyflow_runtime/runtime/store/dispatch/
mod.rs1mod gate;
4mod pipeline;
5
6use crate::io::NodeGraphViewState;
7use crate::profile::{ApplyPipelineError, GraphProfile, apply_transaction_with_profile};
8use crate::runtime::commit::NodeGraphPatch;
9use crate::runtime::events::{NodeGraphStoreSnapshot, ViewChange};
10use crate::runtime::middleware::NodeGraphStoreMiddleware;
11use jellyflow_core::core::Graph;
12use jellyflow_core::ops::GraphTransaction;
13
14use super::dispatch_profile::DispatchProfile;
15use super::snapshot::StoreSnapshotParts;
16use super::{DispatchError, DispatchOutcome, NodeGraphStore};
17
18use self::pipeline::{DispatchPipeline, DispatchPipelineResult};
19
20pub(super) struct PreparedGraphCommit {
21 patch: NodeGraphPatch,
22 sanitized_view_state: Option<SanitizedViewState>,
23}
24
25struct SanitizedViewState {
26 before: NodeGraphViewState,
27 after: NodeGraphViewState,
28 changes: Vec<ViewChange>,
29}
30
31impl NodeGraphStore {
32 pub fn dispatch_transaction(
37 &mut self,
38 tx: &GraphTransaction,
39 ) -> Result<DispatchOutcome, DispatchError> {
40 self.dispatch_transaction_impl(tx, DispatchProfile::StoreProfile)
41 .map_err(DispatchError::Apply)
42 }
43
44 pub fn dispatch_transaction_with_profile(
48 &mut self,
49 tx: &GraphTransaction,
50 profile: &mut dyn GraphProfile,
51 ) -> Result<DispatchOutcome, ApplyPipelineError> {
52 self.dispatch_transaction_impl(tx, DispatchProfile::External(profile))
53 }
54
55 fn dispatch_transaction_impl(
56 &mut self,
57 tx: &GraphTransaction,
58 dispatch_profile: DispatchProfile<'_>,
59 ) -> Result<DispatchOutcome, ApplyPipelineError> {
60 match DispatchPipeline::new(self, dispatch_profile).run(tx)? {
61 DispatchPipelineResult::Empty(committed) => {
62 Ok(DispatchOutcome::from_committed(committed))
63 }
64 DispatchPipelineResult::Commit { graph, committed } => {
65 Ok(self.commit_dispatch(*graph, committed))
66 }
67 }
68 }
69
70 fn commit_dispatch(&mut self, graph: Graph, committed: GraphTransaction) -> DispatchOutcome {
71 let committed_for_history = committed.clone();
72 let prepared = self.prepare_committed_graph_patch(graph, committed);
73 self.history.record(committed_for_history);
74 self.complete_committed_patch(prepared)
75 }
76
77 fn run_before_dispatch_middleware(
78 &mut self,
79 tx: &mut GraphTransaction,
80 ) -> Result<(), ApplyPipelineError> {
81 if let Some(result) = self.with_dispatch_middleware_snapshot(|middleware, snapshot| {
82 middleware.before_dispatch(snapshot, tx)
83 }) {
84 result?;
85 }
86 Ok(())
87 }
88
89 pub(super) fn run_after_dispatch_middleware(&mut self, patch: &NodeGraphPatch) {
90 self.with_dispatch_middleware_snapshot(|middleware, snapshot| {
91 middleware.after_dispatch(snapshot, patch);
92 });
93 }
94
95 fn with_dispatch_middleware_snapshot<R>(
96 &mut self,
97 f: impl FnOnce(&mut dyn NodeGraphStoreMiddleware, NodeGraphStoreSnapshot<'_>) -> R,
98 ) -> Option<R> {
99 let snapshot_parts = StoreSnapshotParts::from_store_fields(
100 &self.graph,
101 self.graph_revision,
102 self.layout_facts_revision,
103 &self.view_state,
104 &self.interaction,
105 &self.runtime_tuning,
106 &self.history,
107 );
108 self.middleware
109 .as_deref_mut()
110 .map(|middleware| f(middleware, snapshot_parts.snapshot()))
111 }
112
113 pub(super) fn apply_to_graph(
114 &mut self,
115 graph: &mut Graph,
116 tx: &GraphTransaction,
117 ) -> Result<GraphTransaction, ApplyPipelineError> {
118 if let Some(profile) = self.profile.as_deref_mut() {
119 apply_transaction_with_profile(graph, profile, tx)
120 } else {
121 tx.apply_to(graph)?;
122 Ok(tx.clone())
123 }
124 }
125
126 fn install_committed_graph_state(&mut self, graph: Graph, committed: &GraphTransaction) {
127 self.graph = graph;
128 self.bump_graph_revision();
129 self.view_state.sanitize_for_graph(&self.graph);
130 self.lookups.apply_transaction(&self.graph, committed);
131 self.bump_layout_facts_revision();
132 }
133
134 pub(super) fn prepare_committed_graph_patch(
135 &mut self,
136 graph: Graph,
137 committed: GraphTransaction,
138 ) -> PreparedGraphCommit {
139 let view_before = self.view_state.clone();
140 self.install_committed_graph_state(graph, &committed);
141 let sanitized_view_state = self.committed_view_state_change(view_before);
142 PreparedGraphCommit {
143 patch: NodeGraphPatch::new(committed),
144 sanitized_view_state,
145 }
146 }
147
148 pub(super) fn complete_committed_patch(
149 &mut self,
150 prepared: PreparedGraphCommit,
151 ) -> DispatchOutcome {
152 let PreparedGraphCommit {
153 patch,
154 sanitized_view_state,
155 } = prepared;
156 self.run_after_dispatch_middleware(&patch);
157 self.publish_graph_commit(&patch);
158 if let Some(sanitized) = sanitized_view_state {
159 self.publish_view_changed(&sanitized.before, &sanitized.after, &sanitized.changes);
160 }
161 DispatchOutcome::new(patch)
162 }
163
164 fn committed_view_state_change(
165 &self,
166 before: NodeGraphViewState,
167 ) -> Option<SanitizedViewState> {
168 let after = self.view_state.clone();
169 if before == after {
170 return None;
171 }
172
173 let mut changes = Vec::new();
174 if before.pan != after.pan || (before.zoom - after.zoom).abs() > 1.0e-6 {
175 changes.push(ViewChange::viewport(after.pan, after.zoom));
176 }
177 if before.selected_nodes != after.selected_nodes
178 || before.selected_edges != after.selected_edges
179 || before.selected_groups != after.selected_groups
180 {
181 changes.push(ViewChange::selection(
182 after.selected_nodes.clone(),
183 after.selected_edges.clone(),
184 after.selected_groups.clone(),
185 ));
186 }
187
188 Some(SanitizedViewState {
189 before,
190 after,
191 changes,
192 })
193 }
194}