use std::sync::Arc;
use alloy_chains::NamedChain;
use alloy_network::Network;
use alloy_primitives::{Address, BlockNumber, TxHash};
use alloy_provider::Provider;
use alloy_rpc_types::Log as RpcLog;
use alloy_sol_types::SolEvent;
use tracing::{error, info};
use crate::config::SemioscanConfig;
use crate::errors::RetrievalError;
use crate::events::definitions::Transfer;
use crate::scan::LogScanner;
use super::gas_calculation::GasCalculationCore;
#[derive(Debug, Clone, Copy)]
pub(crate) struct LogBatchEntry {
pub(crate) tx_hash: TxHash,
pub(crate) block_number: BlockNumber,
pub(crate) transfer_value: alloy_primitives::U256,
}
pub(crate) struct TransferLogScanner<N: Network, P: Provider<N> + Send + Sync + Clone + 'static> {
provider: Arc<P>,
config: SemioscanConfig,
network_marker: std::marker::PhantomData<N>,
}
impl<N: Network, P: Provider<N> + Send + Sync + Clone + 'static> TransferLogScanner<N, P> {
pub(crate) fn new(provider: Arc<P>, config: SemioscanConfig) -> Self {
Self {
provider,
config,
network_marker: std::marker::PhantomData,
}
}
pub(crate) async fn scan(
&self,
chain: NamedChain,
from_address: Address,
to_address: Address,
token_address: Address,
from_block: BlockNumber,
to_block: BlockNumber,
) -> Result<Vec<LogBatchEntry>, RetrievalError> {
let filter_template =
GasCalculationCore::create_transfer_filter(token_address, from_address, to_address);
let scanner = LogScanner::<_, N>::new(Arc::clone(&self.provider), self.config.clone());
let logs: Vec<RpcLog> = scanner
.scan::<RetrievalError, _>(
chain,
filter_template,
from_block,
to_block,
|chunk_from, chunk_to, e| {
Some(RetrievalError::Rpc(
crate::errors::RpcError::get_logs_failed(
format!("get_logs for blocks {chunk_from}-{chunk_to} on {chain:?}"),
e,
),
))
},
)
.await?;
Ok(decode_transfer_logs(
&logs,
chain,
from_address,
to_address,
token_address,
))
}
}
pub(crate) fn decode_transfer_logs(
logs: &[RpcLog],
chain: NamedChain,
from_address: Address,
to_address: Address,
token_address: Address,
) -> Vec<LogBatchEntry> {
let mut entries = Vec::with_capacity(logs.len());
for rpc_log_entry in logs {
match Transfer::decode_log(&rpc_log_entry.inner) {
Ok(transfer_event_data) => {
let tx_hash = match rpc_log_entry.transaction_hash {
Some(hash) => hash,
None => {
error!("Missing transaction hash in log entry");
continue;
}
};
let block_number = match rpc_log_entry.block_number {
Some(num) => num,
None => {
error!("Missing block number in log entry");
continue;
}
};
info!(
?chain, ?from_address, ?to_address, ?token_address,
amount = ?transfer_event_data.value,
block = block_number,
?tx_hash,
"Decoded Transfer event for batch processing"
);
entries.push(LogBatchEntry {
tx_hash,
block_number,
transfer_value: transfer_event_data.value,
});
}
Err(e) => {
error!(
error = %e,
log_data = ?rpc_log_entry.data(),
log_topics = ?rpc_log_entry.topics(),
"Failed to decode Transfer log. Skipping log."
);
}
}
}
entries
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_primitives::{address, LogData, B256, U256};
use alloy_sol_types::SolValue;
fn make_transfer_log(
tx_hash: Option<TxHash>,
block_number: Option<BlockNumber>,
token_address: Address,
from_address: Address,
to_address: Address,
transfer_value: U256,
) -> RpcLog {
RpcLog {
inner: alloy_primitives::Log {
address: token_address,
data: LogData::new(
vec![
Transfer::SIGNATURE_HASH,
from_address.into_word(),
to_address.into_word(),
],
transfer_value.abi_encode().into(),
)
.expect("valid log data"),
},
block_hash: Some(B256::repeat_byte(0x11)),
block_number,
block_timestamp: Some(1_700_000_000),
transaction_hash: tx_hash,
transaction_index: Some(0),
log_index: Some(0),
removed: false,
}
}
fn make_malformed_log() -> RpcLog {
RpcLog {
inner: alloy_primitives::Log {
address: Address::ZERO,
data: LogData::new(vec![B256::repeat_byte(0xAA)], vec![].into())
.expect("valid log data"),
},
block_hash: Some(B256::repeat_byte(0x11)),
block_number: Some(1),
block_timestamp: Some(1_700_000_000),
transaction_hash: Some(TxHash::from(B256::repeat_byte(0x22))),
transaction_index: Some(0),
log_index: Some(0),
removed: false,
}
}
#[test]
fn decode_valid_transfer_log_yields_entry_with_required_fields() {
let token = address!("0xc333333333333333333333333333333333333333");
let from = address!("0xa111111111111111111111111111111111111111");
let to = address!("0xb222222222222222222222222222222222222222");
let tx_hash = TxHash::from(B256::repeat_byte(0x10));
let value = U256::from(1_234_u64);
let log = make_transfer_log(Some(tx_hash), Some(42), token, from, to, value);
let entries = decode_transfer_logs(&[log], NamedChain::Mainnet, from, to, token);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].tx_hash, tx_hash);
assert_eq!(entries[0].block_number, 42);
assert_eq!(entries[0].transfer_value, value);
}
#[test]
fn log_missing_tx_hash_is_skipped_not_panicking() {
let token = address!("0xc333333333333333333333333333333333333333");
let from = address!("0xa111111111111111111111111111111111111111");
let to = address!("0xb222222222222222222222222222222222222222");
let log = make_transfer_log(None, Some(42), token, from, to, U256::from(1_u64));
let entries = decode_transfer_logs(&[log], NamedChain::Mainnet, from, to, token);
assert!(entries.is_empty());
}
#[test]
fn log_missing_block_number_is_skipped_not_panicking() {
let token = address!("0xc333333333333333333333333333333333333333");
let from = address!("0xa111111111111111111111111111111111111111");
let to = address!("0xb222222222222222222222222222222222222222");
let tx_hash = TxHash::from(B256::repeat_byte(0x10));
let log = make_transfer_log(Some(tx_hash), None, token, from, to, U256::from(1_u64));
let entries = decode_transfer_logs(&[log], NamedChain::Mainnet, from, to, token);
assert!(entries.is_empty());
}
#[test]
fn malformed_log_topics_are_skipped_without_aborting_the_batch() {
let token = address!("0xc333333333333333333333333333333333333333");
let from = address!("0xa111111111111111111111111111111111111111");
let to = address!("0xb222222222222222222222222222222222222222");
let tx_hash = TxHash::from(B256::repeat_byte(0x10));
let valid = make_transfer_log(Some(tx_hash), Some(42), token, from, to, U256::from(7_u64));
let entries = decode_transfer_logs(
&[make_malformed_log(), valid],
NamedChain::Mainnet,
from,
to,
token,
);
assert_eq!(
entries.len(),
1,
"the malformed entry must not abort the batch"
);
assert_eq!(entries[0].transfer_value, U256::from(7_u64));
}
}