Skip to main content

trellis_core/
transaction.rs

1use crate::input::{StoredInput, boxed_input};
2use crate::transaction_trace_build::{scope_events, stable_node_union};
3use crate::{
4    Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
5    transaction_types::{
6        AuditEntry, AuditEvent, StagedInputChange, StagedInputOutcome, TransactionOptions,
7        TransactionPhase, TransactionResult,
8    },
9};
10use std::collections::{BTreeMap, BTreeSet};
11
/// Staged canonical input transaction.
///
/// A transaction mutably borrows the target [`Graph`] for its whole lifetime
/// and does its work against `working`, a clone taken when the transaction is
/// created. A successful `commit` swaps `working` into the borrowed graph; a
/// failed commit or a plain drop never performs that swap.
pub struct Transaction<'graph, C = ()> {
    /// Graph the transaction was opened on; exchanged with `working` at the
    /// end of a successful commit.
    pub(crate) graph: &'graph mut Graph<C>,
    /// Private copy of `graph` taken at construction. Commit phases mutate
    /// this copy rather than the borrowed graph.
    pub(crate) working: Graph<C>,
    /// Identifier returned by `id()` and stamped on audit entries and the
    /// transaction result.
    id: TransactionId,
    /// Commit switches; this file reads `skip_equal_inputs` and
    /// `audit_explanations`.
    options: TransactionOptions,
    /// Pending input values keyed by node. Staging the same node again
    /// replaces the previously staged value.
    staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
    /// Audit events staged ahead of commit. `commit` places them at the front
    /// of the audit log and scans them for `NodeCreated` and `ScopeClosed`.
    /// NOTE(review): populated by crate code outside this block.
    pub(crate) staged_events: Vec<AuditEvent>,
    /// Nodes handed to `baseline_collection_diffs` on the working graph
    /// during commit.
    pub(crate) staged_resource_planner_collections: Vec<NodeId>,
    /// Rebaseline requests per output, forwarded to `produce_output_frames`
    /// during commit.
    pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
    /// When `true`, commit advances the revision even if no staged input
    /// counts as changed.
    pub(crate) graph_mutated: bool,
    /// First error recorded while staging; when set, `commit` returns it
    /// instead of running the commit phases.
    pub(crate) failed: Option<GraphError>,
    /// Set by `close()`; once `true`, `ensure_open` rejects further use.
    closed: bool,
}
26
27impl<'graph, C> Transaction<'graph, C> {
28    pub(crate) fn new(
29        graph: &'graph mut Graph<C>,
30        id: TransactionId,
31        options: TransactionOptions,
32    ) -> Self {
33        let mut working = graph.clone();
34        working.transaction_open = false;
35        Self {
36            graph,
37            working,
38            id,
39            options,
40            staged_inputs: BTreeMap::new(),
41            staged_events: Vec::new(),
42            staged_resource_planner_collections: Vec::new(),
43            staged_output_rebaselines: BTreeMap::new(),
44            graph_mutated: false,
45            failed: None,
46            closed: false,
47        }
48    }
49
50    /// Returns this transaction's id.
51    pub fn id(&self) -> TransactionId {
52        self.id
53    }
54
55    /// Stages a typed canonical input change.
56    pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
57    where
58        T: Clone + PartialEq + Send + Sync + 'static,
59    {
60        self.set_input_by_id(input.id(), value)
61    }
62
63    /// Stages a canonical input change by node id.
64    pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
65    where
66        T: Clone + PartialEq + Send + Sync + 'static,
67    {
68        self.ensure_open()?;
69        if let Err(error) = self.working.validate_input_write::<T>(node) {
70            self.failed.get_or_insert_with(|| error.clone());
71            return Err(error);
72        }
73        self.staged_inputs.insert(node, boxed_input(value));
74        Ok(())
75    }
76
77    /// Commits staged input changes atomically.
78    pub fn commit(&mut self) -> GraphResult<TransactionResult<C>> {
79        self.ensure_open()?;
80        let mut phase_trace = vec![TransactionPhase::StageOperations];
81        phase_trace.push(TransactionPhase::ValidateTransaction);
82        if let Some(error) = self.failed.clone() {
83            self.close();
84            return Err(error);
85        }
86
87        let mut changed_inputs = Vec::new();
88        for (node, staged) in &self.staged_inputs {
89            let changed = self
90                .working
91                .input_values
92                .get(node)
93                .is_none_or(|current| !current.equals(staged.as_ref()));
94            if changed || !self.options.skip_equal_inputs {
95                changed_inputs.push(*node);
96            }
97        }
98        let changed_input_set = changed_inputs.iter().copied().collect::<BTreeSet<_>>();
99        let staged_input_changes = self
100            .staged_inputs
101            .keys()
102            .map(|node| StagedInputChange {
103                node: *node,
104                outcome: if changed_input_set.contains(node) {
105                    StagedInputOutcome::Changed
106                } else {
107                    StagedInputOutcome::Unchanged
108                },
109            })
110            .collect::<Vec<_>>();
111
112        let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
113            self.graph.revision
114        } else {
115            self.graph.revision.next()
116        };
117
118        let mut audit_events = self.staged_events.clone();
119        for node in self.staged_inputs.keys() {
120            let event = if changed_input_set.contains(node) {
121                AuditEvent::InputChanged(*node)
122            } else {
123                AuditEvent::InputUnchanged(*node)
124            };
125            audit_events.push(event);
126        }
127
128        phase_trace.push(TransactionPhase::CommitCanonicalInputs);
129        for node in &changed_inputs {
130            if let Some(staged) = self.staged_inputs.get(node) {
131                self.working.input_values.insert(*node, staged.clone());
132                if let Some(meta) = self.working.nodes.get_mut(node) {
133                    meta.mark_changed(next_revision);
134                }
135            }
136        }
137        for event in &self.staged_events {
138            if let AuditEvent::NodeCreated(node) = event
139                && let Some(meta) = self.working.nodes.get_mut(node)
140            {
141                meta.mark_created(next_revision);
142            }
143        }
144        let created_nodes: Vec<NodeId> = self
145            .staged_events
146            .iter()
147            .filter_map(|event| match event {
148                AuditEvent::NodeCreated(node) => Some(*node),
149                _ => None,
150            })
151            .collect();
152        let dirty_roots = stable_node_union(changed_inputs.iter().copied().chain(created_nodes));
153        let mut initial_changed = dirty_roots.clone();
154        phase_trace.push(TransactionPhase::MarkDirtyNodes);
155        phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
156        let derived_trace = match self.working.recompute_dirty_derived(&initial_changed) {
157            Ok(trace) => trace,
158            Err(error) => {
159                self.close();
160                return Err(error);
161            }
162        };
163        let recomputed_derived_nodes = derived_trace.recomputed;
164        let changed_derived_nodes = derived_trace.changed;
165        for node in &changed_derived_nodes {
166            if let Some(meta) = self.working.nodes.get_mut(node) {
167                meta.mark_changed(next_revision);
168            }
169            audit_events.push(AuditEvent::DerivedChanged(*node));
170        }
171        initial_changed.extend(changed_derived_nodes.iter().copied());
172        phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
173        let collection_recompute = match self.working.recompute_dirty_collections(&initial_changed)
174        {
175            Ok(trace) => trace,
176            Err(error) => {
177                self.close();
178                return Err(error);
179            }
180        };
181        let recomputed_collection_nodes = collection_recompute.recomputed;
182        let changed_collection_nodes = collection_recompute.changed;
183        for node in &changed_collection_nodes {
184            if let Some(meta) = self.working.nodes.get_mut(node) {
185                meta.mark_changed(next_revision);
186            }
187            audit_events.push(AuditEvent::CollectionChanged(*node));
188        }
189        phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
190        self.working
191            .baseline_collection_diffs(&self.staged_resource_planner_collections);
192        let collection_diffs = self
193            .working
194            .collection_diffs
195            .iter()
196            .map(|(node, diff)| diff.trace(*node))
197            .collect::<Vec<_>>();
198        phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
199        let closed_scopes: Vec<_> = self
200            .staged_events
201            .iter()
202            .filter_map(|event| match event {
203                AuditEvent::ScopeClosed(scope) => Some(*scope),
204                _ => None,
205            })
206            .collect();
207        let scope_events = scope_events(&audit_events);
208        phase_trace.push(TransactionPhase::ProduceResourcePlans);
209        let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
210            Ok(plan) => plan,
211            Err(error) => {
212                self.close();
213                return Err(error);
214            }
215        };
216        let mut output_changed = initial_changed.clone();
217        output_changed.extend(changed_collection_nodes.iter().copied());
218        phase_trace.push(TransactionPhase::ProduceOutputFrames);
219        let output_frames = match self.working.produce_output_frames(
220            &output_changed,
221            &closed_scopes,
222            &self.staged_output_rebaselines,
223            self.id,
224            next_revision,
225        ) {
226            Ok(frames) => frames,
227            Err(error) => {
228                self.close();
229                return Err(error);
230            }
231        };
232        let audit_log = audit_events
233            .into_iter()
234            .map(|event| AuditEntry {
235                transaction_id: self.id,
236                revision: next_revision,
237                event,
238            })
239            .collect();
240        phase_trace.push(TransactionPhase::CommitGraphRevision);
241        self.working.revision = next_revision;
242        self.working.next_node_id = self.graph.next_node_id;
243        self.working.next_scope_id = self.graph.next_scope_id;
244        self.working.next_output_key = self.graph.next_output_key;
245
246        phase_trace.push(TransactionPhase::ReturnTransactionResult);
247        let result = TransactionResult {
248            transaction_id: self.id,
249            revision: next_revision,
250            staged_input_changes,
251            changed_inputs,
252            dirty_roots,
253            recomputed_derived_nodes,
254            changed_derived_nodes,
255            recomputed_collection_nodes,
256            changed_collection_nodes,
257            collection_diffs,
258            resource_plan,
259            output_frames,
260            scope_events,
261            audit_log,
262            phase_trace,
263            invariant_results: Vec::new(),
264        };
265        self.working
266            .record_transaction_audit(&result, self.options.audit_explanations);
267        self.working.reclaim_closed_scopes(&closed_scopes);
268        std::mem::swap(self.graph, &mut self.working);
269        self.close();
270        Ok(result)
271    }
272
273    pub(crate) fn ensure_open(&self) -> GraphResult<()> {
274        if self.closed {
275            Err(GraphError::TransactionClosed(self.id))
276        } else {
277            Ok(())
278        }
279    }
280
281    fn close(&mut self) {
282        self.closed = true;
283        self.graph.transaction_open = false;
284    }
285}
286
287impl<C> Drop for Transaction<'_, C> {
288    fn drop(&mut self) {
289        if !self.closed {
290            self.graph.transaction_open = false;
291        }
292    }
293}