use alloy::primitives::{B256, U256};
use anyhow::{bail, Result};
use hdp_primitives::{
datalake::{envelope::DatalakeEnvelope, task::DatalakeCompute},
processed_types::{
account::ProcessedAccount, header::ProcessedHeader, mmr::MMRMeta,
receipt::ProcessedReceipt, storage::ProcessedStorage, transaction::ProcessedTransaction,
},
};
use hdp_provider::evm::provider::{EvmProvider, EvmProviderConfig};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::RwLock;
use self::{block_sampled::compile_block_sampled_datalake, transactions::compile_tx_datalake};
pub mod block_sampled;
pub mod transactions;
pub struct DatalakeComputeCompilationResults {
pub pre_processable: bool,
pub commit_results_maps: HashMap<B256, U256>,
pub headers: HashSet<ProcessedHeader>,
pub accounts: HashSet<ProcessedAccount>,
pub storages: HashSet<ProcessedStorage>,
pub transactions: HashSet<ProcessedTransaction>,
pub transaction_receipts: HashSet<ProcessedReceipt>,
pub mmr_meta: MMRMeta,
}
impl DatalakeComputeCompilationResults {
#[allow(clippy::too_many_arguments)]
pub fn new(
pre_processable: bool,
commit_results_maps: HashMap<B256, U256>,
headers: HashSet<ProcessedHeader>,
accounts: HashSet<ProcessedAccount>,
storages: HashSet<ProcessedStorage>,
transactions: HashSet<ProcessedTransaction>,
transaction_receipts: HashSet<ProcessedReceipt>,
mmr_meta: MMRMeta,
) -> Self {
Self {
pre_processable,
commit_results_maps,
headers,
accounts,
storages,
transactions,
transaction_receipts,
mmr_meta,
}
}
}
pub struct DatalakeCompiler {
provider: Arc<RwLock<EvmProvider>>,
}
impl DatalakeCompiler {
pub fn new_from_config(config: EvmProviderConfig) -> Self {
let provider = EvmProvider::new(config);
Self {
provider: Arc::new(provider.into()),
}
}
pub async fn compile(
&self,
datalake_computes: &[DatalakeCompute],
) -> Result<DatalakeComputeCompilationResults> {
let mut commit_results_maps = HashMap::new();
let mut headers: HashSet<ProcessedHeader> = HashSet::new();
let mut accounts: HashSet<ProcessedAccount> = HashSet::new();
let mut storages: HashSet<ProcessedStorage> = HashSet::new();
let mut transactions: HashSet<ProcessedTransaction> = HashSet::new();
let mut transaction_receipts: HashSet<ProcessedReceipt> = HashSet::new();
let mut mmr = None;
let mut pre_processable = true;
for datalake_compute in datalake_computes {
let task_commitment = datalake_compute.commit();
let aggregation_fn = &datalake_compute.compute.aggregate_fn_id;
let fn_context = datalake_compute.compute.aggregate_fn_ctx.clone();
match datalake_compute.datalake {
DatalakeEnvelope::BlockSampled(ref datalake) => {
let compiled_block_sampled =
compile_block_sampled_datalake(datalake.clone(), &self.provider).await?;
headers.extend(compiled_block_sampled.headers);
accounts.extend(compiled_block_sampled.accounts);
storages.extend(compiled_block_sampled.storages);
if mmr.is_some() && mmr.unwrap() != compiled_block_sampled.mmr_meta {
bail!("MMR meta data is not consistent");
} else {
mmr = Some(compiled_block_sampled.mmr_meta);
}
let aggregated_result = aggregation_fn
.operation(&compiled_block_sampled.values, Some(fn_context))?;
commit_results_maps.insert(task_commitment, aggregated_result);
if !aggregation_fn.is_pre_processable() {
pre_processable = false;
}
}
DatalakeEnvelope::Transactions(ref datalake) => {
let compiled_tx_datalake =
compile_tx_datalake(datalake.clone(), &self.provider).await?;
headers.extend(compiled_tx_datalake.headers);
transactions.extend(compiled_tx_datalake.transactions);
transaction_receipts.extend(compiled_tx_datalake.transaction_receipts);
if mmr.is_some() && mmr.unwrap() != compiled_tx_datalake.mmr_meta {
bail!("MMR meta data is not consistent");
} else {
mmr = Some(compiled_tx_datalake.mmr_meta);
}
let aggregated_result =
aggregation_fn.operation(&compiled_tx_datalake.values, Some(fn_context))?;
commit_results_maps.insert(task_commitment, aggregated_result);
if !aggregation_fn.is_pre_processable() {
pre_processable = false;
}
}
};
}
Ok(DatalakeComputeCompilationResults::new(
pre_processable,
commit_results_maps,
headers,
accounts,
storages,
transactions,
transaction_receipts,
mmr.unwrap(),
))
}
}