1use std::{
5 collections::{BTreeMap, BTreeSet},
6 mem, vec,
7};
8
9use custom_debug_derive::Debug;
10use linera_base::{
11 data_types::{Blob, BlobContent, Event, OracleResponse, StreamUpdate, Timestamp},
12 ensure,
13 identifiers::{ApplicationId, BlobId, ChainId, StreamId},
14};
15
16use crate::{ExecutionError, OutgoingMessage};
17
18type AppStreamUpdates = BTreeMap<(ChainId, StreamId), (u32, u32)>;
19
20#[derive(Debug, Default)]
23pub struct TransactionTracker {
24 #[debug(skip_if = Option::is_none)]
25 replaying_oracle_responses: Option<vec::IntoIter<OracleResponse>>,
26 #[debug(skip_if = Vec::is_empty)]
27 oracle_responses: Vec<OracleResponse>,
28 #[debug(skip_if = Vec::is_empty)]
29 outgoing_messages: Vec<OutgoingMessage>,
30 local_time: Timestamp,
32 transaction_index: u32,
34 next_application_index: u32,
35 next_chain_index: u32,
36 events: Vec<Event>,
38 blobs: BTreeMap<BlobId, BlobContent>,
48 previously_created_blobs: BTreeMap<BlobId, BlobContent>,
50 operation_result: Option<Vec<u8>>,
52 streams_to_process: BTreeMap<ApplicationId, AppStreamUpdates>,
54 blobs_published: BTreeSet<BlobId>,
56}
57
58#[derive(Debug, Default)]
60pub struct TransactionOutcome {
61 #[debug(skip_if = Vec::is_empty)]
62 pub oracle_responses: Vec<OracleResponse>,
63 #[debug(skip_if = Vec::is_empty)]
64 pub outgoing_messages: Vec<OutgoingMessage>,
65 pub next_application_index: u32,
66 pub next_chain_index: u32,
67 pub events: Vec<Event>,
69 pub blobs: Vec<Blob>,
71 pub operation_result: Vec<u8>,
73 pub blobs_published: BTreeSet<BlobId>,
75}
76
77impl TransactionTracker {
78 pub fn new(
79 local_time: Timestamp,
80 transaction_index: u32,
81 next_application_index: u32,
82 next_chain_index: u32,
83 oracle_responses: Option<Vec<OracleResponse>>,
84 blobs: &[Vec<Blob>],
85 ) -> Self {
86 let mut previously_created_blobs = BTreeMap::new();
87 for tx_blobs in blobs {
88 for blob in tx_blobs {
89 previously_created_blobs.insert(blob.id(), blob.content().clone());
90 }
91 }
92 TransactionTracker {
93 local_time,
94 transaction_index,
95 next_application_index,
96 next_chain_index,
97 replaying_oracle_responses: oracle_responses.map(Vec::into_iter),
98 previously_created_blobs,
99 ..Self::default()
100 }
101 }
102
103 pub fn with_blobs(mut self, blobs: BTreeMap<BlobId, BlobContent>) -> Self {
104 self.blobs = blobs;
105 self
106 }
107
108 pub fn local_time(&self) -> Timestamp {
109 self.local_time
110 }
111
112 pub fn set_local_time(&mut self, local_time: Timestamp) {
113 self.local_time = local_time;
114 }
115
116 pub fn transaction_index(&self) -> u32 {
117 self.transaction_index
118 }
119
120 pub fn next_application_index(&mut self) -> u32 {
121 let index = self.next_application_index;
122 self.next_application_index += 1;
123 index
124 }
125
126 pub fn next_chain_index(&mut self) -> u32 {
127 let index = self.next_chain_index;
128 self.next_chain_index += 1;
129 index
130 }
131
132 pub fn add_outgoing_message(&mut self, message: OutgoingMessage) {
133 self.outgoing_messages.push(message);
134 }
135
136 pub fn add_outgoing_messages(&mut self, messages: impl IntoIterator<Item = OutgoingMessage>) {
137 for message in messages {
138 self.add_outgoing_message(message);
139 }
140 }
141
142 pub fn add_event(&mut self, stream_id: StreamId, index: u32, value: Vec<u8>) {
143 self.events.push(Event {
144 stream_id,
145 index,
146 value,
147 });
148 }
149
150 pub fn get_blob_content(&self, blob_id: &BlobId) -> Option<&BlobContent> {
151 if let Some(content) = self.blobs.get(blob_id) {
152 return Some(content);
153 }
154 self.previously_created_blobs.get(blob_id)
155 }
156
157 pub fn add_created_blob(&mut self, blob: Blob) {
158 self.blobs.insert(blob.id(), blob.into_content());
159 }
160
161 pub fn add_published_blob(&mut self, blob_id: BlobId) {
162 self.blobs_published.insert(blob_id);
163 }
164
165 pub fn created_blobs(&self) -> &BTreeMap<BlobId, BlobContent> {
166 &self.blobs
167 }
168
169 pub fn add_oracle_response(&mut self, oracle_response: OracleResponse) {
170 self.oracle_responses.push(oracle_response);
171 }
172
173 pub fn add_operation_result(&mut self, result: Option<Vec<u8>>) {
174 self.operation_result = result
175 }
176
177 pub fn add_stream_to_process(
178 &mut self,
179 application_id: ApplicationId,
180 chain_id: ChainId,
181 stream_id: StreamId,
182 previous_index: u32,
183 next_index: u32,
184 ) {
185 if next_index == previous_index {
186 return; }
188 self.streams_to_process
189 .entry(application_id)
190 .or_default()
191 .entry((chain_id, stream_id))
192 .and_modify(|(pi, ni)| {
193 *pi = (*pi).min(previous_index);
194 *ni = (*ni).max(next_index);
195 })
196 .or_insert_with(|| (previous_index, next_index));
197 }
198
199 pub fn remove_stream_to_process(
200 &mut self,
201 application_id: ApplicationId,
202 chain_id: ChainId,
203 stream_id: StreamId,
204 ) {
205 let Some(streams) = self.streams_to_process.get_mut(&application_id) else {
206 return;
207 };
208 if streams.remove(&(chain_id, stream_id)).is_some() && streams.is_empty() {
209 self.streams_to_process.remove(&application_id);
210 }
211 }
212
213 pub fn take_streams_to_process(&mut self) -> BTreeMap<ApplicationId, Vec<StreamUpdate>> {
214 mem::take(&mut self.streams_to_process)
215 .into_iter()
216 .map(|(app_id, streams)| {
217 let updates = streams
218 .into_iter()
219 .map(
220 |((chain_id, stream_id), (previous_index, next_index))| StreamUpdate {
221 chain_id,
222 stream_id,
223 previous_index,
224 next_index,
225 },
226 )
227 .collect();
228 (app_id, updates)
229 })
230 .collect()
231 }
232
233 pub fn replay_oracle_response(
236 &mut self,
237 oracle_response: OracleResponse,
238 ) -> Result<bool, ExecutionError> {
239 let replaying = if let Some(recorded_response) = self.next_replayed_oracle_response()? {
240 ensure!(
241 recorded_response == oracle_response,
242 ExecutionError::OracleResponseMismatch
243 );
244 true
245 } else {
246 false
247 };
248 self.add_oracle_response(oracle_response);
249 Ok(replaying)
250 }
251
252 pub fn next_replayed_oracle_response(
260 &mut self,
261 ) -> Result<Option<OracleResponse>, ExecutionError> {
262 let Some(responses) = &mut self.replaying_oracle_responses else {
263 return Ok(None); };
265 let response = responses
266 .next()
267 .ok_or_else(|| ExecutionError::MissingOracleResponse)?;
268 Ok(Some(response))
269 }
270
271 pub fn into_outcome(self) -> Result<TransactionOutcome, ExecutionError> {
272 let TransactionTracker {
273 replaying_oracle_responses,
274 oracle_responses,
275 outgoing_messages,
276 local_time: _,
277 transaction_index: _,
278 next_application_index,
279 next_chain_index,
280 events,
281 blobs,
282 previously_created_blobs: _,
283 operation_result,
284 streams_to_process,
285 blobs_published,
286 } = self;
287 ensure!(
288 streams_to_process.is_empty(),
289 ExecutionError::UnprocessedStreams
290 );
291 if let Some(mut responses) = replaying_oracle_responses {
292 ensure!(
293 responses.next().is_none(),
294 ExecutionError::UnexpectedOracleResponse
295 );
296 }
297 let blobs = blobs
298 .into_iter()
299 .map(|(blob_id, content)| Blob::new_with_hash_unchecked(blob_id, content))
300 .collect::<Vec<_>>();
301 Ok(TransactionOutcome {
302 outgoing_messages,
303 oracle_responses,
304 next_application_index,
305 next_chain_index,
306 events,
307 blobs,
308 operation_result: operation_result.unwrap_or_default(),
309 blobs_published,
310 })
311 }
312}
313
314#[cfg(with_testing)]
315impl TransactionTracker {
316 pub fn new_replaying(oracle_responses: Vec<OracleResponse>) -> Self {
319 TransactionTracker::new(Timestamp::from(0), 0, 0, 0, Some(oracle_responses), &[])
320 }
321
322 pub fn new_replaying_blobs<T>(blob_ids: T) -> Self
325 where
326 T: IntoIterator,
327 T::Item: std::borrow::Borrow<BlobId>,
328 {
329 use std::borrow::Borrow;
330
331 let oracle_responses = blob_ids
332 .into_iter()
333 .map(|blob_id| OracleResponse::Blob(*blob_id.borrow()))
334 .collect();
335 TransactionTracker::new_replaying(oracle_responses)
336 }
337}