use std::{borrow::Cow, sync::Arc};
use alloy_chains::NamedChain;
use alloy_eips::Typed2718;
use alloy_network::{AnyRpcTransaction, Network};
use alloy_provider::Provider;
use alloy_rpc_types::TransactionTrait;
use alloy_transport::TransportError;
use futures::future::join_all;
use tracing::{info, warn, Instrument};
use crate::errors::RetrievalError;
use crate::gas::adapter::ReceiptAdapter;
use crate::tracing::spans;
use super::failure::{build_lookup_attempt, build_lookup_failure, lookup_request_failed};
use super::gas_extractor::{extract_gas_and_amount, TransactionGasData};
use super::transfer_log_scanner::LogBatchEntry;
use super::types::{
CombinedDataLookupFailure, CombinedDataLookupPass, CombinedDataLookupStage, GasAndAmountForTx,
};
pub(crate) fn should_attempt_permissive_tx_decode(
chain: NamedChain,
error: &TransportError,
) -> bool {
matches!(chain, NamedChain::ZkSync | NamedChain::ZkSyncTestnet) && error.is_deser_error()
}
pub(crate) struct TxReceiptEnricher<N, P>
where
N: Network,
N::TransactionResponse:
TransactionTrait + alloy_provider::network::eip2718::Typed2718 + Send + Sync + Clone,
N::ReceiptResponse: Send + Sync + std::fmt::Debug + Clone,
P: Provider<N> + Send + Sync + Clone + 'static,
{
provider: Arc<P>,
network_marker: std::marker::PhantomData<N>,
}
impl<N, P> TxReceiptEnricher<N, P>
where
N: Network,
N::TransactionResponse:
TransactionTrait + alloy_provider::network::eip2718::Typed2718 + Send + Sync + Clone,
N::ReceiptResponse: Send + Sync + std::fmt::Debug + Clone,
P: Provider<N> + Send + Sync + Clone + 'static,
{
pub(crate) fn new(provider: Arc<P>) -> Self {
Self {
provider,
network_marker: std::marker::PhantomData,
}
}
pub(crate) async fn enrich_batch<A>(
&self,
chain: NamedChain,
entries: &[LogBatchEntry],
adapter: &A,
) -> Vec<Result<GasAndAmountForTx, CombinedDataLookupFailure>>
where
A: ReceiptAdapter<N> + Send + Sync,
{
if entries.is_empty() {
return vec![];
}
info!(
count = entries.len(),
"Batch fetching transaction data for logs"
);
let fetch_futures: Vec<_> = entries
.iter()
.copied()
.map(|entry| async move {
self.enrich_one(chain, entry, CombinedDataLookupPass::Batch, adapter)
.await
})
.collect();
join_all(fetch_futures).await
}
pub(crate) async fn retry_failed<A>(
&self,
chain: NamedChain,
mut failure: CombinedDataLookupFailure,
max_attempts: usize,
adapter: &A,
) -> (Result<GasAndAmountForTx, CombinedDataLookupFailure>, usize)
where
A: ReceiptAdapter<N> + Send + Sync,
{
let entry = LogBatchEntry {
tx_hash: failure.tx_hash,
block_number: failure.block_number,
transfer_value: failure.transfer_value,
};
let mut attempts = 0;
while attempts < max_attempts {
attempts += 1;
warn!(
?failure.tx_hash,
block_number = failure.block_number,
transfer_value = ?failure.transfer_value,
attempt = attempts,
max_attempts,
"Retrying combined data lookup serially after batch failure"
);
match self
.enrich_one(
chain,
entry,
CombinedDataLookupPass::SerialFallback,
adapter,
)
.await
{
Ok(data) => return (Ok(data), attempts),
Err(retry_failure) => failure.attempts.extend(retry_failure.attempts),
}
}
(Err(failure), attempts)
}
async fn enrich_one<A>(
&self,
chain: NamedChain,
entry: LogBatchEntry,
pass: CombinedDataLookupPass,
adapter: &A,
) -> Result<GasAndAmountForTx, CombinedDataLookupFailure>
where
A: ReceiptAdapter<N> + Send + Sync,
{
let provider = self.provider.clone();
let tx_hash = entry.tx_hash;
let span = spans::process_log_for_combined_data(tx_hash);
let (tx_result, receipt_result) = async move {
tokio::join!(
self.fetch_transaction_gas_data(chain, entry, pass),
provider.get_transaction_receipt(tx_hash)
)
}
.instrument(span)
.await;
extract_gas_and_amount::<N, A>(entry, tx_result, receipt_result, pass, adapter)
}
async fn fetch_transaction_gas_data(
&self,
chain: NamedChain,
entry: LogBatchEntry,
pass: CombinedDataLookupPass,
) -> Result<Option<TransactionGasData>, CombinedDataLookupFailure> {
let tx_hash = entry.tx_hash;
match self.provider.get_transaction_by_hash(tx_hash).await {
Ok(transaction) => Ok(transaction
.as_ref()
.map(TransactionGasData::from_transaction)),
Err(error) if should_attempt_permissive_tx_decode(chain, &error) => {
warn!(
?chain,
?tx_hash,
original_error = %error,
"Typed transaction lookup failed; retrying with permissive raw transaction decoding"
);
match self
.provider
.raw_request::<_, Option<AnyRpcTransaction>>(
Cow::Borrowed("eth_getTransactionByHash"),
(tx_hash,),
)
.await
{
Ok(transaction) => {
if let Some(transaction) = transaction.as_ref() {
info!(
?chain,
?tx_hash,
tx_type = transaction.ty(),
"Recovered transaction lookup with permissive raw transaction decoding"
);
}
Ok(transaction
.as_ref()
.map(TransactionGasData::from_transaction))
}
Err(raw_error) => {
warn!(
?chain,
?tx_hash,
original_error = %error,
raw_fallback_error = %raw_error,
"Permissive raw transaction decoding failed after typed lookup error"
);
let typed_failure = lookup_request_failed(
tx_hash,
CombinedDataLookupStage::Transaction,
error,
);
let raw_fallback_failure =
RetrievalError::Rpc(crate::errors::RpcError::request_failed(
format!("permissive_raw_get_transaction_by_hash({tx_hash})"),
raw_error,
));
let mut failure = build_lookup_failure(
entry,
pass,
CombinedDataLookupStage::Transaction,
typed_failure,
);
failure.attempts.push(build_lookup_attempt(
pass,
CombinedDataLookupStage::Transaction,
&raw_fallback_failure,
));
Err(failure)
}
}
}
Err(error) => Err(build_lookup_failure(
entry,
pass,
CombinedDataLookupStage::Transaction,
lookup_request_failed(tx_hash, CombinedDataLookupStage::Transaction, error),
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_transport::TransportErrorKind;
#[test]
fn permissive_tx_decode_guard_only_accepts_zksync_deser_errors() {
let deser_error =
serde_json::from_str::<u64>("\"not-a-number\"").expect_err("response should fail");
let zksync_error = TransportError::deser_err(deser_error, "\"not-a-number\"");
let transport_error = TransportError::from(TransportErrorKind::custom_str("boom"));
assert!(should_attempt_permissive_tx_decode(
NamedChain::ZkSync,
&zksync_error
));
assert!(should_attempt_permissive_tx_decode(
NamedChain::ZkSyncTestnet,
&zksync_error
));
assert!(!should_attempt_permissive_tx_decode(
NamedChain::Mainnet,
&zksync_error
));
assert!(!should_attempt_permissive_tx_decode(
NamedChain::ZkSync,
&transport_error
));
}
}