1use crate::fuel_core_graphql_api::database::ReadView;
2use fuel_core_storage::{
3 Error as StorageError,
4 Result as StorageResult,
5 StorageAsRef,
6 iter::{
7 BoxedIter,
8 IterDirection,
9 },
10 not_found,
11 tables::Messages,
12};
13use fuel_core_types::{
14 blockchain::block::CompressedBlock,
15 entities::relayer::message::{
16 MerkleProof,
17 Message,
18 MessageProof,
19 MessageStatus,
20 },
21 fuel_merkle::binary::in_memory::MerkleTree,
22 fuel_tx::{
23 Receipt,
24 TxId,
25 input::message::compute_message_id,
26 },
27 fuel_types::{
28 Address,
29 BlockHeight,
30 Bytes32,
31 MessageId,
32 Nonce,
33 },
34 services::transaction_status::TransactionExecutionStatus,
35};
36use futures::{
37 Stream,
38 StreamExt,
39 TryStreamExt,
40};
41use itertools::Itertools;
42use std::borrow::Cow;
43
44#[cfg(test)]
45mod test;
46
47pub trait MessageQueryData: Send + Sync {
48 fn message(&self, message_id: &Nonce) -> StorageResult<Message>;
49
50 fn owned_message_ids(
51 &self,
52 owner: &Address,
53 start_message_id: Option<Nonce>,
54 direction: IterDirection,
55 ) -> BoxedIter<'_, StorageResult<Nonce>>;
56
57 fn owned_messages(
58 &self,
59 owner: &Address,
60 start_message_id: Option<Nonce>,
61 direction: IterDirection,
62 ) -> BoxedIter<'_, StorageResult<Message>>;
63
64 fn all_messages(
65 &self,
66 start_message_id: Option<Nonce>,
67 direction: IterDirection,
68 ) -> BoxedIter<'_, StorageResult<Message>>;
69}
70
71impl ReadView {
72 pub fn message(&self, id: &Nonce) -> StorageResult<Message> {
73 self.on_chain
74 .as_ref()
75 .storage::<Messages>()
76 .get(id)?
77 .ok_or(not_found!(Messages))
78 .map(Cow::into_owned)
79 }
80
81 pub async fn messages(
82 &self,
83 ids: Vec<Nonce>,
84 ) -> impl Iterator<Item = StorageResult<Message>> + '_ {
85 let messages = ids.into_iter().map(|id| self.message(&id));
88 tokio::task::yield_now().await;
90 messages
91 }
92
93 pub fn owned_messages<'a>(
94 &'a self,
95 owner: &'a Address,
96 start_message_id: Option<Nonce>,
97 direction: IterDirection,
98 ) -> impl Stream<Item = StorageResult<Message>> + 'a {
99 self.owned_message_ids(owner, start_message_id, direction)
100 .chunks(self.batch_size)
101 .map(|chunk| {
102 let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
103 Ok(chunk)
104 })
105 .try_filter_map(move |chunk| async move {
106 let chunk = self.messages(chunk).await;
107 Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
108 })
109 .try_flatten()
110 }
111}
112
113pub trait MessageProofData {
115 fn block(&self, id: &BlockHeight) -> StorageResult<CompressedBlock>;
117
118 fn transaction_status(
120 &self,
121 transaction_id: &TxId,
122 ) -> StorageResult<TransactionExecutionStatus>;
123
124 fn block_history_proof(
127 &self,
128 message_block_height: &BlockHeight,
129 commit_block_height: &BlockHeight,
130 ) -> StorageResult<MerkleProof>;
131}
132
133impl MessageProofData for ReadView {
134 fn block(&self, id: &BlockHeight) -> StorageResult<CompressedBlock> {
135 self.block(id)
136 }
137
138 fn transaction_status(
139 &self,
140 transaction_id: &TxId,
141 ) -> StorageResult<TransactionExecutionStatus> {
142 self.tx_status(transaction_id)
143 }
144
145 fn block_history_proof(
146 &self,
147 message_block_height: &BlockHeight,
148 commit_block_height: &BlockHeight,
149 ) -> StorageResult<MerkleProof> {
150 self.block_history_proof(message_block_height, commit_block_height)
151 }
152}
153
154pub fn message_proof<T: MessageProofData + ?Sized>(
156 database: &T,
157 transaction_id: Bytes32,
158 desired_nonce: Nonce,
159 commit_block_height: BlockHeight,
160) -> StorageResult<MessageProof> {
161 let (message_block_height, (sender, recipient, nonce, amount, data)) = match database.transaction_status(&transaction_id) {
163 Ok(TransactionExecutionStatus::Success { block_height, receipts, .. }) => (
164 block_height,
165 receipts.iter()
166 .find_map(|r| match r {
167 Receipt::MessageOut {
168 sender,
169 recipient,
170 nonce,
171 amount,
172 data,
173 ..
174 } if r.nonce() == Some(&desired_nonce) => {
175 Some((*sender, *recipient, *nonce, *amount, data.clone()))
176 }
177 _ => None,
178 })
179 .ok_or::<StorageError>(
180 anyhow::anyhow!("Desired `nonce` missing in transaction receipts").into(),
181 )?
182 ),
183 Ok(TransactionExecutionStatus::Submitted { .. }) => {
184 return Err(anyhow::anyhow!(
185 "Unable to obtain the message block height. The transaction has not been processed yet"
186 )
187 .into())
188 }
189 Ok(TransactionExecutionStatus::SqueezedOut { reason }) => {
190 return Err(anyhow::anyhow!(
191 "Unable to obtain the message block height. The transaction was squeezed out: {reason}"
192 )
193 .into())
194 }
195 Ok(TransactionExecutionStatus::Failed { .. }) => {
196 return Err(anyhow::anyhow!(
197 "Unable to obtain the message block height. The transaction failed"
198 )
199 .into())
200 }
201 Err(err) => {
202 return Err(anyhow::anyhow!(
203 "Unable to obtain the message block height: {err}"
204 )
205 .into())
206 }
207 };
208 let Some(data) = data else {
209 return Err(anyhow::anyhow!("Output message doesn't contain any `data`").into())
210 };
211
212 let (message_block_header, message_block_txs) =
214 match database.block(&message_block_height) {
215 Ok(message_block) => message_block.into_inner(),
216 Err(err) => {
217 return Err(anyhow::anyhow!(
218 "Unable to get the message block from the database: {err}"
219 )
220 .into())
221 }
222 };
223
224 let message_id = compute_message_id(&sender, &recipient, &nonce, amount, &data);
225
226 let message_proof = message_receipts_proof(database, message_id, &message_block_txs)?;
227
228 let (commit_block_header, _) = match database.block(&commit_block_height) {
230 Ok(commit_block_header) => commit_block_header.into_inner(),
231 Err(err) => {
232 return Err(anyhow::anyhow!(
233 "Unable to get commit block header from database: {err}"
234 )
235 .into())
236 }
237 };
238
239 let Some(verifiable_commit_block_height) = commit_block_header.height().pred() else {
240 return Err(anyhow::anyhow!(
241 "Impossible to generate proof beyond the genesis block"
242 )
243 .into())
244 };
245 let block_proof = database.block_history_proof(
246 message_block_header.height(),
247 &verifiable_commit_block_height,
248 )?;
249
250 Ok(MessageProof {
251 message_proof,
252 block_proof,
253 message_block_header,
254 commit_block_header,
255 sender,
256 recipient,
257 nonce,
258 amount,
259 data: data.into_inner(),
260 })
261}
262
263fn message_receipts_proof<T: MessageProofData + ?Sized>(
264 database: &T,
265 message_id: MessageId,
266 message_block_txs: &[Bytes32],
267) -> StorageResult<MerkleProof> {
268 let leaves: Vec<_> = message_block_txs
270 .iter()
271 .filter_map(|id| match database.transaction_status(id) {
272 Ok(TransactionExecutionStatus::Success { receipts, .. }) => {
273 Some(Ok(receipts))
274 }
275 Ok(TransactionExecutionStatus::Submitted { .. })
276 | Ok(TransactionExecutionStatus::SqueezedOut { .. })
277 | Ok(TransactionExecutionStatus::Failed { .. }) => None,
278 Err(err) => Some(Err(err)),
279 })
280 .try_collect()?;
281 let leaves = leaves.iter()
282 .flat_map(|receipts|
285 receipts.iter().filter_map(|r| r.message_id()));
286
287 let mut tree = MerkleTree::new();
289
290 let mut proof_index = None;
291
292 for (index, id) in leaves.enumerate() {
293 if message_id == id {
295 proof_index = Some(index as u64);
297 }
298
299 tree.push(id.as_ref());
301 }
302
303 let Some(proof_index) = proof_index else {
305 return Err(anyhow::anyhow!(
306 "Unable to find the message receipt in the transaction to generate the proof"
307 )
308 .into())
309 };
310
311 let Some((_, proof_set)) = tree.prove(proof_index) else {
313 return Err(anyhow::anyhow!(
314 "Unable to generate the Merkle proof for the message from its receipts"
315 )
316 .into());
317 };
318
319 Ok(MerkleProof {
321 proof_set,
322 proof_index,
323 })
324}
325
326pub fn message_status(
327 database: &ReadView,
328 message_nonce: Nonce,
329) -> StorageResult<MessageStatus> {
330 if database.message_is_spent(&message_nonce)? {
331 Ok(MessageStatus::spent())
332 } else if database.message_exists(&message_nonce)? {
333 Ok(MessageStatus::unspent())
334 } else {
335 Ok(MessageStatus::not_found())
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use fuel_core_storage::not_found;
342 use fuel_core_types::{
343 blockchain::block::CompressedBlock,
344 entities::relayer::message::MerkleProof,
345 fuel_tx::{
346 Address,
347 Bytes32,
348 Receipt,
349 TxId,
350 },
351 fuel_types::{
352 BlockHeight,
353 Nonce,
354 },
355 services::transaction_status::TransactionExecutionStatus,
356 tai64::Tai64,
357 };
358 use std::collections::HashMap;
359
360 use super::{
361 MessageProofData,
362 message_proof,
363 };
364
365 pub struct FakeDB {
366 pub blocks: HashMap<BlockHeight, CompressedBlock>,
367 pub transaction_statuses: HashMap<TxId, TransactionExecutionStatus>,
368 pub receipts: HashMap<TxId, Vec<Receipt>>,
369 }
370
371 impl FakeDB {
372 fn new() -> Self {
373 Self {
374 blocks: HashMap::new(),
375 transaction_statuses: HashMap::new(),
376 receipts: HashMap::new(),
377 }
378 }
379
380 fn insert_block(&mut self, block_height: BlockHeight, block: CompressedBlock) {
381 self.blocks.insert(block_height, block);
382 }
383
384 fn insert_transaction_status(
385 &mut self,
386 transaction_id: TxId,
387 status: TransactionExecutionStatus,
388 ) {
389 self.transaction_statuses.insert(transaction_id, status);
390 }
391
392 fn insert_receipts(&mut self, transaction_id: TxId, receipts: Vec<Receipt>) {
393 self.receipts.insert(transaction_id, receipts);
394 }
395 }
396
397 impl MessageProofData for FakeDB {
398 fn block(&self, id: &BlockHeight) -> fuel_core_storage::Result<CompressedBlock> {
399 self.blocks.get(id).cloned().ok_or(not_found!("Block"))
400 }
401
402 fn transaction_status(
403 &self,
404 transaction_id: &TxId,
405 ) -> fuel_core_storage::Result<TransactionExecutionStatus> {
406 self.transaction_statuses
407 .get(transaction_id)
408 .cloned()
409 .ok_or(not_found!("Transaction status"))
410 }
411
412 fn block_history_proof(
413 &self,
414 _message_block_height: &BlockHeight,
415 _commit_block_height: &BlockHeight,
416 ) -> fuel_core_storage::Result<MerkleProof> {
417 Ok(MerkleProof::default())
419 }
420 }
421
422 #[test]
425 fn test_message_proof_ignore_failed() {
426 let mut database = FakeDB::new();
428
429 let mut block = CompressedBlock::default();
432 let block_height: BlockHeight = BlockHeight::new(1);
433 block.header_mut().set_block_height(block_height);
434 let valid_tx_id = Bytes32::new([1; 32]);
435 let mut valid_tx_receipts = vec![];
436 for i in 0..100 {
437 valid_tx_receipts.push(Receipt::MessageOut {
438 sender: Address::default(),
439 recipient: Address::default(),
440 amount: 0,
441 nonce: 0.into(),
442 len: 32,
443 digest: Bytes32::default(),
444 data: Some(vec![i; 32].into()),
445 });
446 }
447 block.transactions_mut().push(valid_tx_id);
448 database.insert_block(block_height, block.clone());
449 database.insert_transaction_status(
450 valid_tx_id,
451 TransactionExecutionStatus::Success {
452 time: Tai64::UNIX_EPOCH,
453 block_height,
454 receipts: std::sync::Arc::new(valid_tx_receipts.clone()),
455 total_fee: 0,
456 total_gas: 0,
457 result: None,
458 },
459 );
460 database.insert_receipts(valid_tx_id, valid_tx_receipts.clone());
461
462 let message_proof_valid_tx =
464 message_proof(&database, valid_tx_id, Nonce::default(), block_height)
465 .unwrap();
466
467 let invalid_tx_id = Bytes32::new([2; 32]);
469 block.transactions_mut().push(invalid_tx_id);
470 database.insert_block(block_height, block.clone());
471 let mut invalid_tx_receipts = vec![];
472 for i in 0..100 {
473 invalid_tx_receipts.push(Receipt::MessageOut {
474 sender: Address::default(),
475 recipient: Address::default(),
476 amount: 0,
477 nonce: 0.into(),
478 len: 33,
479 digest: Bytes32::default(),
480 data: Some(vec![i; 33].into()),
481 });
482 }
483 database.insert_transaction_status(
484 invalid_tx_id,
485 TransactionExecutionStatus::Failed {
486 time: Tai64::UNIX_EPOCH,
487 block_height,
488 result: None,
489 total_fee: 0,
490 total_gas: 0,
491 receipts: std::sync::Arc::new(invalid_tx_receipts.clone()),
492 },
493 );
494 database.insert_receipts(invalid_tx_id, invalid_tx_receipts.clone());
495
496 let message_proof_invalid_tx =
499 message_proof(&database, valid_tx_id, Nonce::default(), block_height)
500 .unwrap();
501
502 assert_eq!(message_proof_valid_tx, message_proof_invalid_tx);
505 }
506}