1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use alloy::primitives::U256;
use anyhow::Result;
use hdp_primitives::{
    processed_types::{
        header::ProcessedHeader, receipt::ProcessedReceipt, transaction::ProcessedTransaction,
    },
    task::datalake::{
        transactions::{TransactionsCollection, TransactionsInBlockDatalake},
        DatalakeField,
    },
};

use hdp_provider::evm::provider::EvmProvider;
use std::collections::HashSet;

use super::{FetchError, Fetchable, FetchedDatalake};

/// [`Fetchable`] implementation for a datalake that samples transactions, or transaction
/// receipts, out of the single block `self.target_block`.
impl Fetchable for TransactionsInBlockDatalake {
    /// Gathers the proofs and the sampled values for this datalake through `provider`.
    ///
    /// The work happens in this order:
    /// 1. Header proofs are requested with `self.target_block` passed as both range bounds
    ///    (and `self.increment` forwarded as-is); the entry for `self.target_block` becomes
    ///    the only [`ProcessedHeader`] in the result.
    /// 2. Depending on `self.sampled_property`, either transactions or transaction receipts
    ///    are requested together with their proofs, forwarding `self.start_index`,
    ///    `self.end_index` and `self.increment` to the provider.
    /// 3. Every fetched item is recorded as a processed proof. Its sampled field is decoded
    ///    and appended to `values` only when `self.included_types` accepts the item's
    ///    transaction type.
    ///
    /// `accounts` and `storages` of the returned [`FetchedDatalake`] are always empty.
    ///
    /// # Errors
    ///
    /// Any error returned by one of the provider calls is propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics if the header proofs returned by the provider contain no entry for
    /// `self.target_block`.
    async fn fetch(&self, provider: EvmProvider) -> Result<FetchedDatalake, FetchError> {
        let (mmr_metas, headers_proofs) = provider
            .get_range_of_header_proofs(self.target_block, self.target_block, self.increment)
            .await?;

        // Only one block is involved, so the header set holds exactly one element.
        let header_proof = headers_proofs.get(&self.target_block).unwrap();
        let headers: HashSet<ProcessedHeader> = HashSet::from([ProcessedHeader::new(
            header_proof.rlp_block_header.clone(),
            header_proof.element_index,
            header_proof.siblings_hashes.clone(),
        )]);

        let mut values: Vec<U256> = Vec::new();
        let mut transactions: HashSet<ProcessedTransaction> = HashSet::new();
        let mut transaction_receipts: HashSet<ProcessedReceipt> = HashSet::new();

        match &self.sampled_property {
            TransactionsCollection::Transactions(field) => {
                let proven_txs = provider
                    .get_tx_with_proof_from_block(
                        self.target_block,
                        self.start_index,
                        self.end_index,
                        self.increment,
                    )
                    .await?;

                for proven_tx in proven_txs {
                    // The proof is always kept, whether or not the value is aggregated.
                    transactions.insert(ProcessedTransaction::new(
                        proven_tx.tx_index,
                        proven_tx.block_number,
                        proven_tx.transaction_proof,
                    ));

                    // Transaction types rejected by `included_types` contribute no value.
                    if !self.included_types.is_included(proven_tx.tx_type) {
                        continue;
                    }
                    values.push(field.decode_field_from_rlp(&proven_tx.encoded_transaction));
                }
            }
            TransactionsCollection::TranasactionReceipts(field) => {
                let proven_receipts = provider
                    .get_tx_receipt_with_proof_from_block(
                        self.target_block,
                        self.start_index,
                        self.end_index,
                        self.increment,
                    )
                    .await?;

                for proven_receipt in proven_receipts {
                    // The proof is always kept, whether or not the value is aggregated.
                    transaction_receipts.insert(ProcessedReceipt::new(
                        proven_receipt.tx_index,
                        proven_receipt.block_number,
                        proven_receipt.receipt_proof,
                    ));

                    // Transaction types rejected by `included_types` contribute no value.
                    if !self.included_types.is_included(proven_receipt.tx_type) {
                        continue;
                    }
                    values.push(field.decode_field_from_rlp(&proven_receipt.encoded_receipt));
                }
            }
        }

        Ok(FetchedDatalake {
            values,
            headers,
            accounts: HashSet::new(),
            storages: HashSet::new(),
            transactions,
            transaction_receipts,
            mmr_metas,
        })
    }
}