1use crate::input::{StoredInput, boxed_input};
2use crate::transaction_trace_build::{scope_events, stable_node_union};
3use crate::{
4 Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
5 transaction_types::{
6 AuditEntry, AuditEvent, StagedInputChange, StagedInputOutcome, TransactionOptions,
7 TransactionPhase, TransactionResult,
8 },
9};
10use std::collections::{BTreeMap, BTreeSet};
11
12pub struct Transaction<'graph, C = ()> {
14 pub(crate) graph: &'graph mut Graph<C>,
15 pub(crate) working: Graph<C>,
16 id: TransactionId,
17 options: TransactionOptions,
18 staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
19 pub(crate) staged_events: Vec<AuditEvent>,
20 pub(crate) staged_resource_planner_collections: Vec<NodeId>,
21 pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
22 pub(crate) graph_mutated: bool,
23 pub(crate) failed: Option<GraphError>,
24 closed: bool,
25}
26
27impl<'graph, C> Transaction<'graph, C> {
28 pub(crate) fn new(
29 graph: &'graph mut Graph<C>,
30 id: TransactionId,
31 options: TransactionOptions,
32 ) -> Self {
33 let mut working = graph.clone();
34 working.transaction_open = false;
35 Self {
36 graph,
37 working,
38 id,
39 options,
40 staged_inputs: BTreeMap::new(),
41 staged_events: Vec::new(),
42 staged_resource_planner_collections: Vec::new(),
43 staged_output_rebaselines: BTreeMap::new(),
44 graph_mutated: false,
45 failed: None,
46 closed: false,
47 }
48 }
49
50 pub fn id(&self) -> TransactionId {
52 self.id
53 }
54
55 pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
57 where
58 T: Clone + PartialEq + Send + Sync + 'static,
59 {
60 self.set_input_by_id(input.id(), value)
61 }
62
63 pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
65 where
66 T: Clone + PartialEq + Send + Sync + 'static,
67 {
68 self.ensure_open()?;
69 if let Err(error) = self.working.validate_input_write::<T>(node) {
70 self.failed.get_or_insert_with(|| error.clone());
71 return Err(error);
72 }
73 self.staged_inputs.insert(node, boxed_input(value));
74 Ok(())
75 }
76
77 pub fn commit(&mut self) -> GraphResult<TransactionResult<C>> {
79 self.ensure_open()?;
80 let mut phase_trace = vec![TransactionPhase::StageOperations];
81 phase_trace.push(TransactionPhase::ValidateTransaction);
82 if let Some(error) = self.failed.clone() {
83 self.close();
84 return Err(error);
85 }
86
87 let mut changed_inputs = Vec::new();
88 for (node, staged) in &self.staged_inputs {
89 let changed = self
90 .working
91 .input_values
92 .get(node)
93 .is_none_or(|current| !current.equals(staged.as_ref()));
94 if changed || !self.options.skip_equal_inputs {
95 changed_inputs.push(*node);
96 }
97 }
98 let changed_input_set = changed_inputs.iter().copied().collect::<BTreeSet<_>>();
99 let staged_input_changes = self
100 .staged_inputs
101 .keys()
102 .map(|node| StagedInputChange {
103 node: *node,
104 outcome: if changed_input_set.contains(node) {
105 StagedInputOutcome::Changed
106 } else {
107 StagedInputOutcome::Unchanged
108 },
109 })
110 .collect::<Vec<_>>();
111
112 let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
113 self.graph.revision
114 } else {
115 self.graph.revision.next()
116 };
117
118 let mut audit_events = self.staged_events.clone();
119 for node in self.staged_inputs.keys() {
120 let event = if changed_input_set.contains(node) {
121 AuditEvent::InputChanged(*node)
122 } else {
123 AuditEvent::InputUnchanged(*node)
124 };
125 audit_events.push(event);
126 }
127
128 phase_trace.push(TransactionPhase::CommitCanonicalInputs);
129 for node in &changed_inputs {
130 if let Some(staged) = self.staged_inputs.get(node) {
131 self.working.input_values.insert(*node, staged.clone());
132 if let Some(meta) = self.working.nodes.get_mut(node) {
133 meta.mark_changed(next_revision);
134 }
135 }
136 }
137 for event in &self.staged_events {
138 if let AuditEvent::NodeCreated(node) = event
139 && let Some(meta) = self.working.nodes.get_mut(node)
140 {
141 meta.mark_created(next_revision);
142 }
143 }
144 let created_nodes: Vec<NodeId> = self
145 .staged_events
146 .iter()
147 .filter_map(|event| match event {
148 AuditEvent::NodeCreated(node) => Some(*node),
149 _ => None,
150 })
151 .collect();
152 let dirty_roots = stable_node_union(changed_inputs.iter().copied().chain(created_nodes));
153 let mut initial_changed = dirty_roots.clone();
154 phase_trace.push(TransactionPhase::MarkDirtyNodes);
155 phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
156 let derived_trace = match self.working.recompute_dirty_derived(&initial_changed) {
157 Ok(trace) => trace,
158 Err(error) => {
159 self.close();
160 return Err(error);
161 }
162 };
163 let recomputed_derived_nodes = derived_trace.recomputed;
164 let changed_derived_nodes = derived_trace.changed;
165 for node in &changed_derived_nodes {
166 if let Some(meta) = self.working.nodes.get_mut(node) {
167 meta.mark_changed(next_revision);
168 }
169 audit_events.push(AuditEvent::DerivedChanged(*node));
170 }
171 initial_changed.extend(changed_derived_nodes.iter().copied());
172 phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
173 let collection_recompute = match self.working.recompute_dirty_collections(&initial_changed)
174 {
175 Ok(trace) => trace,
176 Err(error) => {
177 self.close();
178 return Err(error);
179 }
180 };
181 let recomputed_collection_nodes = collection_recompute.recomputed;
182 let changed_collection_nodes = collection_recompute.changed;
183 for node in &changed_collection_nodes {
184 if let Some(meta) = self.working.nodes.get_mut(node) {
185 meta.mark_changed(next_revision);
186 }
187 audit_events.push(AuditEvent::CollectionChanged(*node));
188 }
189 phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
190 self.working
191 .baseline_collection_diffs(&self.staged_resource_planner_collections);
192 let collection_diffs = self
193 .working
194 .collection_diffs
195 .iter()
196 .map(|(node, diff)| diff.trace(*node))
197 .collect::<Vec<_>>();
198 phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
199 let closed_scopes: Vec<_> = self
200 .staged_events
201 .iter()
202 .filter_map(|event| match event {
203 AuditEvent::ScopeClosed(scope) => Some(*scope),
204 _ => None,
205 })
206 .collect();
207 let scope_events = scope_events(&audit_events);
208 phase_trace.push(TransactionPhase::ProduceResourcePlans);
209 let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
210 Ok(plan) => plan,
211 Err(error) => {
212 self.close();
213 return Err(error);
214 }
215 };
216 let mut output_changed = initial_changed.clone();
217 output_changed.extend(changed_collection_nodes.iter().copied());
218 phase_trace.push(TransactionPhase::ProduceOutputFrames);
219 let output_frames = match self.working.produce_output_frames(
220 &output_changed,
221 &closed_scopes,
222 &self.staged_output_rebaselines,
223 self.id,
224 next_revision,
225 ) {
226 Ok(frames) => frames,
227 Err(error) => {
228 self.close();
229 return Err(error);
230 }
231 };
232 let audit_log = audit_events
233 .into_iter()
234 .map(|event| AuditEntry {
235 transaction_id: self.id,
236 revision: next_revision,
237 event,
238 })
239 .collect();
240 phase_trace.push(TransactionPhase::CommitGraphRevision);
241 self.working.revision = next_revision;
242 self.working.next_node_id = self.graph.next_node_id;
243 self.working.next_scope_id = self.graph.next_scope_id;
244 self.working.next_output_key = self.graph.next_output_key;
245
246 phase_trace.push(TransactionPhase::ReturnTransactionResult);
247 let result = TransactionResult {
248 transaction_id: self.id,
249 revision: next_revision,
250 staged_input_changes,
251 changed_inputs,
252 dirty_roots,
253 recomputed_derived_nodes,
254 changed_derived_nodes,
255 recomputed_collection_nodes,
256 changed_collection_nodes,
257 collection_diffs,
258 resource_plan,
259 output_frames,
260 scope_events,
261 audit_log,
262 phase_trace,
263 invariant_results: Vec::new(),
264 };
265 self.working
266 .record_transaction_audit(&result, self.options.audit_explanations);
267 self.working.reclaim_closed_scopes(&closed_scopes);
268 std::mem::swap(self.graph, &mut self.working);
269 self.close();
270 Ok(result)
271 }
272
273 pub(crate) fn ensure_open(&self) -> GraphResult<()> {
274 if self.closed {
275 Err(GraphError::TransactionClosed(self.id))
276 } else {
277 Ok(())
278 }
279 }
280
281 fn close(&mut self) {
282 self.closed = true;
283 self.graph.transaction_open = false;
284 }
285}
286
287impl<C> Drop for Transaction<'_, C> {
288 fn drop(&mut self) {
289 if !self.closed {
290 self.graph.transaction_open = false;
291 }
292 }
293}