1use crate::input::{StoredInput, boxed_input};
2use crate::{
3 Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
4 transaction_types::{
5 AuditEntry, AuditEvent, TransactionOptions, TransactionPhase, TransactionResult,
6 },
7};
8use std::collections::BTreeMap;
9
/// A batch of staged changes against a [`Graph`].
///
/// The transaction holds an exclusive borrow of the graph plus a private
/// `working` clone of it. `commit` applies the staged changes to `working`
/// and writes it back over `graph` only after every phase has succeeded.
pub struct Transaction<'graph, C = (), O = ()> {
    /// The graph this transaction was opened on. Overwritten with `working`
    /// when `commit` succeeds; its `transaction_open` flag is cleared when the
    /// transaction is closed or dropped.
    pub(crate) graph: &'graph mut Graph<C, O>,
    /// Clone of `graph` taken in `new`; all commit phases mutate this copy.
    pub(crate) working: Graph<C, O>,
    /// Identifier returned by `id()` and stamped on audit entries at commit.
    id: TransactionId,
    /// Options supplied at construction; `commit` reads `skip_equal_inputs`.
    options: TransactionOptions,
    /// Input values staged by `set_input_by_id`, keyed by node. A later write
    /// to the same node replaces the earlier one.
    staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
    /// Audit events staged by code outside this block. `commit` scans them for
    /// `NodeCreated` and `ScopeClosed` and copies them into the audit log.
    pub(crate) staged_events: Vec<AuditEvent>,
    /// Node ids handed to `Graph::baseline_collection_diffs` during commit.
    /// NOTE(review): populated elsewhere; exact meaning not visible here.
    pub(crate) staged_resource_planner_collections: Vec<NodeId>,
    /// Rebaseline requests handed to `Graph::produce_output_frames` during
    /// commit. NOTE(review): populated elsewhere.
    pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
    /// When `true`, `commit` advances the revision even if no input changed.
    /// Set by code outside this block.
    pub(crate) graph_mutated: bool,
    /// First error recorded by a rejected staging call. If set, `commit`
    /// returns it without applying anything.
    pub(crate) failed: Option<GraphError>,
    /// Set by `close`; once `true`, `ensure_open` fails with
    /// `GraphError::TransactionClosed`.
    closed: bool,
}
24
25impl<'graph, C, O> Transaction<'graph, C, O>
26where
27 O: Clone + PartialEq,
28{
29 pub(crate) fn new(
30 graph: &'graph mut Graph<C, O>,
31 id: TransactionId,
32 options: TransactionOptions,
33 ) -> Self {
34 let mut working = graph.clone();
35 working.transaction_open = false;
36 Self {
37 graph,
38 working,
39 id,
40 options,
41 staged_inputs: BTreeMap::new(),
42 staged_events: Vec::new(),
43 staged_resource_planner_collections: Vec::new(),
44 staged_output_rebaselines: BTreeMap::new(),
45 graph_mutated: false,
46 failed: None,
47 closed: false,
48 }
49 }
50
    /// Returns the identifier this transaction was created with.
    pub fn id(&self) -> TransactionId {
        self.id
    }
55
56 pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
58 where
59 T: Clone + PartialEq + 'static,
60 {
61 self.set_input_by_id(input.id(), value)
62 }
63
64 pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
66 where
67 T: Clone + PartialEq + 'static,
68 {
69 self.ensure_open()?;
70 if let Err(error) = self.working.validate_input_write::<T>(node) {
71 self.failed.get_or_insert_with(|| error.clone());
72 return Err(error);
73 }
74 self.staged_inputs.insert(node, boxed_input(value));
75 Ok(())
76 }
77
78 pub fn commit(&mut self) -> GraphResult<TransactionResult<C, O>> {
80 self.ensure_open()?;
81 let mut phase_trace = vec![TransactionPhase::StageOperations];
82 phase_trace.push(TransactionPhase::ValidateTransaction);
83 if let Some(error) = self.failed.clone() {
84 self.close();
85 return Err(error);
86 }
87
88 let mut changed_inputs = Vec::new();
89 for (node, staged) in &self.staged_inputs {
90 let changed = self
91 .working
92 .input_values
93 .get(node)
94 .is_none_or(|current| !current.equals(staged.as_ref()));
95 if changed || !self.options.skip_equal_inputs {
96 changed_inputs.push(*node);
97 }
98 }
99
100 let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
101 self.graph.revision
102 } else {
103 self.graph.revision.next()
104 };
105
106 let mut audit_events = self.staged_events.clone();
107 for node in self.staged_inputs.keys() {
108 let event = if changed_inputs.contains(node) {
109 AuditEvent::InputChanged(*node)
110 } else {
111 AuditEvent::InputUnchanged(*node)
112 };
113 audit_events.push(event);
114 }
115
116 phase_trace.push(TransactionPhase::CommitCanonicalInputs);
117 for node in &changed_inputs {
118 if let Some(staged) = self.staged_inputs.get(node) {
119 self.working.input_values.insert(*node, staged.clone());
120 if let Some(meta) = self.working.nodes.get_mut(node) {
121 meta.mark_changed(next_revision);
122 }
123 }
124 }
125 for event in &self.staged_events {
126 if let AuditEvent::NodeCreated(node) = event
127 && let Some(meta) = self.working.nodes.get_mut(node)
128 {
129 meta.mark_created(next_revision);
130 }
131 }
132 let created_nodes: Vec<NodeId> = self
133 .staged_events
134 .iter()
135 .filter_map(|event| match event {
136 AuditEvent::NodeCreated(node) => Some(*node),
137 _ => None,
138 })
139 .collect();
140 let mut initial_changed = changed_inputs.clone();
141 initial_changed.extend(created_nodes);
142 phase_trace.push(TransactionPhase::MarkDirtyNodes);
143 phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
144 let changed_derived_nodes = match self.working.recompute_dirty_derived(&initial_changed) {
145 Ok(nodes) => nodes,
146 Err(error) => {
147 self.close();
148 return Err(error);
149 }
150 };
151 for node in &changed_derived_nodes {
152 if let Some(meta) = self.working.nodes.get_mut(node) {
153 meta.mark_changed(next_revision);
154 }
155 audit_events.push(AuditEvent::DerivedChanged(*node));
156 }
157 initial_changed.extend(changed_derived_nodes.iter().copied());
158 phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
159 let changed_collection_nodes =
160 match self.working.recompute_dirty_collections(&initial_changed) {
161 Ok(nodes) => nodes,
162 Err(error) => {
163 self.close();
164 return Err(error);
165 }
166 };
167 for node in &changed_collection_nodes {
168 if let Some(meta) = self.working.nodes.get_mut(node) {
169 meta.mark_changed(next_revision);
170 }
171 audit_events.push(AuditEvent::CollectionChanged(*node));
172 }
173 phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
174 self.working
175 .baseline_collection_diffs(&self.staged_resource_planner_collections);
176 phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
177 let closed_scopes: Vec<_> = self
178 .staged_events
179 .iter()
180 .filter_map(|event| match event {
181 AuditEvent::ScopeClosed(scope) => Some(*scope),
182 _ => None,
183 })
184 .collect();
185 phase_trace.push(TransactionPhase::ProduceResourcePlans);
186 let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
187 Ok(plan) => plan,
188 Err(error) => {
189 self.close();
190 return Err(error);
191 }
192 };
193 let mut output_changed = initial_changed.clone();
194 output_changed.extend(changed_collection_nodes.iter().copied());
195 phase_trace.push(TransactionPhase::ProduceOutputFrames);
196 let output_frames = match self.working.produce_output_frames(
197 &output_changed,
198 &closed_scopes,
199 &self.staged_output_rebaselines,
200 self.id,
201 next_revision,
202 ) {
203 Ok(frames) => frames,
204 Err(error) => {
205 self.close();
206 return Err(error);
207 }
208 };
209 let audit_log = audit_events
210 .into_iter()
211 .map(|event| AuditEntry {
212 transaction_id: self.id,
213 revision: next_revision,
214 event,
215 })
216 .collect();
217 phase_trace.push(TransactionPhase::CommitGraphRevision);
218 self.working.revision = next_revision;
219 self.working.next_node_id = self.graph.next_node_id;
220 self.working.next_scope_id = self.graph.next_scope_id;
221 self.working.next_output_key = self.graph.next_output_key;
222
223 phase_trace.push(TransactionPhase::ReturnTransactionResult);
224 let result = TransactionResult {
225 transaction_id: self.id,
226 revision: next_revision,
227 changed_inputs,
228 changed_derived_nodes,
229 changed_collection_nodes,
230 resource_plan,
231 output_frames,
232 audit_log,
233 phase_trace,
234 };
235 self.working.record_transaction_audit(&result);
236 *self.graph = self.working.clone();
237 self.graph.transaction_open = true;
238 self.close();
239 Ok(result)
240 }
241
242 pub(crate) fn ensure_open(&self) -> GraphResult<()> {
243 if self.closed {
244 Err(GraphError::TransactionClosed(self.id))
245 } else {
246 Ok(())
247 }
248 }
249
    /// Marks this transaction as closed and clears the borrowed graph's
    /// `transaction_open` flag.
    fn close(&mut self) {
        // After this, `ensure_open` rejects further use of the transaction.
        self.closed = true;
        self.graph.transaction_open = false;
    }
254}
255
256impl<C, O> Drop for Transaction<'_, C, O> {
257 fn drop(&mut self) {
258 if !self.closed {
259 self.graph.transaction_open = false;
260 }
261 }
262}