1use crate::input::{StoredInput, boxed_input};
2use crate::transaction_trace_build::{scope_events, stable_node_union};
3use crate::{
4 Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
5 transaction_types::{
6 AuditEntry, AuditEvent, StagedInputChange, StagedInputOutcome, TransactionOptions,
7 TransactionPhase, TransactionResult,
8 },
9};
10use std::collections::{BTreeMap, BTreeSet};
11
12pub struct Transaction<'graph, C = (), O = ()> {
14 pub(crate) graph: &'graph mut Graph<C, O>,
15 pub(crate) working: Graph<C, O>,
16 id: TransactionId,
17 options: TransactionOptions,
18 staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
19 pub(crate) staged_events: Vec<AuditEvent>,
20 pub(crate) staged_resource_planner_collections: Vec<NodeId>,
21 pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
22 pub(crate) graph_mutated: bool,
23 pub(crate) failed: Option<GraphError>,
24 closed: bool,
25}
26
27impl<'graph, C, O> Transaction<'graph, C, O>
28where
29 O: Clone + PartialEq,
30{
31 pub(crate) fn new(
32 graph: &'graph mut Graph<C, O>,
33 id: TransactionId,
34 options: TransactionOptions,
35 ) -> Self {
36 let mut working = graph.clone();
37 working.transaction_open = false;
38 Self {
39 graph,
40 working,
41 id,
42 options,
43 staged_inputs: BTreeMap::new(),
44 staged_events: Vec::new(),
45 staged_resource_planner_collections: Vec::new(),
46 staged_output_rebaselines: BTreeMap::new(),
47 graph_mutated: false,
48 failed: None,
49 closed: false,
50 }
51 }
52
53 pub fn id(&self) -> TransactionId {
55 self.id
56 }
57
58 pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
60 where
61 T: Clone + PartialEq + Send + Sync + 'static,
62 {
63 self.set_input_by_id(input.id(), value)
64 }
65
66 pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
68 where
69 T: Clone + PartialEq + Send + Sync + 'static,
70 {
71 self.ensure_open()?;
72 if let Err(error) = self.working.validate_input_write::<T>(node) {
73 self.failed.get_or_insert_with(|| error.clone());
74 return Err(error);
75 }
76 self.staged_inputs.insert(node, boxed_input(value));
77 Ok(())
78 }
79
80 pub fn commit(&mut self) -> GraphResult<TransactionResult<C, O>> {
82 self.ensure_open()?;
83 let mut phase_trace = vec![TransactionPhase::StageOperations];
84 phase_trace.push(TransactionPhase::ValidateTransaction);
85 if let Some(error) = self.failed.clone() {
86 self.close();
87 return Err(error);
88 }
89
90 let mut changed_inputs = Vec::new();
91 for (node, staged) in &self.staged_inputs {
92 let changed = self
93 .working
94 .input_values
95 .get(node)
96 .is_none_or(|current| !current.equals(staged.as_ref()));
97 if changed || !self.options.skip_equal_inputs {
98 changed_inputs.push(*node);
99 }
100 }
101 let changed_input_set = changed_inputs.iter().copied().collect::<BTreeSet<_>>();
102 let staged_input_changes = self
103 .staged_inputs
104 .keys()
105 .map(|node| StagedInputChange {
106 node: *node,
107 outcome: if changed_input_set.contains(node) {
108 StagedInputOutcome::Changed
109 } else {
110 StagedInputOutcome::Unchanged
111 },
112 })
113 .collect::<Vec<_>>();
114
115 let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
116 self.graph.revision
117 } else {
118 self.graph.revision.next()
119 };
120
121 let mut audit_events = self.staged_events.clone();
122 for node in self.staged_inputs.keys() {
123 let event = if changed_inputs.contains(node) {
124 AuditEvent::InputChanged(*node)
125 } else {
126 AuditEvent::InputUnchanged(*node)
127 };
128 audit_events.push(event);
129 }
130
131 phase_trace.push(TransactionPhase::CommitCanonicalInputs);
132 for node in &changed_inputs {
133 if let Some(staged) = self.staged_inputs.get(node) {
134 self.working.input_values.insert(*node, staged.clone());
135 if let Some(meta) = self.working.nodes.get_mut(node) {
136 meta.mark_changed(next_revision);
137 }
138 }
139 }
140 for event in &self.staged_events {
141 if let AuditEvent::NodeCreated(node) = event
142 && let Some(meta) = self.working.nodes.get_mut(node)
143 {
144 meta.mark_created(next_revision);
145 }
146 }
147 let created_nodes: Vec<NodeId> = self
148 .staged_events
149 .iter()
150 .filter_map(|event| match event {
151 AuditEvent::NodeCreated(node) => Some(*node),
152 _ => None,
153 })
154 .collect();
155 let dirty_roots = stable_node_union(changed_inputs.iter().copied().chain(created_nodes));
156 let mut initial_changed = dirty_roots.clone();
157 phase_trace.push(TransactionPhase::MarkDirtyNodes);
158 phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
159 let derived_trace = match self.working.recompute_dirty_derived(&initial_changed) {
160 Ok(trace) => trace,
161 Err(error) => {
162 self.close();
163 return Err(error);
164 }
165 };
166 let recomputed_derived_nodes = derived_trace.recomputed;
167 let changed_derived_nodes = derived_trace.changed;
168 for node in &changed_derived_nodes {
169 if let Some(meta) = self.working.nodes.get_mut(node) {
170 meta.mark_changed(next_revision);
171 }
172 audit_events.push(AuditEvent::DerivedChanged(*node));
173 }
174 initial_changed.extend(changed_derived_nodes.iter().copied());
175 phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
176 let collection_recompute = match self.working.recompute_dirty_collections(&initial_changed)
177 {
178 Ok(trace) => trace,
179 Err(error) => {
180 self.close();
181 return Err(error);
182 }
183 };
184 let recomputed_collection_nodes = collection_recompute.recomputed;
185 let changed_collection_nodes = collection_recompute.changed;
186 for node in &changed_collection_nodes {
187 if let Some(meta) = self.working.nodes.get_mut(node) {
188 meta.mark_changed(next_revision);
189 }
190 audit_events.push(AuditEvent::CollectionChanged(*node));
191 }
192 phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
193 self.working
194 .baseline_collection_diffs(&self.staged_resource_planner_collections);
195 let collection_diffs = self
196 .working
197 .collection_diffs
198 .iter()
199 .map(|(node, diff)| diff.trace(*node))
200 .collect::<Vec<_>>();
201 phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
202 let closed_scopes: Vec<_> = self
203 .staged_events
204 .iter()
205 .filter_map(|event| match event {
206 AuditEvent::ScopeClosed(scope) => Some(*scope),
207 _ => None,
208 })
209 .collect();
210 let scope_events = scope_events(&audit_events);
211 phase_trace.push(TransactionPhase::ProduceResourcePlans);
212 let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
213 Ok(plan) => plan,
214 Err(error) => {
215 self.close();
216 return Err(error);
217 }
218 };
219 let mut output_changed = initial_changed.clone();
220 output_changed.extend(changed_collection_nodes.iter().copied());
221 phase_trace.push(TransactionPhase::ProduceOutputFrames);
222 let output_frames = match self.working.produce_output_frames(
223 &output_changed,
224 &closed_scopes,
225 &self.staged_output_rebaselines,
226 self.id,
227 next_revision,
228 ) {
229 Ok(frames) => frames,
230 Err(error) => {
231 self.close();
232 return Err(error);
233 }
234 };
235 let audit_log = audit_events
236 .into_iter()
237 .map(|event| AuditEntry {
238 transaction_id: self.id,
239 revision: next_revision,
240 event,
241 })
242 .collect();
243 phase_trace.push(TransactionPhase::CommitGraphRevision);
244 self.working.revision = next_revision;
245 self.working.next_node_id = self.graph.next_node_id;
246 self.working.next_scope_id = self.graph.next_scope_id;
247 self.working.next_output_key = self.graph.next_output_key;
248
249 phase_trace.push(TransactionPhase::ReturnTransactionResult);
250 let result = TransactionResult {
251 transaction_id: self.id,
252 revision: next_revision,
253 staged_input_changes,
254 changed_inputs,
255 dirty_roots,
256 recomputed_derived_nodes,
257 changed_derived_nodes,
258 recomputed_collection_nodes,
259 changed_collection_nodes,
260 collection_diffs,
261 resource_plan,
262 output_frames,
263 scope_events,
264 audit_log,
265 phase_trace,
266 invariant_results: Vec::new(),
267 };
268 self.working.record_transaction_audit(&result);
269 *self.graph = self.working.clone();
270 self.graph.transaction_open = true;
271 self.close();
272 Ok(result)
273 }
274
275 pub(crate) fn ensure_open(&self) -> GraphResult<()> {
276 if self.closed {
277 Err(GraphError::TransactionClosed(self.id))
278 } else {
279 Ok(())
280 }
281 }
282
283 fn close(&mut self) {
284 self.closed = true;
285 self.graph.transaction_open = false;
286 }
287}
288
289impl<C, O> Drop for Transaction<'_, C, O> {
290 fn drop(&mut self) {
291 if !self.closed {
292 self.graph.transaction_open = false;
293 }
294 }
295}