linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11    data_types::{Blob, BlobContent, Event, OracleResponse, StreamUpdate, Timestamp},
12    ensure,
13    identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
18type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
20/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
21/// as replayed oracle responses.
22#[derive(Debug, Default)]
23pub struct TransactionTracker {
24    #[debug(skip_if = Option::is_none)]
25    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
26    #[debug(skip_if = Vec::is_empty)]
27    oracle_responses: Vec<OracleResponse>,
28    #[debug(skip_if = Vec::is_empty)]
29    outgoing_messages: Vec<OutgoingMessage>,
30    /// The current local time.
31    local_time: Timestamp,
32    /// The index of the current transaction in the block.
33    transaction_index: u32,
34    next_application_index: u32,
35    next_chain_index: u32,
36    /// Events recorded by contracts' `emit` calls.
37    events: Vec<Event>,
38    /// Blobs created by contracts.
39    ///
40    /// As of right now, blobs created by the contracts are one of the following types:
41    /// - [`Data`]
42    /// - [`ContractBytecode`]
43    /// - [`ServiceBytecode`]
44    /// - [`EvmBytecode`]
45    /// - [`ApplicationDescription`]
46    /// - [`ChainDescription`]
47    blobs: BTreeMap<BlobId, BlobContent>,
48    /// The blobs created in the previous transactions.
49    previously_created_blobs: BTreeMap<BlobId, BlobContent>,
50    /// Operation result.
51    operation_result: Option<Vec<u8>>,
52    /// Streams that have been updated but not yet processed during this transaction.
53    streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
54    /// Published blobs this transaction refers to by [`BlobId`].
55    blobs_published: BTreeSet<BlobId>,
56}
57
58/// The [`TransactionTracker`] contents after a transaction has finished.
59#[derive(Debug, Default)]
60pub struct TransactionOutcome {
61    #[debug(skip_if = Vec::is_empty)]
62    pub oracle_responses: Vec<OracleResponse>,
63    #[debug(skip_if = Vec::is_empty)]
64    pub outgoing_messages: Vec<OutgoingMessage>,
65    pub next_application_index: u32,
66    pub next_chain_index: u32,
67    /// Events recorded by contracts' `emit` calls.
68    pub events: Vec<Event>,
69    /// Blobs created by contracts.
70    pub blobs: Vec<Blob>,
71    /// Operation result.
72    pub operation_result: Vec<u8>,
73    /// Blobs published by this transaction.
74    pub blobs_published: BTreeSet<BlobId>,
75}
76
77impl TransactionTracker {
78    pub fn new(
79        local_time: Timestamp,
80        transaction_index: u32,
81        next_application_index: u32,
82        next_chain_index: u32,
83        oracle_responses: Option<Vec<OracleResponse>>,
84        blobs: &[Vec<Blob>],
85    ) -> Self {
86        let mut previously_created_blobs = BTreeMap::new();
87        for tx_blobs in blobs {
88            for blob in tx_blobs {
89                previously_created_blobs.insert(blob.id(), blob.content().clone());
90            }
91        }
92        TransactionTracker {
93            local_time,
94            transaction_index,
95            next_application_index,
96            next_chain_index,
97            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
98            previously_created_blobs,
99            ..Self::default()
100        }
101    }
102
103    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
104        self.blobs = blobs;
105        self
106    }
107
108    pub fn local_time(&self) -> Timestamp {
109        self.local_time
110    }
111
112    pub fn set_local_time(&mut self, local_time: Timestamp) {
113        self.local_time = local_time;
114    }
115
116    pub fn transaction_index(&self) -> u32 {
117        self.transaction_index
118    }
119
120    pub fn next_application_index(&mut self) -> u32 {
121        let index = self.next_application_index;
122        self.next_application_index += 1;
123        index
124    }
125
126    pub fn next_chain_index(&mut self) -> u32 {
127        let index = self.next_chain_index;
128        self.next_chain_index += 1;
129        index
130    }
131
132    pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
133        self.outgoing_messages.push(message);
134    }
135
136    pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
137        for message in messages {
138            self.add_outgoing_message(message);
139        }
140    }
141
142    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
143        self.events.push(Event {
144            stream_id,
145            index,
146            value,
147        });
148    }
149
150    pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
151        if let Some(content) = self.blobs.get(blob_id) {
152            return Some(content);
153        }
154        self.previously_created_blobs.get(blob_id)
155    }
156
157    pub fn add_created_blob(&mut self, blob: Blob) {
158        self.blobs.insert(blob.id(), blob.into_content());
159    }
160
161    pub fn add_published_blob(&mut self, blob_id: BlobId) {
162        self.blobs_published.insert(blob_id);
163    }
164
165    pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
166        &self.blobs
167    }
168
169    pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
170        self.oracle_responses.push(oracle_response);
171    }
172
173    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
174        self.operation_result = result
175    }
176
177    pub fn add_stream_to_process(
178        &mut self,
179        application_id: ApplicationId,
180        chain_id: ChainId,
181        stream_id: StreamId,
182        previous_index: u32,
183        next_index: u32,
184    ) {
185        if next_index == previous_index {
186            return; // No new events in the stream.
187        }
188        self.streams_to_process
189            .entry(application_id)
190            .or_default()
191            .entry((chain_id, stream_id))
192            .and_modify(|(pi, ni)| {
193                *pi = (*pi).min(previous_index);
194                *ni = (*ni).max(next_index);
195            })
196            .or_insert_with(|| (previous_index, next_index));
197    }
198
199    pub fn remove_stream_to_process(
200        &mut self,
201        application_id: ApplicationId,
202        chain_id: ChainId,
203        stream_id: StreamId,
204    ) {
205        let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
206            return;
207        };
208        if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
209            self.streams_to_process.remove(&application_id);
210        }
211    }
212
213    pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
214        mem::take(&mut self.streams_to_process)
215            .into_iter()
216            .map(|(app_id, streams)| {
217                let updates = streams
218                    .into_iter()
219                    .map(
220                        |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
221                            chain_id,
222                            stream_id,
223                            previous_index,
224                            next_index,
225                        },
226                    )
227                    .collect();
228                (app_id, updates)
229            })
230            .collect()
231    }
232
233    /// Adds the oracle response to the record.
234    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
235    pub fn replay_oracle_response(
236        &mut self,
237        oracle_response: OracleResponse,
238    ) -> Result<bool, ExecutionError> {
239        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
240            ensure!(
241                recorded_response == oracle_response,
242                ExecutionError::OracleResponseMismatch
243            );
244            true
245        } else {
246            false
247        };
248        self.add_oracle_response(oracle_response);
249        Ok(replaying)
250    }
251
252    /// If in replay mode, returns the next oracle response, or an error if it is missing.
253    ///
254    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
255    /// to obtain the value.
256    ///
257    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
258    /// `add_oracle_response`.
259    pub fn next_replayed_oracle_response(
260        &mut self,
261    ) -> Result<Option<OracleResponse>, ExecutionError> {
262        let Some(responses) = &mut self.replaying_oracle_responses else {
263            return Ok(None); // Not in replay mode.
264        };
265        let response = responses
266            .next()
267            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
268        Ok(Some(response))
269    }
270
271    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
272        let TransactionTracker {
273            replaying_oracle_responses,
274            oracle_responses,
275            outgoing_messages,
276            local_time: _,
277            transaction_index: _,
278            next_application_index,
279            next_chain_index,
280            events,
281            blobs,
282            previously_created_blobs: _,
283            operation_result,
284            streams_to_process,
285            blobs_published,
286        } = self;
287        ensure!(
288            streams_to_process.is_empty(),
289            ExecutionError::UnprocessedStreams
290        );
291        if let Some(mut responses) = replaying_oracle_responses {
292            ensure!(
293                responses.next().is_none(),
294                ExecutionError::UnexpectedOracleResponse
295            );
296        }
297        let blobs = blobs
298            .into_iter()
299            .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
300            .collect::<Vec<_>>();
301        Ok(TransactionOutcome {
302            outgoing_messages,
303            oracle_responses,
304            next_application_index,
305            next_chain_index,
306            events,
307            blobs,
308            operation_result: operation_result.unwrap_or_default(),
309            blobs_published,
310        })
311    }
312}
313
314#[cfg(with_testing)]
315impl TransactionTracker {
316    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
317    /// oracle responses.
318    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
319        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
320    }
321
322    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
323    /// for the given blobs.
324    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
325    where
326        T: IntoIterator,
327        T::Item: std::borrow::Borrow<BlobId>,
328    {
329        use std::borrow::Borrow;
330
331        let oracle_responses = blob_ids
332            .into_iter()
333            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
334            .collect();
335        TransactionTracker::new_replaying(oracle_responses)
336    }
337}