Skip to main content

trellis_core/
transaction.rs

1use crate::input::{StoredInput, boxed_input};
2use crate::{
3    Graph, GraphError, GraphResult, InputNode, NodeId, OutputKey, RebaselineReason, TransactionId,
4    transaction_types::{
5        AuditEntry, AuditEvent, TransactionOptions, TransactionPhase, TransactionResult,
6    },
7};
8use std::collections::BTreeMap;
9
10/// Staged canonical input transaction.
11pub struct Transaction<'graph, C = (), O = ()> {
12    pub(crate) graph: &'graph mut Graph<C, O>,
13    pub(crate) working: Graph<C, O>,
14    id: TransactionId,
15    options: TransactionOptions,
16    staged_inputs: BTreeMap<NodeId, Box<dyn StoredInput>>,
17    pub(crate) staged_events: Vec<AuditEvent>,
18    pub(crate) staged_resource_planner_collections: Vec<NodeId>,
19    pub(crate) staged_output_rebaselines: BTreeMap<OutputKey, RebaselineReason>,
20    pub(crate) graph_mutated: bool,
21    pub(crate) failed: Option<GraphError>,
22    closed: bool,
23}
24
25impl<'graph, C, O> Transaction<'graph, C, O>
26where
27    O: Clone + PartialEq,
28{
29    pub(crate) fn new(
30        graph: &'graph mut Graph<C, O>,
31        id: TransactionId,
32        options: TransactionOptions,
33    ) -> Self {
34        let mut working = graph.clone();
35        working.transaction_open = false;
36        Self {
37            graph,
38            working,
39            id,
40            options,
41            staged_inputs: BTreeMap::new(),
42            staged_events: Vec::new(),
43            staged_resource_planner_collections: Vec::new(),
44            staged_output_rebaselines: BTreeMap::new(),
45            graph_mutated: false,
46            failed: None,
47            closed: false,
48        }
49    }
50
51    /// Returns this transaction's id.
52    pub fn id(&self) -> TransactionId {
53        self.id
54    }
55
56    /// Stages a typed canonical input change.
57    pub fn set_input<T>(&mut self, input: InputNode<T>, value: T) -> GraphResult<()>
58    where
59        T: Clone + PartialEq + 'static,
60    {
61        self.set_input_by_id(input.id(), value)
62    }
63
64    /// Stages a canonical input change by node id.
65    pub fn set_input_by_id<T>(&mut self, node: NodeId, value: T) -> GraphResult<()>
66    where
67        T: Clone + PartialEq + 'static,
68    {
69        self.ensure_open()?;
70        if let Err(error) = self.working.validate_input_write::<T>(node) {
71            self.failed.get_or_insert_with(|| error.clone());
72            return Err(error);
73        }
74        self.staged_inputs.insert(node, boxed_input(value));
75        Ok(())
76    }
77
78    /// Commits staged input changes atomically.
79    pub fn commit(&mut self) -> GraphResult<TransactionResult<C, O>> {
80        self.ensure_open()?;
81        let mut phase_trace = vec![TransactionPhase::StageOperations];
82        phase_trace.push(TransactionPhase::ValidateTransaction);
83        if let Some(error) = self.failed.clone() {
84            self.close();
85            return Err(error);
86        }
87
88        let mut changed_inputs = Vec::new();
89        for (node, staged) in &self.staged_inputs {
90            let changed = self
91                .working
92                .input_values
93                .get(node)
94                .is_none_or(|current| !current.equals(staged.as_ref()));
95            if changed || !self.options.skip_equal_inputs {
96                changed_inputs.push(*node);
97            }
98        }
99
100        let next_revision = if changed_inputs.is_empty() && !self.graph_mutated {
101            self.graph.revision
102        } else {
103            self.graph.revision.next()
104        };
105
106        let mut audit_events = self.staged_events.clone();
107        for node in self.staged_inputs.keys() {
108            let event = if changed_inputs.contains(node) {
109                AuditEvent::InputChanged(*node)
110            } else {
111                AuditEvent::InputUnchanged(*node)
112            };
113            audit_events.push(event);
114        }
115
116        phase_trace.push(TransactionPhase::CommitCanonicalInputs);
117        for node in &changed_inputs {
118            if let Some(staged) = self.staged_inputs.get(node) {
119                self.working.input_values.insert(*node, staged.clone());
120                if let Some(meta) = self.working.nodes.get_mut(node) {
121                    meta.mark_changed(next_revision);
122                }
123            }
124        }
125        for event in &self.staged_events {
126            if let AuditEvent::NodeCreated(node) = event
127                && let Some(meta) = self.working.nodes.get_mut(node)
128            {
129                meta.mark_created(next_revision);
130            }
131        }
132        let created_nodes: Vec<NodeId> = self
133            .staged_events
134            .iter()
135            .filter_map(|event| match event {
136                AuditEvent::NodeCreated(node) => Some(*node),
137                _ => None,
138            })
139            .collect();
140        let mut initial_changed = changed_inputs.clone();
141        initial_changed.extend(created_nodes);
142        phase_trace.push(TransactionPhase::MarkDirtyNodes);
143        phase_trace.push(TransactionPhase::RecomputeDerivedNodes);
144        let changed_derived_nodes = match self.working.recompute_dirty_derived(&initial_changed) {
145            Ok(nodes) => nodes,
146            Err(error) => {
147                self.close();
148                return Err(error);
149            }
150        };
151        for node in &changed_derived_nodes {
152            if let Some(meta) = self.working.nodes.get_mut(node) {
153                meta.mark_changed(next_revision);
154            }
155            audit_events.push(AuditEvent::DerivedChanged(*node));
156        }
157        initial_changed.extend(changed_derived_nodes.iter().copied());
158        phase_trace.push(TransactionPhase::RecomputeCollectionNodes);
159        let changed_collection_nodes =
160            match self.working.recompute_dirty_collections(&initial_changed) {
161                Ok(nodes) => nodes,
162                Err(error) => {
163                    self.close();
164                    return Err(error);
165                }
166            };
167        for node in &changed_collection_nodes {
168            if let Some(meta) = self.working.nodes.get_mut(node) {
169                meta.mark_changed(next_revision);
170            }
171            audit_events.push(AuditEvent::CollectionChanged(*node));
172        }
173        phase_trace.push(TransactionPhase::ComputeStructuralDiffs);
174        self.working
175            .baseline_collection_diffs(&self.staged_resource_planner_collections);
176        phase_trace.push(TransactionPhase::ResolveScopeLifecycle);
177        let closed_scopes: Vec<_> = self
178            .staged_events
179            .iter()
180            .filter_map(|event| match event {
181                AuditEvent::ScopeClosed(scope) => Some(*scope),
182                _ => None,
183            })
184            .collect();
185        phase_trace.push(TransactionPhase::ProduceResourcePlans);
186        let resource_plan = match self.working.produce_resource_plan(&closed_scopes) {
187            Ok(plan) => plan,
188            Err(error) => {
189                self.close();
190                return Err(error);
191            }
192        };
193        let mut output_changed = initial_changed.clone();
194        output_changed.extend(changed_collection_nodes.iter().copied());
195        phase_trace.push(TransactionPhase::ProduceOutputFrames);
196        let output_frames = match self.working.produce_output_frames(
197            &output_changed,
198            &closed_scopes,
199            &self.staged_output_rebaselines,
200            self.id,
201            next_revision,
202        ) {
203            Ok(frames) => frames,
204            Err(error) => {
205                self.close();
206                return Err(error);
207            }
208        };
209        let audit_log = audit_events
210            .into_iter()
211            .map(|event| AuditEntry {
212                transaction_id: self.id,
213                revision: next_revision,
214                event,
215            })
216            .collect();
217        phase_trace.push(TransactionPhase::CommitGraphRevision);
218        self.working.revision = next_revision;
219        self.working.next_node_id = self.graph.next_node_id;
220        self.working.next_scope_id = self.graph.next_scope_id;
221        self.working.next_output_key = self.graph.next_output_key;
222
223        phase_trace.push(TransactionPhase::ReturnTransactionResult);
224        let result = TransactionResult {
225            transaction_id: self.id,
226            revision: next_revision,
227            changed_inputs,
228            changed_derived_nodes,
229            changed_collection_nodes,
230            resource_plan,
231            output_frames,
232            audit_log,
233            phase_trace,
234        };
235        self.working.record_transaction_audit(&result);
236        *self.graph = self.working.clone();
237        self.graph.transaction_open = true;
238        self.close();
239        Ok(result)
240    }
241
242    pub(crate) fn ensure_open(&self) -> GraphResult<()> {
243        if self.closed {
244            Err(GraphError::TransactionClosed(self.id))
245        } else {
246            Ok(())
247        }
248    }
249
250    fn close(&mut self) {
251        self.closed = true;
252        self.graph.transaction_open = false;
253    }
254}
255
256impl<C, O> Drop for Transaction<'_, C, O> {
257    fn drop(&mut self) {
258        if !self.closed {
259            self.graph.transaction_open = false;
260        }
261    }
262}