Skip to main content

trellis_core/
transaction.rs

1use crate::input::{StoredInput, boxed_input};
2use crate::transaction_trace_build::{scope_events, stable_node_union};
3use crate::{
4    Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
5    transaction_types::{
6        AuditEntry, AuditEvent, StagedInputChange, StagedInputOutcome, TransactionOptions,
7        TransactionPhase, TransactionResult,
8    },
9};
10use std::collections::{BTreeMap, BTreeSet};
11
/// Staged canonical input transaction.
///
/// A transaction borrows the live graph mutably for its whole lifetime and
/// works against a private snapshot (`working`). Input writes are only staged;
/// nothing reaches the borrowed graph until `commit` succeeds, at which point
/// the snapshot is written back over it.
pub struct Transaction<'graph, C = (), O = ()> {
    /// The live graph this transaction was opened on. Overwritten with the
    /// working snapshot on a successful commit.
    pub(crate) graph: &'graph mut Graph<C, O>,
    /// Clone of `graph` taken when the transaction was created; all commit
    /// phases mutate this copy.
    pub(crate) working: Graph<C, O>,
    /// Identifier returned by `id()` and stamped onto results and audit entries.
    id: TransactionId,
    /// Commit options; `skip_equal_inputs` is consulted when classifying
    /// staged inputs as changed or unchanged.
    options: TransactionOptions,
    /// Staged input values keyed by node. Staging the same node again
    /// replaces the earlier value.
    staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
    /// Audit events staged before commit. Commit scans these for
    /// `NodeCreated` and `ScopeClosed` entries.
    pub(crate) staged_events: Vec<AuditEvent>,
    /// Collection nodes handed to `baseline_collection_diffs` during commit.
    pub(crate) staged_resource_planner_collections: Vec<NodeId>,
    /// Output rebaseline requests handed to `produce_output_frames` during commit.
    pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
    /// When set, commit advances the revision even if no input changed.
    pub(crate) graph_mutated: bool,
    /// First staging error recorded so far; commit returns it instead of
    /// applying anything.
    pub(crate) failed: Option<GraphError>,
    /// Set once the transaction has been closed; `ensure_open` rejects
    /// further use afterwards.
    closed: bool,
}
26
27impl<'graph, C, O> Transaction<'graph, C, O>
28where
29    O: Clone + PartialEq,
30{
31    pub(crate) fn new(
32        graph: &'graph mut Graph<C, O>,
33        id: TransactionId,
34        options: TransactionOptions,
35    ) -> Self {
36        let mut working = graph.clone();
37        working.transaction_open = false;
38        Self {
39            graph,
40            working,
41            id,
42            options,
43            staged_inputs: BTreeMap::new(),
44            staged_events: Vec::new(),
45            staged_resource_planner_collections: Vec::new(),
46            staged_output_rebaselines: BTreeMap::new(),
47            graph_mutated: false,
48            failed: None,
49            closed: false,
50        }
51    }
52
53    /// Returns this transaction's id.
54    pub fn id(&self) -> TransactionId {
55        self.id
56    }
57
58    /// Stages a typed canonical input change.
59    pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
60    where
61        T: Clone + PartialEq + Send + Sync + 'static,
62    {
63        self.set_input_by_id(input.id(), value)
64    }
65
66    /// Stages a canonical input change by node id.
67    pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
68    where
69        T: Clone + PartialEq + Send + Sync + 'static,
70    {
71        self.ensure_open()?;
72        if let Err(error) = self.working.validate_input_write::<T>(node) {
73            self.failed.get_or_insert_with(|| error.clone());
74            return Err(error);
75        }
76        self.staged_inputs.insert(node, boxed_input(value));
77        Ok(())
78    }
79
80    /// Commits staged input changes atomically.
81    pub fn commit(&mut self) -> GraphResult<TransactionResult<C, O>> {
82        self.ensure_open()?;
83        let mut phase_trace = vec![TransactionPhase::StageOperations];
84        phase_trace.push(TransactionPhase::ValidateTransaction);
85        if let Some(error) = self.failed.clone() {
86            self.close();
87            return Err(error);
88        }
89
90        let mut changed_inputs = Vec::new();
91        for (node, staged) in &self.staged_inputs {
92            let changed = self
93                .working
94                .input_values
95                .get(node)
96                .is_none_or(|current| !current.equals(staged.as_ref()));
97            if changed || !self.options.skip_equal_inputs {
98                changed_inputs.push(*node);
99            }
100        }
101        let changed_input_set = changed_inputs.iter().copied().collect::<BTreeSet<_>>();
102        let staged_input_changes = self
103            .staged_inputs
104            .keys()
105            .map(|node| StagedInputChange {
106                node: *node,
107                outcome: if changed_input_set.contains(node) {
108                    StagedInputOutcome::Changed
109                } else {
110                    StagedInputOutcome::Unchanged
111                },
112            })
113            .collect::<Vec<_>>();
114
115        let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
116            self.graph.revision
117        } else {
118            self.graph.revision.next()
119        };
120
121        let mut audit_events = self.staged_events.clone();
122        for node in self.staged_inputs.keys() {
123            let event = if changed_inputs.contains(node) {
124                AuditEvent::InputChanged(*node)
125            } else {
126                AuditEvent::InputUnchanged(*node)
127            };
128            audit_events.push(event);
129        }
130
131        phase_trace.push(TransactionPhase::CommitCanonicalInputs);
132        for node in &changed_inputs {
133            if let Some(staged) = self.staged_inputs.get(node) {
134                self.working.input_values.insert(*node, staged.clone());
135                if let Some(meta) = self.working.nodes.get_mut(node) {
136                    meta.mark_changed(next_revision);
137                }
138            }
139        }
140        for event in &self.staged_events {
141            if let AuditEvent::NodeCreated(node) = event
142                && let Some(meta) = self.working.nodes.get_mut(node)
143            {
144                meta.mark_created(next_revision);
145            }
146        }
147        let created_nodes: Vec<NodeId> = self
148            .staged_events
149            .iter()
150            .filter_map(|event| match event {
151                AuditEvent::NodeCreated(node) => Some(*node),
152                _ => None,
153            })
154            .collect();
155        let dirty_roots = stable_node_union(changed_inputs.iter().copied().chain(created_nodes));
156        let mut initial_changed = dirty_roots.clone();
157        phase_trace.push(TransactionPhase::MarkDirtyNodes);
158        phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
159        let derived_trace = match self.working.recompute_dirty_derived(&initial_changed) {
160            Ok(trace) => trace,
161            Err(error) => {
162                self.close();
163                return Err(error);
164            }
165        };
166        let recomputed_derived_nodes = derived_trace.recomputed;
167        let changed_derived_nodes = derived_trace.changed;
168        for node in &changed_derived_nodes {
169            if let Some(meta) = self.working.nodes.get_mut(node) {
170                meta.mark_changed(next_revision);
171            }
172            audit_events.push(AuditEvent::DerivedChanged(*node));
173        }
174        initial_changed.extend(changed_derived_nodes.iter().copied());
175        phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
176        let collection_recompute = match self.working.recompute_dirty_collections(&initial_changed)
177        {
178            Ok(trace) => trace,
179            Err(error) => {
180                self.close();
181                return Err(error);
182            }
183        };
184        let recomputed_collection_nodes = collection_recompute.recomputed;
185        let changed_collection_nodes = collection_recompute.changed;
186        for node in &changed_collection_nodes {
187            if let Some(meta) = self.working.nodes.get_mut(node) {
188                meta.mark_changed(next_revision);
189            }
190            audit_events.push(AuditEvent::CollectionChanged(*node));
191        }
192        phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
193        self.working
194            .baseline_collection_diffs(&self.staged_resource_planner_collections);
195        let collection_diffs = self
196            .working
197            .collection_diffs
198            .iter()
199            .map(|(node, diff)| diff.trace(*node))
200            .collect::<Vec<_>>();
201        phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
202        let closed_scopes: Vec<_> = self
203            .staged_events
204            .iter()
205            .filter_map(|event| match event {
206                AuditEvent::ScopeClosed(scope) => Some(*scope),
207                _ => None,
208            })
209            .collect();
210        let scope_events = scope_events(&audit_events);
211        phase_trace.push(TransactionPhase::ProduceResourcePlans);
212        let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
213            Ok(plan) => plan,
214            Err(error) => {
215                self.close();
216                return Err(error);
217            }
218        };
219        let mut output_changed = initial_changed.clone();
220        output_changed.extend(changed_collection_nodes.iter().copied());
221        phase_trace.push(TransactionPhase::ProduceOutputFrames);
222        let output_frames = match self.working.produce_output_frames(
223            &output_changed,
224            &closed_scopes,
225            &self.staged_output_rebaselines,
226            self.id,
227            next_revision,
228        ) {
229            Ok(frames) => frames,
230            Err(error) => {
231                self.close();
232                return Err(error);
233            }
234        };
235        let audit_log = audit_events
236            .into_iter()
237            .map(|event| AuditEntry {
238                transaction_id: self.id,
239                revision: next_revision,
240                event,
241            })
242            .collect();
243        phase_trace.push(TransactionPhase::CommitGraphRevision);
244        self.working.revision = next_revision;
245        self.working.next_node_id = self.graph.next_node_id;
246        self.working.next_scope_id = self.graph.next_scope_id;
247        self.working.next_output_key = self.graph.next_output_key;
248
249        phase_trace.push(TransactionPhase::ReturnTransactionResult);
250        let result = TransactionResult {
251            transaction_id: self.id,
252            revision: next_revision,
253            staged_input_changes,
254            changed_inputs,
255            dirty_roots,
256            recomputed_derived_nodes,
257            changed_derived_nodes,
258            recomputed_collection_nodes,
259            changed_collection_nodes,
260            collection_diffs,
261            resource_plan,
262            output_frames,
263            scope_events,
264            audit_log,
265            phase_trace,
266            invariant_results: Vec::new(),
267        };
268        self.working.record_transaction_audit(&result);
269        *self.graph = self.working.clone();
270        self.graph.transaction_open = true;
271        self.close();
272        Ok(result)
273    }
274
275    pub(crate) fn ensure_open(&self) -> GraphResult<()> {
276        if self.closed {
277            Err(GraphError::TransactionClosed(self.id))
278        } else {
279            Ok(())
280        }
281    }
282
283    fn close(&mut self) {
284        self.closed = true;
285        self.graph.transaction_open = false;
286    }
287}
288
289impl<C, O> Drop for Transaction<'_, C, O> {
290    fn drop(&mut self) {
291        if !self.closed {
292            self.graph.transaction_open = false;
293        }
294    }
295}