use std::time::Duration;
use alloy::{
network::Ethereum,
primitives::{
aliases::{I24, U160},
Address, Bytes, TxHash, I256, U256,
},
providers::{PendingTransactionConfig, Provider},
rpc::types::{TransactionReceipt, TransactionRequest},
};
use anyhow::{Context, Result};
use tracing::{debug, error, info, trace};
/// Call payload for a single transaction.
///
/// Within this file both fields are consumed by
/// `build_transaction_with_gas_prices`, which uses them as the input data and
/// the value of the request it builds.
#[derive(Debug, Clone)]
pub struct MethodParameters {
/// Bytes set as the transaction's input data.
pub calldata: Bytes,
/// Value attached to the transaction (presumably denominated in wei — confirm with callers).
pub value: U256,
}
use super::{adjust_gas_prices, AdjustedGasPrices};
use crate::types::{
swap_params::GasPriceOptions,
swap_results::{V3SwapResult, V3SwapWithIntermediateResult},
};
#[tracing::instrument(skip(provider, tx, error_handler))]
pub async fn send_and_wait_for_transaction<P, F>(
provider: &P,
tx: TransactionRequest,
timeout: Option<Duration>,
error_handler: Option<F>,
) -> Result<TransactionReceipt>
where
P: Provider<Ethereum> + ?Sized,
F: FnOnce(Box<dyn std::fmt::Display + Send + Sync>) -> anyhow::Error,
{
let pending_tx = if let Some(handler) = error_handler {
provider.send_transaction(tx).await.map_err(|e| handler(Box::new(e)))?
} else {
provider
.send_transaction(tx)
.await
.map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?
};
let tx_hash = pending_tx.tx_hash();
info!(tx_hash = ?tx_hash, "Transaction sent, waiting for confirmation");
let timeout_duration = timeout.unwrap_or(Duration::from_secs(120));
let pending_tx_config =
PendingTransactionConfig::new(*tx_hash).with_timeout(Some(timeout_duration));
match provider.watch_pending_transaction(pending_tx_config).await?.await {
Ok(tx_hash) => info!(tx_hash = ?tx_hash, "Transaction confirmed"),
Err(e) => {
error!(error = ?e, "Failed to watch pending transaction");
}
};
info!(
tx_hash = ?tx_hash,
timeout_seconds = timeout_duration.as_secs(),
"Transaction confirmed or timed out"
);
let error_message = format!("Failed to get transaction receipt: {}", tx_hash);
let receipt = provider
.get_transaction_receipt(*tx_hash)
.await
.context(error_message.clone())?
.context(error_message)?;
info!(
tx_hash = ?receipt.transaction_hash,
block_number = ?receipt.block_number,
gas_used = ?receipt.gas_used,
status = ?receipt.status(),
"Transaction receipt retrieved"
);
Ok(receipt)
}
#[tracing::instrument(skip(provider, method_params, gas_price_options), fields(
sender_address = ?sender_address,
to_address = ?to_address,
value = ?method_params.value
))]
pub async fn build_transaction_with_gas_prices<P: Provider<Ethereum> + ?Sized>(
provider: &P,
sender_address: Address,
to_address: Address,
method_params: MethodParameters,
gas_price_options: Option<GasPriceOptions>,
) -> Result<TransactionRequest> {
let provided_max_priority_fee_per_gas =
gas_price_options.as_ref().and_then(|g| g.max_priority_fee_per_gas);
let provided_max_fee_per_gas = gas_price_options.as_ref().and_then(|g| g.max_fee_per_gas);
let gas_price_buffer_multiplier =
gas_price_options.as_ref().and_then(|g| g.gas_price_buffer_multiplier);
let nonce = provider.get_transaction_count(sender_address).await?;
info!(nonce = ?nonce, "Fetched transaction nonce");
let AdjustedGasPrices { max_priority_fee_per_gas, max_fee_per_gas } =
if let Some(buffer_multiplier) = gas_price_buffer_multiplier {
adjust_gas_prices(
provider,
provided_max_priority_fee_per_gas,
provided_max_fee_per_gas,
buffer_multiplier,
)
.await?
} else {
AdjustedGasPrices {
max_priority_fee_per_gas: provided_max_priority_fee_per_gas,
max_fee_per_gas: provided_max_fee_per_gas,
}
};
let mut tx = TransactionRequest::default()
.from(sender_address)
.to(to_address)
.input(method_params.calldata.into())
.value(method_params.value)
.nonce(nonce);
if let Some(max_priority_fee) = max_priority_fee_per_gas {
let priority_fee_u128 = max_priority_fee.to::<u128>();
tx = tx.max_priority_fee_per_gas(priority_fee_u128);
debug!(max_priority_fee_per_gas = ?max_priority_fee, "Priority gas fee set");
}
if let Some(max_fee) = max_fee_per_gas {
let max_fee_u128 = max_fee.to::<u128>();
tx = tx.max_fee_per_gas(max_fee_u128);
debug!(max_fee_per_gas = ?max_fee, "Max fee per gas set");
}
info!(value = ?method_params.value, "Transaction built with gas prices");
Ok(tx)
}
/// Raw field values of a decoded swap event.
///
/// NOTE(review): the field names and types line up with a Uniswap V3 pool `Swap`
/// event — confirm against the decoder that populates this struct.
#[derive(Debug, Clone)]
pub struct SwapEventData {
    /// Address recorded as the sender of the swap.
    pub sender: Address,
    /// Address recorded as the recipient of the swap.
    pub recipient: Address,
    /// Signed amount for token0 (sign convention not shown here — confirm with the decoder).
    pub amount0: I256,
    /// Signed amount for token1 (sign convention not shown here — confirm with the decoder).
    pub amount1: I256,
    /// Square-root price value carried by the event (presumably Q64.96 fixed point — confirm).
    pub sqrt_price_x96: U160,
    /// Liquidity value carried by the event.
    pub liquidity: u128,
    /// Tick value carried by the event.
    pub tick: I24,
}
/// Returns the first value that `decode_fn` manages to decode from the receipt's logs.
///
/// Logs are tried in receipt order. `decode_fn` receives each log together with the
/// receipt's transaction hash; a decode error just means "not this log" and is traced
/// before moving on to the next one.
///
/// # Errors
///
/// Returns an error when `decode_fn` fails for every log (including when the receipt
/// has no logs at all).
#[tracing::instrument(skip(receipt, decode_fn), fields(
    tx_hash = ?receipt.transaction_hash,
    log_count = receipt.logs().len()
))]
pub fn find_event<F, R>(receipt: &TransactionReceipt, decode_fn: F) -> Result<R>
where
    F: Fn(&alloy::rpc::types::Log, TxHash) -> Result<R>,
{
    let tx_hash = receipt.transaction_hash;
    let logs = receipt.logs();
    info!(?tx_hash, log_count = logs.len(), "Processing transaction logs");

    // `find_map` stops at the first successful decode, like an early return from a loop.
    let first_match = logs.iter().find_map(|log| match decode_fn(log, tx_hash) {
        Ok(event) => Some(event),
        Err(e) => {
            trace!(error = ?e, "Log is not a matching event, skipping");
            None
        }
    });

    match first_match {
        Some(event) => {
            info!(?tx_hash, "Event decoded successfully");
            Ok(event)
        }
        None => {
            error!(
                ?tx_hash,
                log_count = logs.len(),
                "No matching event found in transaction receipt"
            );
            Err(anyhow::anyhow!("No matching event found in transaction receipt"))
        }
    }
}
#[tracing::instrument(skip(receipt, decoder), fields(
tx_hash = ?receipt.transaction_hash,
log_count = receipt.logs().len()
))]
pub fn find_events<F, R>(receipt: &TransactionReceipt, decoder: F) -> Result<Vec<R>>
where
F: Fn(&alloy::rpc::types::Log, TxHash) -> Result<R>,
{
let tx_hash = receipt.transaction_hash;
info!(
tx_hash = ?tx_hash,
log_count = receipt.logs().len(),
"Processing transaction logs with decoder"
);
let mut results = Vec::new();
for log in receipt.logs() {
match decoder(log, tx_hash) {
Ok(result) => {
info!(
tx_hash = ?tx_hash,
"Event decoded successfully"
);
results.push(result);
}
Err(e) => {
trace!(
error = ?e,
"Failed to decode log, skipping"
);
continue;
}
}
}
if results.is_empty() {
error!(
tx_hash = ?tx_hash,
log_count = receipt.logs().len(),
"No matching event found in transaction receipt"
);
Err(anyhow::anyhow!("No matching event found in transaction receipt"))
} else {
info!(
tx_hash = ?tx_hash,
result_count = results.len(),
"Found {} events using decoder",
results.len()
);
Ok(results)
}
}
/// Combines the two legs of a two-hop swap into a single [`V3SwapResult`].
///
/// Only the first two entries of `swap_results` are inspected; any further entries are
/// ignored. Exactly one of the two must have `is_to_intermediate == true`: that entry is
/// treated as the first leg and the other as the second leg, regardless of their order
/// in the vector.
///
/// The merged result takes `sender` and `amount_in` from the first leg, and `recipient`,
/// `amount_out`, `sqrt_price_x96`, `liquidity` and `tick` from the second leg. `tx_hash`
/// is passed through unchanged.
///
/// # Errors
///
/// Returns an error when fewer than two results are supplied, or when the first two
/// results have the same `is_to_intermediate` value.
#[tracing::instrument(skip(swap_results), fields(
    tx_hash = ?tx_hash,
    swap_count = swap_results.len()
))]
pub fn merge_two_hop_swap_results(
    swap_results: Vec<V3SwapWithIntermediateResult>,
    tx_hash: TxHash,
) -> Result<V3SwapResult> {
    let found_events = swap_results.len();

    // Take ownership of the first two entries so their fields can be moved, not cloned.
    let mut remaining = swap_results.into_iter();
    let (swap_a, swap_b) = match (remaining.next(), remaining.next()) {
        (Some(swap_a), Some(swap_b)) => (swap_a, swap_b),
        _ => {
            error!(
                tx_hash = ?tx_hash,
                found_events = found_events,
                "Expected 2 swap events for two-hop swap, found {}",
                found_events
            );
            return Err(anyhow::anyhow!(
                "Expected 2 swap events for two-hop swap, found {}",
                found_events
            ));
        }
    };

    // The leg that swaps into the intermediate token is the first hop.
    let (first_swap, second_swap) =
        match (swap_a.is_to_intermediate, swap_b.is_to_intermediate) {
            (true, false) => (swap_a, swap_b),
            (false, true) => (swap_b, swap_a),
            (true, true) => {
                error!(
                    tx_hash = ?tx_hash,
                    "Both swap results have is_to_intermediate = true, expected one true and one false"
                );
                return Err(anyhow::anyhow!(
                    "Both swap results have is_to_intermediate = true, expected one true and one false"
                ));
            }
            (false, false) => {
                error!(
                    tx_hash = ?tx_hash,
                    "Both swap results have is_to_intermediate = false, expected one true and one false"
                );
                return Err(anyhow::anyhow!(
                    "Both swap results have is_to_intermediate = false, expected one true and one \
                     false"
                ));
            }
        };

    info!(
        tx_hash = ?tx_hash,
        first_amount_in = ?first_swap.amount_in,
        first_amount_out = ?first_swap.amount_out,
        second_amount_in = ?second_swap.amount_in,
        second_amount_out = ?second_swap.amount_out,
        first_is_to_intermediate = first_swap.is_to_intermediate,
        second_is_to_intermediate = second_swap.is_to_intermediate,
        "Merging two-hop swap results"
    );

    Ok(V3SwapResult {
        sender: first_swap.sender,
        recipient: second_swap.recipient,
        amount_in: first_swap.amount_in,
        amount_out: second_swap.amount_out,
        sqrt_price_x96: second_swap.sqrt_price_x96,
        liquidity: second_swap.liquidity,
        tick: second_swap.tick,
        tx_hash,
    })
}