fuel_core/query/
message.rs

1use crate::fuel_core_graphql_api::database::ReadView;
2use fuel_core_storage::{
3    Error as StorageError,
4    Result as StorageResult,
5    StorageAsRef,
6    iter::{
7        BoxedIter,
8        IterDirection,
9    },
10    not_found,
11    tables::Messages,
12};
13use fuel_core_types::{
14    blockchain::block::CompressedBlock,
15    entities::relayer::message::{
16        MerkleProof,
17        Message,
18        MessageProof,
19        MessageStatus,
20    },
21    fuel_merkle::binary::in_memory::MerkleTree,
22    fuel_tx::{
23        Receipt,
24        TxId,
25        input::message::compute_message_id,
26    },
27    fuel_types::{
28        Address,
29        BlockHeight,
30        Bytes32,
31        MessageId,
32        Nonce,
33    },
34    services::transaction_status::TransactionExecutionStatus,
35};
36use futures::{
37    Stream,
38    StreamExt,
39    TryStreamExt,
40};
41use itertools::Itertools;
42use std::borrow::Cow;
43
44#[cfg(test)]
45mod test;
46
47pub trait MessageQueryData: Send + Sync {
48    fn message(&self, message_id: &Nonce) -> StorageResult<Message>;
49
50    fn owned_message_ids(
51        &self,
52        owner: &Address,
53        start_message_id: Option<Nonce>,
54        direction: IterDirection,
55    ) -> BoxedIter<'_, StorageResult<Nonce>>;
56
57    fn owned_messages(
58        &self,
59        owner: &Address,
60        start_message_id: Option<Nonce>,
61        direction: IterDirection,
62    ) -> BoxedIter<'_, StorageResult<Message>>;
63
64    fn all_messages(
65        &self,
66        start_message_id: Option<Nonce>,
67        direction: IterDirection,
68    ) -> BoxedIter<'_, StorageResult<Message>>;
69}
70
71impl ReadView {
72    pub fn message(&self, id: &Nonce) -> StorageResult<Message> {
73        self.on_chain
74            .as_ref()
75            .storage::<Messages>()
76            .get(id)?
77            .ok_or(not_found!(Messages))
78            .map(Cow::into_owned)
79    }
80
81    pub async fn messages(
82        &self,
83        ids: Vec<Nonce>,
84    ) -> impl Iterator<Item = StorageResult<Message>> + '_ {
85        // TODO: Use multiget when it's implemented.
86        //  https://github.com/FuelLabs/fuel-core/issues/2344
87        let messages = ids.into_iter().map(|id| self.message(&id));
88        // Give a chance to other tasks to run.
89        tokio::task::yield_now().await;
90        messages
91    }
92
93    pub fn owned_messages<'a>(
94        &'a self,
95        owner: &'a Address,
96        start_message_id: Option<Nonce>,
97        direction: IterDirection,
98    ) -> impl Stream<Item = StorageResult<Message>> + 'a {
99        self.owned_message_ids(owner, start_message_id, direction)
100            .chunks(self.batch_size)
101            .map(|chunk| {
102                let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
103                Ok(chunk)
104            })
105            .try_filter_map(move |chunk| async move {
106                let chunk = self.messages(chunk).await;
107                Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
108            })
109            .try_flatten()
110    }
111}
112
113/// Trait that specifies all the data required by the output message query.
114pub trait MessageProofData {
115    /// Get the block.
116    fn block(&self, id: &BlockHeight) -> StorageResult<CompressedBlock>;
117
118    /// Get the status of a transaction.
119    fn transaction_status(
120        &self,
121        transaction_id: &TxId,
122    ) -> StorageResult<TransactionExecutionStatus>;
123
124    /// Gets the [`MerkleProof`] for the message block at `message_block_height` height
125    /// relatively to the commit block where message block <= commit block.
126    fn block_history_proof(
127        &self,
128        message_block_height: &BlockHeight,
129        commit_block_height: &BlockHeight,
130    ) -> StorageResult<MerkleProof>;
131}
132
133impl MessageProofData for ReadView {
134    fn block(&self, id: &BlockHeight) -> StorageResult<CompressedBlock> {
135        self.block(id)
136    }
137
138    fn transaction_status(
139        &self,
140        transaction_id: &TxId,
141    ) -> StorageResult<TransactionExecutionStatus> {
142        self.tx_status(transaction_id)
143    }
144
145    fn block_history_proof(
146        &self,
147        message_block_height: &BlockHeight,
148        commit_block_height: &BlockHeight,
149    ) -> StorageResult<MerkleProof> {
150        self.block_history_proof(message_block_height, commit_block_height)
151    }
152}
153
154/// Generate an output proof.
155pub fn message_proof<T: MessageProofData + ?Sized>(
156    database: &T,
157    transaction_id: Bytes32,
158    desired_nonce: Nonce,
159    commit_block_height: BlockHeight,
160) -> StorageResult<MessageProof> {
161    // Get the block id from the transaction status if it's ready.
162    let (message_block_height, (sender, recipient, nonce, amount, data)) = match database.transaction_status(&transaction_id) {
163        Ok(TransactionExecutionStatus::Success { block_height, receipts, .. }) => (
164            block_height,
165            receipts.iter()
166            .find_map(|r| match r {
167                Receipt::MessageOut {
168                    sender,
169                    recipient,
170                    nonce,
171                    amount,
172                    data,
173                    ..
174                } if r.nonce() == Some(&desired_nonce) => {
175                    Some((*sender, *recipient, *nonce, *amount, data.clone()))
176                }
177                _ => None,
178            })
179            .ok_or::<StorageError>(
180                anyhow::anyhow!("Desired `nonce` missing in transaction receipts").into(),
181            )?
182        ),
183        Ok(TransactionExecutionStatus::Submitted { .. }) => {
184            return Err(anyhow::anyhow!(
185                "Unable to obtain the message block height. The transaction has not been processed yet"
186            )
187            .into())
188        }
189        Ok(TransactionExecutionStatus::SqueezedOut { reason }) => {
190            return Err(anyhow::anyhow!(
191                "Unable to obtain the message block height. The transaction was squeezed out: {reason}"
192            )
193            .into())
194        }
195        Ok(TransactionExecutionStatus::Failed { .. }) => {
196            return Err(anyhow::anyhow!(
197                "Unable to obtain the message block height. The transaction failed"
198            )
199            .into())
200        }
201        Err(err) => {
202            return Err(anyhow::anyhow!(
203                "Unable to obtain the message block height: {err}"
204            )
205            .into())
206        }
207    };
208    let Some(data) = data else {
209        return Err(anyhow::anyhow!("Output message doesn't contain any `data`").into())
210    };
211
212    // Get the message fuel block header.
213    let (message_block_header, message_block_txs) =
214        match database.block(&message_block_height) {
215            Ok(message_block) => message_block.into_inner(),
216            Err(err) => {
217                return Err(anyhow::anyhow!(
218                    "Unable to get the message block from the database: {err}"
219                )
220                .into())
221            }
222        };
223
224    let message_id = compute_message_id(&sender, &recipient, &nonce, amount, &data);
225
226    let message_proof = message_receipts_proof(database, message_id, &message_block_txs)?;
227
228    // Get the commit fuel block header.
229    let (commit_block_header, _) = match database.block(&commit_block_height) {
230        Ok(commit_block_header) => commit_block_header.into_inner(),
231        Err(err) => {
232            return Err(anyhow::anyhow!(
233                "Unable to get commit block header from database: {err}"
234            )
235            .into())
236        }
237    };
238
239    let Some(verifiable_commit_block_height) = commit_block_header.height().pred() else {
240        return Err(anyhow::anyhow!(
241            "Impossible to generate proof beyond the genesis block"
242        )
243        .into())
244    };
245    let block_proof = database.block_history_proof(
246        message_block_header.height(),
247        &verifiable_commit_block_height,
248    )?;
249
250    Ok(MessageProof {
251        message_proof,
252        block_proof,
253        message_block_header,
254        commit_block_header,
255        sender,
256        recipient,
257        nonce,
258        amount,
259        data: data.into_inner(),
260    })
261}
262
263fn message_receipts_proof<T: MessageProofData + ?Sized>(
264    database: &T,
265    message_id: MessageId,
266    message_block_txs: &[Bytes32],
267) -> StorageResult<MerkleProof> {
268    // Get the message receipts from the block.
269    let leaves: Vec<_> = message_block_txs
270        .iter()
271        .filter_map(|id| match database.transaction_status(id) {
272            Ok(TransactionExecutionStatus::Success { receipts, .. }) => {
273                Some(Ok(receipts))
274            }
275            Ok(TransactionExecutionStatus::Submitted { .. })
276            | Ok(TransactionExecutionStatus::SqueezedOut { .. })
277            | Ok(TransactionExecutionStatus::Failed { .. }) => None,
278            Err(err) => Some(Err(err)),
279        })
280        .try_collect()?;
281    let leaves = leaves.iter()
282        // Flatten the receipts after filtering on output messages
283        // and mapping to message ids.
284        .flat_map(|receipts|
285            receipts.iter().filter_map(|r| r.message_id()));
286
287    // Build the merkle proof from the above iterator.
288    let mut tree = MerkleTree::new();
289
290    let mut proof_index = None;
291
292    for (index, id) in leaves.enumerate() {
293        // Check if this is the message id being proved.
294        if message_id == id {
295            // Save the index of this message to use as the proof index.
296            proof_index = Some(index as u64);
297        }
298
299        // Build the merkle tree.
300        tree.push(id.as_ref());
301    }
302
303    // Check if we found a leaf.
304    let Some(proof_index) = proof_index else {
305        return Err(anyhow::anyhow!(
306            "Unable to find the message receipt in the transaction to generate the proof"
307        )
308        .into())
309    };
310
311    // Get the proof set.
312    let Some((_, proof_set)) = tree.prove(proof_index) else {
313        return Err(anyhow::anyhow!(
314            "Unable to generate the Merkle proof for the message from its receipts"
315        )
316        .into());
317    };
318
319    // Return the proof.
320    Ok(MerkleProof {
321        proof_set,
322        proof_index,
323    })
324}
325
326pub fn message_status(
327    database: &ReadView,
328    message_nonce: Nonce,
329) -> StorageResult<MessageStatus> {
330    if database.message_is_spent(&message_nonce)? {
331        Ok(MessageStatus::spent())
332    } else if database.message_exists(&message_nonce)? {
333        Ok(MessageStatus::unspent())
334    } else {
335        Ok(MessageStatus::not_found())
336    }
337}
338
#[cfg(test)]
mod tests {
    use fuel_core_storage::not_found;
    use fuel_core_types::{
        blockchain::block::CompressedBlock,
        entities::relayer::message::MerkleProof,
        fuel_tx::{
            Address,
            Bytes32,
            Receipt,
            TxId,
        },
        fuel_types::{
            BlockHeight,
            Nonce,
        },
        services::transaction_status::TransactionExecutionStatus,
        tai64::Tai64,
    };
    use std::{
        collections::HashMap,
        sync::Arc,
    };

    use super::{
        MessageProofData,
        message_proof,
    };

    /// In-memory stand-in for the database used by [`message_proof`].
    pub struct FakeDB {
        /// Blocks by height, served by `MessageProofData::block`.
        pub blocks: HashMap<BlockHeight, CompressedBlock>,
        /// Statuses by transaction id, served by
        /// `MessageProofData::transaction_status`.
        pub transaction_statuses: HashMap<TxId, TransactionExecutionStatus>,
        /// Receipts by transaction id; filled by the test but not read by the
        /// `MessageProofData` implementation below.
        pub receipts: HashMap<TxId, Vec<Receipt>>,
    }

    impl FakeDB {
        /// Creates a database with no blocks, statuses or receipts.
        fn new() -> Self {
            Self {
                blocks: HashMap::new(),
                transaction_statuses: HashMap::new(),
                receipts: HashMap::new(),
            }
        }

        /// Stores `block` under `block_height`, replacing any previous block.
        fn insert_block(&mut self, block_height: BlockHeight, block: CompressedBlock) {
            self.blocks.insert(block_height, block);
        }

        /// Stores `status` for `transaction_id`, replacing any previous status.
        fn insert_transaction_status(
            &mut self,
            transaction_id: TxId,
            status: TransactionExecutionStatus,
        ) {
            self.transaction_statuses.insert(transaction_id, status);
        }

        /// Stores `receipts` for `transaction_id`, replacing any previous ones.
        fn insert_receipts(&mut self, transaction_id: TxId, receipts: Vec<Receipt>) {
            self.receipts.insert(transaction_id, receipts);
        }
    }

    impl MessageProofData for FakeDB {
        fn block(&self, id: &BlockHeight) -> fuel_core_storage::Result<CompressedBlock> {
            match self.blocks.get(id) {
                Some(block) => Ok(block.clone()),
                None => Err(not_found!("Block")),
            }
        }

        fn transaction_status(
            &self,
            transaction_id: &TxId,
        ) -> fuel_core_storage::Result<TransactionExecutionStatus> {
            match self.transaction_statuses.get(transaction_id) {
                Some(status) => Ok(status.clone()),
                None => Err(not_found!("Transaction status")),
            }
        }

        fn block_history_proof(
            &self,
            _message_block_height: &BlockHeight,
            _commit_block_height: &BlockHeight,
        ) -> fuel_core_storage::Result<MerkleProof> {
            // The heights are ignored: the test below only compares two
            // results, so a default proof is enough.
            Ok(MerkleProof::default())
        }
    }

    // Requests the message proof for a block that holds only a successful
    // transaction, then adds a failed transaction with its own `MessageOut`
    // receipts to the same block and requests the proof again. Both proofs
    // must be equal, i.e. the failed transaction must be ignored.
    #[test]
    fn test_message_proof_ignore_failed() {
        let mut database = FakeDB::new();

        // Given
        // A block at height 1 that contains one successful transaction.
        let block_height = BlockHeight::new(1);
        let mut block = CompressedBlock::default();
        block.header_mut().set_block_height(block_height);

        let valid_tx_id = Bytes32::new([1; 32]);
        let valid_tx_receipts: Vec<_> = (0..100)
            .map(|i| Receipt::MessageOut {
                sender: Address::default(),
                recipient: Address::default(),
                amount: 0,
                nonce: 0.into(),
                len: 32,
                digest: Bytes32::default(),
                data: Some(vec![i; 32].into()),
            })
            .collect();

        block.transactions_mut().push(valid_tx_id);
        database.insert_block(block_height, block.clone());
        database.insert_transaction_status(
            valid_tx_id,
            TransactionExecutionStatus::Success {
                time: Tai64::UNIX_EPOCH,
                block_height,
                receipts: Arc::new(valid_tx_receipts.clone()),
                total_fee: 0,
                total_gas: 0,
                result: None,
            },
        );
        database.insert_receipts(valid_tx_id, valid_tx_receipts);

        // Proof while the block holds only the successful transaction.
        let proof_before =
            message_proof(&database, valid_tx_id, Nonce::default(), block_height)
                .unwrap();

        // Extend the same block with a failed transaction that also carries
        // `MessageOut` receipts.
        let invalid_tx_id = Bytes32::new([2; 32]);
        block.transactions_mut().push(invalid_tx_id);
        database.insert_block(block_height, block);

        let invalid_tx_receipts: Vec<_> = (0..100)
            .map(|i| Receipt::MessageOut {
                sender: Address::default(),
                recipient: Address::default(),
                amount: 0,
                nonce: 0.into(),
                len: 33,
                digest: Bytes32::default(),
                data: Some(vec![i; 33].into()),
            })
            .collect();
        database.insert_transaction_status(
            invalid_tx_id,
            TransactionExecutionStatus::Failed {
                time: Tai64::UNIX_EPOCH,
                block_height,
                result: None,
                total_fee: 0,
                total_gas: 0,
                receipts: Arc::new(invalid_tx_receipts.clone()),
            },
        );
        database.insert_receipts(invalid_tx_id, invalid_tx_receipts);

        // When
        // The proof for the same message is requested again.
        let proof_after =
            message_proof(&database, valid_tx_id, Nonce::default(), block_height)
                .unwrap();

        // Then
        // Nothing changed, because the failed transaction was ignored.
        assert_eq!(proof_before, proof_after);
    }
}
506}