1use std::{collections::BTreeMap, vec};
5
6use custom_debug_derive::Debug;
7use linera_base::{
8 data_types::{ArithmeticError, Blob, Event, OracleResponse, Timestamp},
9 ensure,
10 identifiers::{BlobId, ChainId, ChannelFullName, StreamId},
11};
12
13use crate::{ExecutionError, OutgoingMessage};
14
15#[derive(Debug, Default)]
18pub struct TransactionTracker {
19 #[debug(skip_if = Option::is_none)]
20 replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
21 #[debug(skip_if = Vec::is_empty)]
22 oracle_responses: Vec<OracleResponse>,
23 #[debug(skip_if = Vec::is_empty)]
24 outgoing_messages: Vec<OutgoingMessage>,
25 local_time: Timestamp,
27 transaction_index: u32,
29 next_message_index: u32,
30 next_application_index: u32,
31 events: Vec<Event>,
33 blobs: BTreeMap<BlobId, Blob>,
35 subscribe: Vec<(ChannelFullName, ChainId)>,
37 unsubscribe: Vec<(ChannelFullName, ChainId)>,
39 operation_result: Option<Vec<u8>>,
41}
42
43#[derive(Debug, Default)]
45pub struct TransactionOutcome {
46 #[debug(skip_if = Vec::is_empty)]
47 pub oracle_responses: Vec<OracleResponse>,
48 #[debug(skip_if = Vec::is_empty)]
49 pub outgoing_messages: Vec<OutgoingMessage>,
50 pub next_message_index: u32,
51 pub next_application_index: u32,
52 pub events: Vec<Event>,
54 pub blobs: Vec<Blob>,
56 pub subscribe: Vec<(ChannelFullName, ChainId)>,
58 pub unsubscribe: Vec<(ChannelFullName, ChainId)>,
60 pub operation_result: Vec<u8>,
62}
63
64impl TransactionTracker {
65 pub fn new(
66 local_time: Timestamp,
67 transaction_index: u32,
68 next_message_index: u32,
69 next_application_index: u32,
70 oracle_responses: Option<Vec<OracleResponse>>,
71 ) -> Self {
72 TransactionTracker {
73 local_time,
74 transaction_index,
75 next_message_index,
76 next_application_index,
77 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
78 ..Self::default()
79 }
80 }
81
82 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, Blob>) -> Self {
83 self.blobs = blobs;
84 self
85 }
86
87 pub fn local_time(&self) -> Timestamp {
88 self.local_time
89 }
90
91 pub fn set_local_time(&mut self, local_time: Timestamp) {
92 self.local_time = local_time;
93 }
94
95 pub fn transaction_index(&self) -> u32 {
96 self.transaction_index
97 }
98
99 pub fn next_message_index(&self) -> u32 {
100 self.next_message_index
101 }
102
103 pub fn next_application_index(&mut self) -> u32 {
104 let index = self.next_application_index;
105 self.next_application_index += 1;
106 index
107 }
108
109 pub fn add_outgoing_message(
110 &mut self,
111 message: OutgoingMessage,
112 ) -> Result<(), ArithmeticError> {
113 self.next_message_index = self
114 .next_message_index
115 .checked_add(1)
116 .ok_or(ArithmeticError::Overflow)?;
117 self.outgoing_messages.push(message);
118 Ok(())
119 }
120
121 pub fn add_outgoing_messages(
122 &mut self,
123 messages: impl IntoIterator<Item = OutgoingMessage>,
124 ) -> Result<(), ArithmeticError> {
125 for message in messages {
126 self.add_outgoing_message(message)?;
127 }
128 Ok(())
129 }
130
131 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
132 self.events.push(Event {
133 stream_id,
134 index,
135 value,
136 });
137 }
138
139 pub fn add_created_blob(&mut self, blob: Blob) {
140 self.blobs.insert(blob.id(), blob);
141 }
142
143 pub fn created_blobs(&self) -> &BTreeMap<BlobId, Blob> {
144 &self.blobs
145 }
146
147 pub fn subscribe(&mut self, name: ChannelFullName, subscriber: ChainId) {
148 self.subscribe.push((name, subscriber));
149 }
150
151 pub fn unsubscribe(&mut self, name: ChannelFullName, subscriber: ChainId) {
152 self.unsubscribe.push((name, subscriber));
153 }
154
155 pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
156 self.oracle_responses.push(oracle_response);
157 }
158
159 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
160 self.operation_result = result
161 }
162
163 pub fn replay_oracle_response(
166 &mut self,
167 oracle_response: OracleResponse,
168 ) -> Result<bool, ExecutionError> {
169 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
170 ensure!(
171 recorded_response == oracle_response,
172 ExecutionError::OracleResponseMismatch
173 );
174 true
175 } else {
176 false
177 };
178 self.add_oracle_response(oracle_response);
179 Ok(replaying)
180 }
181
182 pub fn next_replayed_oracle_response(
190 &mut self,
191 ) -> Result<Option<OracleResponse>, ExecutionError> {
192 let Some(responses) = &mut self.replaying_oracle_responses else {
193 return Ok(None); };
195 let response = responses
196 .next()
197 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
198 Ok(Some(response))
199 }
200
201 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
202 let TransactionTracker {
203 replaying_oracle_responses,
204 oracle_responses,
205 outgoing_messages,
206 local_time: _,
207 transaction_index: _,
208 next_message_index,
209 next_application_index,
210 events,
211 blobs,
212 subscribe,
213 unsubscribe,
214 operation_result,
215 } = self;
216 if let Some(mut responses) = replaying_oracle_responses {
217 ensure!(
218 responses.next().is_none(),
219 ExecutionError::UnexpectedOracleResponse
220 );
221 }
222 Ok(TransactionOutcome {
223 outgoing_messages,
224 oracle_responses,
225 next_message_index,
226 next_application_index,
227 events,
228 blobs: blobs.into_values().collect(),
229 subscribe,
230 unsubscribe,
231 operation_result: operation_result.unwrap_or_default(),
232 })
233 }
234}
235
236#[cfg(with_testing)]
237impl TransactionTracker {
238 pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
241 TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses))
242 }
243
244 pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
247 where
248 T: IntoIterator,
249 T::Item: std::borrow::Borrow<BlobId>,
250 {
251 use std::borrow::Borrow;
252
253 let oracle_responses = blob_ids
254 .into_iter()
255 .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
256 .collect();
257 TransactionTracker::new_replaying(oracle_responses)
258 }
259}