linera_execution/
transaction_tracker.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, vec};
5
6use custom_debug_derive::Debug;
7use linera_base::{
8    data_types::{ArithmeticError, Blob, Event, OracleResponse, Timestamp},
9    ensure,
10    identifiers::{BlobId, ChainId, ChannelFullName, StreamId},
11};
12
13use crate::{ExecutionError, OutgoingMessage};
14
15/// Tracks oracle responses and execution outcomes of an ongoing transaction execution, as well
16/// as replayed oracle responses.
17#[derive(Debug, Default)]
18pub struct TransactionTracker {
19    #[debug(skip_if = Option::is_none)]
20    replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
21    #[debug(skip_if = Vec::is_empty)]
22    oracle_responses: Vec<OracleResponse>,
23    #[debug(skip_if = Vec::is_empty)]
24    outgoing_messages: Vec<OutgoingMessage>,
25    /// The current local time.
26    local_time: Timestamp,
27    /// The index of the current transaction in the block.
28    transaction_index: u32,
29    next_message_index: u32,
30    next_application_index: u32,
31    /// Events recorded by contracts' `emit` calls.
32    events: Vec<Event>,
33    /// Blobs created by contracts.
34    blobs: BTreeMap<BlobId, Blob>,
35    /// Subscribe chains to channels.
36    subscribe: Vec<(ChannelFullName, ChainId)>,
37    /// Unsubscribe chains from channels.
38    unsubscribe: Vec<(ChannelFullName, ChainId)>,
39    /// Operation result.
40    operation_result: Option<Vec<u8>>,
41}
42
43/// The [`TransactionTracker`] contents after a transaction has finished.
44#[derive(Debug, Default)]
45pub struct TransactionOutcome {
46    #[debug(skip_if = Vec::is_empty)]
47    pub oracle_responses: Vec<OracleResponse>,
48    #[debug(skip_if = Vec::is_empty)]
49    pub outgoing_messages: Vec<OutgoingMessage>,
50    pub next_message_index: u32,
51    pub next_application_index: u32,
52    /// Events recorded by contracts' `emit` calls.
53    pub events: Vec<Event>,
54    /// Blobs created by contracts.
55    pub blobs: Vec<Blob>,
56    /// Subscribe chains to channels.
57    pub subscribe: Vec<(ChannelFullName, ChainId)>,
58    /// Unsubscribe chains from channels.
59    pub unsubscribe: Vec<(ChannelFullName, ChainId)>,
60    /// Operation result.
61    pub operation_result: Vec<u8>,
62}
63
64impl TransactionTracker {
65    pub fn new(
66        local_time: Timestamp,
67        transaction_index: u32,
68        next_message_index: u32,
69        next_application_index: u32,
70        oracle_responses: Option<Vec<OracleResponse>>,
71    ) -> Self {
72        TransactionTracker {
73            local_time,
74            transaction_index,
75            next_message_index,
76            next_application_index,
77            replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
78            ..Self::default()
79        }
80    }
81
82    pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
83        self.blobs = blobs;
84        self
85    }
86
87    pub fn local_time(&self) -> Timestamp {
88        self.local_time
89    }
90
91    pub fn set_local_time(&mut self, local_time: Timestamp) {
92        self.local_time = local_time;
93    }
94
95    pub fn transaction_index(&self) -> u32 {
96        self.transaction_index
97    }
98
99    pub fn next_message_index(&self) -> u32 {
100        self.next_message_index
101    }
102
103    pub fn next_application_index(&mut self) -> u32 {
104        let index = self.next_application_index;
105        self.next_application_index += 1;
106        index
107    }
108
109    pub fn add_outgoing_message(
110        &mut self,
111        message: OutgoingMessage,
112    ) -> Result<(), ArithmeticError> {
113        self.next_message_index = self
114            .next_message_index
115            .checked_add(1)
116            .ok_or(ArithmeticError::Overflow)?;
117        self.outgoing_messages.push(message);
118        Ok(())
119    }
120
121    pub fn add_outgoing_messages(
122        &mut self,
123        messages: impl IntoIterator<Item = OutgoingMessage>,
124    ) -> Result<(), ArithmeticError> {
125        for message in messages {
126            self.add_outgoing_message(message)?;
127        }
128        Ok(())
129    }
130
131    pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
132        self.events.push(Event {
133            stream_id,
134            index,
135            value,
136        });
137    }
138
139    pub fn add_created_blob(&mut self, blob: Blob) {
140        self.blobs.insert(blob.id(), blob);
141    }
142
143    pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
144        &self.blobs
145    }
146
147    pub fn subscribe(&mut self, name: ChannelFullName, subscriber: ChainId) {
148        self.subscribe.push((name, subscriber));
149    }
150
151    pub fn unsubscribe(&mut self, name: ChannelFullName, subscriber: ChainId) {
152        self.unsubscribe.push((name, subscriber));
153    }
154
155    pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
156        self.oracle_responses.push(oracle_response);
157    }
158
159    pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
160        self.operation_result = result
161    }
162
163    /// Adds the oracle response to the record.
164    /// If replaying, it also checks that it matches the next replayed one and returns `true`.
165    pub fn replay_oracle_response(
166        &mut self,
167        oracle_response: OracleResponse,
168    ) -> Result<bool, ExecutionError> {
169        let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
170            ensure!(
171                recorded_response == oracle_response,
172                ExecutionError::OracleResponseMismatch
173            );
174            true
175        } else {
176            false
177        };
178        self.add_oracle_response(oracle_response);
179        Ok(replaying)
180    }
181
182    /// If in replay mode, returns the next oracle response, or an error if it is missing.
183    ///
184    /// If not in replay mode, `None` is returned, and the caller must execute the actual oracle
185    /// to obtain the value.
186    ///
187    /// In both cases, the value (returned or obtained from the oracle) must be recorded using
188    /// `add_oracle_response`.
189    pub fn next_replayed_oracle_response(
190        &mut self,
191    ) -> Result<Option<OracleResponse>, ExecutionError> {
192        let Some(responses) = &mut self.replaying_oracle_responses else {
193            return Ok(None); // Not in replay mode.
194        };
195        let response = responses
196            .next()
197            .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
198        Ok(Some(response))
199    }
200
201    pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
202        let TransactionTracker {
203            replaying_oracle_responses,
204            oracle_responses,
205            outgoing_messages,
206            local_time: _,
207            transaction_index: _,
208            next_message_index,
209            next_application_index,
210            events,
211            blobs,
212            subscribe,
213            unsubscribe,
214            operation_result,
215        } = self;
216        if let Some(mut responses) = replaying_oracle_responses {
217            ensure!(
218                responses.next().is_none(),
219                ExecutionError::UnexpectedOracleResponse
220            );
221        }
222        Ok(TransactionOutcome {
223            outgoing_messages,
224            oracle_responses,
225            next_message_index,
226            next_application_index,
227            events,
228            blobs: blobs.into_values().collect(),
229            subscribe,
230            unsubscribe,
231            operation_result: operation_result.unwrap_or_default(),
232        })
233    }
234}
235
236#[cfg(with_testing)]
237impl TransactionTracker {
238    /// Creates a new [`TransactionTracker`] for testing, with default values and the given
239    /// oracle responses.
240    pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
241        TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses))
242    }
243
244    /// Creates a new [`TransactionTracker`] for testing, with default values and oracle responses
245    /// for the given blobs.
246    pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
247    where
248        T: IntoIterator,
249        T::Item: std::borrow::Borrow<BlobId>,
250    {
251        use std::borrow::Borrow;
252
253        let oracle_responses = blob_ids
254            .into_iter()
255            .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
256            .collect();
257        TransactionTracker::new_replaying(oracle_responses)
258    }
259}