use crate::common::constants::MIN_PROFIT_THRESHOLD_WEI;
use crate::common::error::AppError;
use crate::core::executor::{BundleItem, SharedBundleSender};
use crate::core::nonce::NonceManager;
use crate::core::portfolio::PortfolioManager;
use crate::core::safety::SafetyGuard;
use crate::core::simulation::Simulator;
use crate::data::db::Database;
use crate::network::gas::{GasFees, GasOracle};
use crate::network::price_feed::PriceFeed;
use crate::network::provider::HttpProvider;
use crate::network::mev_share::MevShareHint;
use alloy::consensus::{SignableTransaction, Transaction as ConsensusTxTrait, TxEip1559};
use alloy::eips::eip2718::Encodable2718;
use alloy::network::{TransactionResponse, TxSignerSync};
use alloy::primitives::{Address, TxKind, B256, U256};
use alloy::providers::Provider;
use alloy::rpc::types::eth::state::StateOverridesBuilder;
use alloy::rpc::types::eth::Transaction;
use alloy::rpc::types::eth::TransactionInput;
use alloy::rpc::types::eth::TransactionRequest;
use alloy::rpc::types::Header;
use alloy::signers::local::PrivateKeySigner;
use alloy::sol;
use alloy::sol_types::SolCall;
use alloy_consensus::TxEnvelope;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::collections::HashSet;
use tokio::sync::{broadcast::Receiver, mpsc::UnboundedReceiver};
use crate::common::retry::retry_async;
use std::time::Duration;
#[derive(Debug)]
pub enum StrategyWork {
Mempool(Transaction),
MevShareHint(MevShareHint),
}
sol! {
#[derive(Debug, PartialEq, Eq)]
#[sol(rpc)]
contract UniV2Router {
function swapExactETHForTokens(uint256 amountOutMin, address[] calldata path, address to, uint256 deadline) payable returns (uint256[] memory amounts);
function swapExactTokensForETH(uint256 amountIn, uint256 amountOutMin, address[] calldata path, address to, uint256 deadline) returns (uint256[] memory amounts);
function swapExactTokensForTokens(uint256 amountIn, uint256 amountOutMin, address[] calldata path, address to, uint256 deadline) returns (uint256[] memory amounts);
function getAmountsOut(uint256 amountIn, address[] calldata path) external view returns (uint256[] memory amounts);
}
#[derive(Debug, PartialEq, Eq)]
#[sol(rpc)]
contract UniV3Router {
struct ExactInputSingleParams {
address tokenIn;
address tokenOut;
uint24 fee;
address recipient;
uint256 deadline;
uint256 amountIn;
uint256 amountOutMinimum;
uint160 sqrtPriceLimitX96;
}
struct ExactInputParams {
bytes path;
address recipient;
uint256 deadline;
uint256 amountIn;
uint256 amountOutMinimum;
}
function exactInputSingle(ExactInputSingleParams calldata params) external payable returns (uint256 amountOut);
function exactInput(ExactInputParams calldata params) external payable returns (uint256 amountOut);
}
}
use UniV2Router::{swapExactETHForTokensCall, swapExactTokensForETHCall, swapExactTokensForTokensCall};
#[derive(Default)]
pub struct StrategyStats {
pub processed: AtomicU64,
pub submitted: AtomicU64,
pub skipped: AtomicU64,
pub failed: AtomicU64,
pub skip_unknown_router: AtomicU64,
pub skip_decode_failed: AtomicU64,
pub skip_missing_wrapped: AtomicU64,
pub skip_gas_cap: AtomicU64,
pub skip_sim_failed: AtomicU64,
pub skip_profit_guard: AtomicU64,
pub skip_unsupported_router: AtomicU64,
pub skip_token_call: AtomicU64,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RouterKind {
V2Like,
V3Like,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SwapDirection {
BuyWithEth,
SellForEth,
Other,
}
pub struct StrategyExecutor {
tx_rx: UnboundedReceiver<StrategyWork>,
mut_block_rx: Receiver<Header>,
safety_guard: Arc<SafetyGuard>,
bundle_sender: SharedBundleSender,
db: Database,
portfolio: Arc<PortfolioManager>,
gas_oracle: GasOracle,
price_feed: PriceFeed,
chain_id: u64,
stats: Arc<StrategyStats>,
max_gas_price_gwei: u64,
simulator: Simulator,
signer: PrivateKeySigner,
nonce_manager: NonceManager,
slippage_bps: u64,
http_provider: HttpProvider,
dry_run: bool,
router_allowlist: HashSet<Address>,
wrapped_native: Address,
}
impl StrategyExecutor {
fn log_skip(&self, reason: &str, detail: &str) {
if self.dry_run {
tracing::info!(target: "strategy_skip", %reason, %detail, "Dry-run skip");
} else {
tracing::debug!(target: "strategy_skip", %reason, %detail);
}
match reason {
"unknown_router" => {
self.stats
.skip_unknown_router
.fetch_add(1, Ordering::Relaxed);
}
"decode_failed" => {
self.stats
.skip_decode_failed
.fetch_add(1, Ordering::Relaxed);
}
"zero_amount_or_no_wrapped_native" => {
self.stats
.skip_missing_wrapped
.fetch_add(1, Ordering::Relaxed);
}
"gas_price_cap" => {
self.stats.skip_gas_cap.fetch_add(1, Ordering::Relaxed);
}
"simulation_failed" => {
self.stats
.skip_sim_failed
.fetch_add(1, Ordering::Relaxed);
}
"profit_or_gas_guard" => {
self.stats
.skip_profit_guard
.fetch_add(1, Ordering::Relaxed);
}
"unsupported_router_type" => {
self.stats
.skip_unsupported_router
.fetch_add(1, Ordering::Relaxed);
}
"token_call" => {
self.stats
.skip_token_call
.fetch_add(1, Ordering::Relaxed);
}
_ => {}
}
}
}
#[derive(Clone, Debug)]
struct ObservedSwap {
router: Address,
path: Vec<Address>,
amount_in: U256,
min_out: U256,
recipient: Address,
router_kind: RouterKind,
}
struct BackrunTx {
raw: Vec<u8>,
hash: B256,
to: Address,
value: U256,
request: TransactionRequest,
expected_out: U256,
}
struct FrontRunTx {
raw: Vec<u8>,
hash: B256,
to: Address,
value: U256,
request: TransactionRequest,
expected_tokens: U256,
}
impl StrategyExecutor {
pub fn new(
tx_rx: UnboundedReceiver<StrategyWork>,
block_rx: Receiver<Header>,
safety_guard: Arc<SafetyGuard>,
bundle_sender: SharedBundleSender,
db: Database,
portfolio: Arc<PortfolioManager>,
gas_oracle: GasOracle,
price_feed: PriceFeed,
chain_id: u64,
max_gas_price_gwei: u64,
simulator: Simulator,
stats: Arc<StrategyStats>,
signer: PrivateKeySigner,
nonce_manager: NonceManager,
slippage_bps: u64,
http_provider: HttpProvider,
dry_run: bool,
router_allowlist: HashSet<Address>,
wrapped_native: Address,
) -> Self {
Self {
tx_rx,
mut_block_rx: block_rx,
safety_guard,
bundle_sender,
db,
portfolio,
gas_oracle,
price_feed,
chain_id,
stats,
max_gas_price_gwei,
simulator,
signer,
nonce_manager,
slippage_bps,
http_provider,
dry_run,
router_allowlist,
wrapped_native,
}
}
pub async fn run(mut self) -> Result<(), AppError> {
tracing::info!("StrategyExecutor: waiting for pending transactions");
while let Some(work) = self.tx_rx.recv().await {
while let Ok(header) = self.mut_block_rx.try_recv() {
tracing::debug!("StrategyExecutor: observed new block {:?}", header.hash);
}
self.safety_guard.check()?;
let outcome = match work {
StrategyWork::Mempool(tx) => {
let from = tx.from();
let res = self.evaluate_mempool_tx(&tx).await;
(res, Some(from), Some(tx.tx_hash()))
}
StrategyWork::MevShareHint(hint) => {
let res = self.evaluate_mev_share_hint(&hint).await;
(res, hint.from, Some(hint.tx_hash))
}
};
match outcome {
(Ok(Some(tx_hash)), from, _) => {
tracing::info!(
target: "strategy",
from = ?from,
tx_hash = %tx_hash,
"Bundle submitted"
);
self.safety_guard.report_success();
self.stats.submitted.fetch_add(1, Ordering::Relaxed);
}
(Ok(None), from, tx_hash) => {
tracing::debug!(
target: "strategy",
from=?from,
tx_hash=?tx_hash,
"Skipped item"
);
self.stats.skipped.fetch_add(1, Ordering::Relaxed);
}
(Err(e), _, _) => {
self.safety_guard.report_failure();
self.stats.failed.fetch_add(1, Ordering::Relaxed);
tracing::error!(target: "strategy", error=%e, "Strategy failed");
}
};
let processed = self.stats.processed.fetch_add(1, Ordering::Relaxed) + 1;
if processed % 50 == 0 {
tracing::info!(
target: "strategy_summary",
processed,
submitted = self.stats.submitted.load(Ordering::Relaxed),
skipped = self.stats.skipped.load(Ordering::Relaxed),
failed = self.stats.failed.load(Ordering::Relaxed),
skip_unknown_router = self.stats.skip_unknown_router.load(Ordering::Relaxed),
skip_decode = self.stats.skip_decode_failed.load(Ordering::Relaxed),
skip_missing_wrapped = self.stats.skip_missing_wrapped.load(Ordering::Relaxed),
skip_gas_cap = self.stats.skip_gas_cap.load(Ordering::Relaxed),
skip_sim_failed = self.stats.skip_sim_failed.load(Ordering::Relaxed),
skip_profit_guard = self.stats.skip_profit_guard.load(Ordering::Relaxed),
skip_unsupported_router = self.stats.skip_unsupported_router.load(Ordering::Relaxed),
skip_token_call = self.stats.skip_token_call.load(Ordering::Relaxed),
"Strategy loop summary"
);
}
}
Ok(())
}
async fn evaluate_mempool_tx(&self, tx: &Transaction) -> Result<Option<String>, AppError> {
let to_addr = match tx.kind() {
TxKind::Call(addr) => addr,
TxKind::Create => return Ok(None),
};
if !self.router_allowlist.contains(&to_addr) {
if Self::is_common_token_call(tx.input()) {
self.log_skip("token_call", "erc20 transfer/approve");
return Ok(None);
}
self.log_skip("unknown_router", &format!("to={to_addr:#x}"));
return Ok(None);
}
let Some(observed_swap) = Self::decode_swap(tx) else {
self.log_skip("decode_failed", "unable to decode swap input");
return Ok(None);
};
if observed_swap.router_kind == RouterKind::V3Like {
self.log_skip("unsupported_router_type", "uniswap_v3 not yet implemented for backrun");
return Ok(None);
}
if observed_swap.amount_in.is_zero() || !observed_swap.path.contains(&self.wrapped_native) {
self.log_skip(
"zero_amount_or_no_wrapped_native",
"path missing wrapped native or zero amount",
);
return Ok(None);
}
let direction = Self::direction(&observed_swap, self.wrapped_native);
let tx_value = tx.value();
let gas_fees: GasFees = self.gas_oracle.estimate_eip1559_fees().await?;
let gas_cap_wei = U256::from(self.max_gas_price_gwei) * U256::from(1_000_000_000u64);
if U256::from(gas_fees.max_fee_per_gas) > gas_cap_wei {
self.log_skip(
"gas_price_cap",
&format!(
"max_fee_per_gas={} cap_gwei={}",
gas_fees.max_fee_per_gas, self.max_gas_price_gwei
),
);
return Ok(None);
}
let real_balance = self
.portfolio
.update_eth_balance(self.chain_id)
.await?;
let (wallet_chain_balance, used_mock_balance) = if self.dry_run {
let gas_headroom =
U256::from(tx.gas_limit()) * U256::from(gas_fees.max_fee_per_gas);
let value_headroom = tx.value().saturating_mul(U256::from(2u64));
let mock = gas_headroom
.saturating_add(value_headroom)
.max(U256::from(500_000_000_000_000_000u128)); (mock, true)
} else {
(real_balance, false)
};
let base_gas_budget = U256::from(tx.gas_limit()) * U256::from(gas_fees.max_fee_per_gas);
if !self.dry_run {
self.portfolio
.ensure_funding(self.chain_id, base_gas_budget)?;
}
let mut attack_value_eth = U256::ZERO;
let mut bundle_requests: Vec<TransactionRequest> = Vec::new();
let mut raw_bundle: Vec<Vec<u8>> = Vec::new();
let mut front_run: Option<FrontRunTx> = None;
if direction == SwapDirection::BuyWithEth {
let nonce_front = self.nonce_manager.get_next_nonce().await?;
match self
.build_front_run_tx(
&observed_swap,
gas_fees.max_fee_per_gas,
gas_fees.max_priority_fee_per_gas,
wallet_chain_balance,
tx.gas_limit(),
nonce_front,
)
.await
{
Ok(Some(f)) => {
attack_value_eth = f.value;
bundle_requests.push(f.request.clone());
raw_bundle.push(f.raw.clone());
front_run = Some(f);
}
Ok(None) => {}
Err(e) => {
self.log_skip("front_run_build_failed", &e.to_string());
return Ok(None);
}
}
}
let nonce_backrun = self.nonce_manager.get_next_nonce().await?;
let backrun = match self
.build_backrun_tx(
&observed_swap,
gas_fees.max_fee_per_gas,
gas_fees.max_priority_fee_per_gas,
wallet_chain_balance,
tx.gas_limit(),
front_run.as_ref().map(|f| f.expected_tokens),
nonce_backrun,
)
.await
{
Ok(b) => b,
Err(e) => {
self.log_skip("backrun_build_failed", &e.to_string());
return Ok(None);
}
};
let backrun_raw = backrun.raw.clone();
let sim_nonce = front_run
.as_ref()
.and_then(|f| f.request.nonce)
.or_else(|| backrun.request.nonce)
.unwrap_or_default();
let overrides = StateOverridesBuilder::default()
.with_balance(self.signer.address(), wallet_chain_balance)
.with_nonce(self.signer.address(), sim_nonce)
.build();
bundle_requests.push(tx.clone().into_request());
bundle_requests.push(backrun.request.clone());
raw_bundle.push(tx.inner.encoded_2718());
raw_bundle.push(backrun_raw.clone());
let bundle_sims = retry_async(
move |_| {
let simulator = self.simulator.clone();
let reqs = bundle_requests.clone();
let overrides = overrides.clone();
async move { simulator.simulate_bundle_requests(&reqs, Some(overrides)).await }
},
2,
Duration::from_millis(100),
)
.await?;
if bundle_sims.iter().any(|o| !o.success) {
self.log_skip("simulation_failed", "bundle sim returned failure");
return Ok(None);
}
let mut gas_used_total = 0u64;
for sim in &bundle_sims {
gas_used_total = gas_used_total.saturating_add(sim.gas_used);
}
let bundle_gas_limit = gas_used_total.max(tx.gas_limit());
let gas_cost_wei =
U256::from(bundle_gas_limit) * U256::from(gas_fees.max_fee_per_gas);
if !self.dry_run {
let spend = backrun.value.saturating_add(attack_value_eth);
self.portfolio
.ensure_funding(self.chain_id, spend + gas_cost_wei)?;
}
let total_eth_in = backrun.value.saturating_add(attack_value_eth);
let gross_profit_wei = backrun.expected_out.saturating_sub(total_eth_in);
let net_profit_wei = gross_profit_wei.saturating_sub(gas_cost_wei);
let eth_quote = self.price_feed.get_price("ETHUSD").await?;
let profit_eth = wei_to_eth_f64(gross_profit_wei);
let gas_cost_eth = wei_to_eth_f64(gas_cost_wei);
let net_profit_eth = profit_eth - gas_cost_eth;
let profit_floor = StrategyExecutor::dynamic_profit_floor(wallet_chain_balance);
let gas_ratio_ok = if gross_profit_wei.is_zero() {
false
} else {
gas_cost_wei
.saturating_mul(U256::from(100u64))
<= gross_profit_wei.saturating_mul(U256::from(50u64))
};
tracing::info!(
target: "strategy",
gas_limit = bundle_gas_limit,
max_fee_per_gas = gas_fees.max_fee_per_gas,
gas_cost_eth = gas_cost_eth,
backrun_value_eth = wei_to_eth_f64(backrun.value),
expected_out_eth = wei_to_eth_f64(backrun.expected_out),
front_run_value_eth = wei_to_eth_f64(attack_value_eth),
net_profit_eth = net_profit_eth,
wallet_eth = wei_to_eth_f64(wallet_chain_balance),
price_source = %eth_quote.source,
price = eth_quote.price,
victim_min_out = ?observed_swap.min_out,
victim_recipient = ?observed_swap.recipient,
path_len = observed_swap.path.len(),
path = ?observed_swap.path,
router = ?observed_swap.router,
used_mock_balance,
profit_floor_wei = %profit_floor,
gas_ratio_ok,
sandwich = front_run.is_some(),
"Strategy evaluation"
);
if net_profit_wei < profit_floor || net_profit_eth <= 0.0 || !gas_ratio_ok {
self.log_skip(
"profit_or_gas_guard",
&format!(
"net_profit_wei={} floor={} gas_ratio_ok={} net_profit_eth={}",
net_profit_wei, profit_floor, gas_ratio_ok, net_profit_eth
),
);
return Ok(None);
}
let tx_hash = tx.tx_hash();
if self.dry_run {
tracing::info!(
target: "strategy_dry_run",
tx_hash = %format!("{:#x}", tx_hash),
net_profit_eth = net_profit_eth,
gross_profit_eth = profit_eth,
gas_cost_eth = gas_cost_eth,
front_run_value_eth = wei_to_eth_f64(attack_value_eth),
wallet_eth = wei_to_eth_f64(wallet_chain_balance),
path_len = observed_swap.path.len(),
router = ?observed_swap.router,
used_mock_balance,
sandwich = front_run.is_some(),
"Dry-run only: simulated profitable bundle (not sent)"
);
return Ok(Some(format!("{tx_hash:#x}")));
}
let _ = self
.db
.update_status(&format!("{:#x}", backrun.hash), None, Some(false))
.await;
self.bundle_sender
.send_bundle(&raw_bundle, self.chain_id)
.await?;
let to_addr = match tx.kind() {
TxKind::Call(addr) => Some(addr),
TxKind::Create => None,
};
self.db
.save_transaction(
&format!("{tx_hash:#x}"),
self.chain_id,
&format!("{:#x}", tx.from()),
to_addr.as_ref().map(|a| format!("{:#x}", a)).as_deref(),
tx_value.to_string().as_str(),
Some("strategy_v1"),
)
.await?;
self.db
.save_transaction(
&format!("{:#x}", backrun.hash),
self.chain_id,
&format!("{:#x}", self.signer.address()),
Some(format!("{:#x}", backrun.to)).as_deref(),
backrun.value.to_string().as_str(),
Some("strategy_backrun"),
)
.await?;
if let Some(f) = &front_run {
self.db
.save_transaction(
&format!("{:#x}", f.hash),
self.chain_id,
&format!("{:#x}", self.signer.address()),
Some(format!("{:#x}", f.to)).as_deref(),
f.value.to_string().as_str(),
Some("strategy_front_run"),
)
.await?;
}
self.db
.save_profit_record(
&format!("{tx_hash:#x}"),
self.chain_id,
"strategy_v1",
profit_eth,
gas_cost_eth,
net_profit_eth,
)
.await?;
self.portfolio
.record_profit(self.chain_id, profit_eth, gas_cost_eth);
let _ = self
.db
.save_market_price(self.chain_id, "ETHUSD", eth_quote.price, ð_quote.source)
.await;
let _ = self.await_receipt(&backrun.hash).await;
Ok(Some(format!("{tx_hash:#x}")))
}
async fn evaluate_mev_share_hint(
&self,
hint: &MevShareHint,
) -> Result<Option<String>, AppError> {
if !self.router_allowlist.contains(&hint.router) {
self.log_skip("unknown_router", &format!("to={:#x}", hint.router));
return Ok(None);
}
let Some(observed_swap) =
Self::decode_swap_input(hint.router, &hint.call_data, hint.value)
else {
self.log_skip("decode_failed", "unable to decode swap input");
return Ok(None);
};
if observed_swap.router_kind == RouterKind::V3Like {
self.log_skip(
"unsupported_router_type",
"uniswap_v3 not yet implemented for backrun",
);
return Ok(None);
}
if observed_swap.amount_in.is_zero() || !observed_swap.path.contains(&self.wrapped_native) {
self.log_skip(
"zero_amount_or_no_wrapped_native",
"path missing wrapped native or zero amount",
);
return Ok(None);
}
let direction = Self::direction(&observed_swap, self.wrapped_native);
let gas_limit_hint = hint.gas_limit.unwrap_or(220_000);
let gas_fees: GasFees = self.gas_oracle.estimate_eip1559_fees().await?;
let gas_cap_wei = U256::from(self.max_gas_price_gwei) * U256::from(1_000_000_000u64);
if U256::from(gas_fees.max_fee_per_gas) > gas_cap_wei {
self.log_skip(
"gas_price_cap",
&format!(
"max_fee_per_gas={} cap_gwei={}",
gas_fees.max_fee_per_gas, self.max_gas_price_gwei
),
);
return Ok(None);
}
let real_balance = self.portfolio.update_eth_balance(self.chain_id).await?;
let (wallet_chain_balance, used_mock_balance) = if self.dry_run {
let gas_headroom =
U256::from(gas_limit_hint) * U256::from(gas_fees.max_fee_per_gas);
let value_headroom = hint.value.saturating_mul(U256::from(2u64));
let mock = gas_headroom
.saturating_add(value_headroom)
.max(U256::from(500_000_000_000_000_000u128)); (mock, true)
} else {
(real_balance, false)
};
let base_gas_budget = U256::from(gas_limit_hint) * U256::from(gas_fees.max_fee_per_gas);
if !self.dry_run {
self.portfolio
.ensure_funding(self.chain_id, base_gas_budget)?;
}
let mut attack_value_eth = U256::ZERO;
let mut bundle_requests: Vec<TransactionRequest> = Vec::new();
let mut bundle_body: Vec<BundleItem> = Vec::new();
let mut front_run: Option<FrontRunTx> = None;
if direction == SwapDirection::BuyWithEth {
let nonce_front = self.nonce_manager.get_next_nonce().await?;
match self
.build_front_run_tx(
&observed_swap,
gas_fees.max_fee_per_gas,
gas_fees.max_priority_fee_per_gas,
wallet_chain_balance,
gas_limit_hint,
nonce_front,
)
.await
{
Ok(Some(f)) => {
attack_value_eth = f.value;
bundle_requests.push(f.request.clone());
front_run = Some(f);
}
Ok(None) => {}
Err(e) => {
self.log_skip("front_run_build_failed", &e.to_string());
return Ok(None);
}
}
}
let nonce_backrun = self.nonce_manager.get_next_nonce().await?;
let backrun = match self
.build_backrun_tx(
&observed_swap,
gas_fees.max_fee_per_gas,
gas_fees.max_priority_fee_per_gas,
wallet_chain_balance,
gas_limit_hint,
front_run.as_ref().map(|f| f.expected_tokens),
nonce_backrun,
)
.await
{
Ok(b) => b,
Err(e) => {
self.log_skip("backrun_build_failed", &e.to_string());
return Ok(None);
}
};
let backrun_raw = backrun.raw.clone();
let sim_nonce = front_run
.as_ref()
.and_then(|f| f.request.nonce)
.or_else(|| backrun.request.nonce)
.unwrap_or_default();
let overrides = StateOverridesBuilder::default()
.with_balance(self.signer.address(), wallet_chain_balance)
.with_nonce(self.signer.address(), sim_nonce)
.build();
let max_fee_hint = hint
.max_fee_per_gas
.unwrap_or(gas_fees.max_fee_per_gas);
let max_prio_hint = hint
.max_priority_fee_per_gas
.unwrap_or(gas_fees.max_priority_fee_per_gas);
let victim_request = TransactionRequest {
from: hint.from,
to: Some(TxKind::Call(hint.router)),
max_fee_per_gas: Some(max_fee_hint),
max_priority_fee_per_gas: Some(max_prio_hint),
gas: Some(gas_limit_hint),
value: Some(hint.value),
input: TransactionInput::new(hint.call_data.clone().into()),
nonce: None,
chain_id: Some(self.chain_id),
..Default::default()
};
bundle_requests.push(victim_request);
bundle_requests.push(backrun.request.clone());
if let Some(f) = &front_run {
bundle_body.push(BundleItem::Tx {
tx: format!("0x{}", hex::encode(&f.raw)),
can_revert: false,
});
}
bundle_body.push(BundleItem::Hash {
hash: format!("{:#x}", hint.tx_hash),
});
bundle_body.push(BundleItem::Tx {
tx: format!("0x{}", hex::encode(&backrun_raw)),
can_revert: false,
});
let bundle_sims = retry_async(
move |_| {
let simulator = self.simulator.clone();
let reqs = bundle_requests.clone();
let overrides = overrides.clone();
async move { simulator.simulate_bundle_requests(&reqs, Some(overrides)).await }
},
2,
Duration::from_millis(100),
)
.await?;
if bundle_sims.iter().any(|o| !o.success) {
self.log_skip("simulation_failed", "bundle sim returned failure");
return Ok(None);
}
let mut gas_used_total = 0u64;
for sim in &bundle_sims {
gas_used_total = gas_used_total.saturating_add(sim.gas_used);
}
let bundle_gas_limit = gas_used_total.max(gas_limit_hint);
let gas_cost_wei =
U256::from(bundle_gas_limit) * U256::from(gas_fees.max_fee_per_gas);
if !self.dry_run {
let spend = backrun.value.saturating_add(attack_value_eth);
self.portfolio
.ensure_funding(self.chain_id, spend + gas_cost_wei)?;
}
let total_eth_in = backrun.value.saturating_add(attack_value_eth);
let gross_profit_wei = backrun.expected_out.saturating_sub(total_eth_in);
let net_profit_wei = gross_profit_wei.saturating_sub(gas_cost_wei);
let eth_quote = self.price_feed.get_price("ETHUSD").await?;
let profit_eth = wei_to_eth_f64(gross_profit_wei);
let gas_cost_eth = wei_to_eth_f64(gas_cost_wei);
let net_profit_eth = profit_eth - gas_cost_eth;
let profit_floor = StrategyExecutor::dynamic_profit_floor(wallet_chain_balance);
let gas_ratio_ok = if gross_profit_wei.is_zero() {
false
} else {
gas_cost_wei
.saturating_mul(U256::from(100u64))
<= gross_profit_wei.saturating_mul(U256::from(50u64))
};
tracing::info!(
target: "strategy",
gas_limit = bundle_gas_limit,
max_fee_per_gas = gas_fees.max_fee_per_gas,
gas_cost_eth = gas_cost_eth,
backrun_value_eth = wei_to_eth_f64(backrun.value),
expected_out_eth = wei_to_eth_f64(backrun.expected_out),
front_run_value_eth = wei_to_eth_f64(attack_value_eth),
net_profit_eth = net_profit_eth,
wallet_eth = wei_to_eth_f64(wallet_chain_balance),
price_source = %eth_quote.source,
price = eth_quote.price,
victim_min_out = ?observed_swap.min_out,
victim_recipient = ?observed_swap.recipient,
path_len = observed_swap.path.len(),
path = ?observed_swap.path,
router = ?observed_swap.router,
used_mock_balance,
profit_floor_wei = %profit_floor,
gas_ratio_ok,
sandwich = front_run.is_some(),
"MEV-Share strategy evaluation"
);
if net_profit_wei < profit_floor || net_profit_eth <= 0.0 || !gas_ratio_ok {
self.log_skip(
"profit_or_gas_guard",
&format!(
"net_profit_wei={} floor={} gas_ratio_ok={} net_profit_eth={}",
net_profit_wei, profit_floor, gas_ratio_ok, net_profit_eth
),
);
return Ok(None);
}
let tx_hash = format!("{:#x}", hint.tx_hash);
if self.dry_run {
tracing::info!(
target: "strategy_dry_run",
tx_hash = %tx_hash,
net_profit_eth = net_profit_eth,
gross_profit_eth = profit_eth,
gas_cost_eth = gas_cost_eth,
front_run_value_eth = wei_to_eth_f64(attack_value_eth),
wallet_eth = wei_to_eth_f64(wallet_chain_balance),
path_len = observed_swap.path.len(),
router = ?observed_swap.router,
used_mock_balance,
sandwich = front_run.is_some(),
"Dry-run only: simulated profitable MEV-Share bundle (not sent)"
);
return Ok(Some(tx_hash));
}
let _ = self
.db
.update_status(&tx_hash, None, Some(false))
.await;
self.bundle_sender
.send_mev_share_bundle(&bundle_body)
.await?;
let from_addr = hint.from.unwrap_or(Address::ZERO);
self.db
.save_transaction(
&tx_hash,
self.chain_id,
&format!("{:#x}", from_addr),
Some(format!("{:#x}", hint.router)).as_deref(),
hint.value.to_string().as_str(),
Some("strategy_mev_share"),
)
.await?;
self.db
.save_transaction(
&format!("{:#x}", backrun.hash),
self.chain_id,
&format!("{:#x}", self.signer.address()),
Some(format!("{:#x}", backrun.to)).as_deref(),
backrun.value.to_string().as_str(),
Some("strategy_backrun"),
)
.await?;
if let Some(f) = &front_run {
self.db
.save_transaction(
&format!("{:#x}", f.hash),
self.chain_id,
&format!("{:#x}", self.signer.address()),
Some(format!("{:#x}", f.to)).as_deref(),
f.value.to_string().as_str(),
Some("strategy_front_run"),
)
.await?;
}
self.db
.save_profit_record(
&tx_hash,
self.chain_id,
"strategy_mev_share",
profit_eth,
gas_cost_eth,
net_profit_eth,
)
.await?;
self.portfolio
.record_profit(self.chain_id, profit_eth, gas_cost_eth);
let _ = self
.db
.save_market_price(self.chain_id, "ETHUSD", eth_quote.price, ð_quote.source)
.await;
let _ = self.await_receipt(&backrun.hash).await;
Ok(Some(tx_hash))
}
}
impl StrategyExecutor {
fn dynamic_profit_floor(wallet_balance: U256) -> U256 {
let abs_floor = *MIN_PROFIT_THRESHOLD_WEI;
let scaled = wallet_balance
.checked_div(U256::from(100_000u64))
.unwrap_or(U256::ZERO);
if scaled > abs_floor {
scaled
} else {
abs_floor
}
}
fn dynamic_backrun_value(
observed_in: U256,
wallet_balance: U256,
slippage_bps: u64,
gas_limit_hint: u64,
max_fee_per_gas: u128,
) -> Result<U256, AppError> {
let mut value =
observed_in.saturating_mul(U256::from(slippage_bps)) / U256::from(10_000u64);
let min_backrun = U256::from(100_000_000_000_000u64);
if value < min_backrun {
value = min_backrun;
}
let mut max_value = wallet_balance
.checked_div(U256::from(4u64))
.unwrap_or(wallet_balance);
let gas_buffer =
U256::from(max_fee_per_gas).saturating_mul(U256::from(gas_limit_hint.max(210_000)));
if gas_buffer > wallet_balance / U256::from(8u64) {
max_value = wallet_balance
.checked_div(U256::from(8u64))
.unwrap_or(wallet_balance);
}
if value > max_value {
value = max_value;
}
if value.is_zero() {
return Err(AppError::Strategy(
"Backrun value is zero after caps".into(),
));
}
Ok(value)
}
fn decode_swap(tx: &Transaction) -> Option<ObservedSwap> {
let router = match tx.kind() {
TxKind::Call(addr) => addr,
TxKind::Create => return None,
};
Self::decode_swap_input(router, tx.input(), tx.value())
}
fn decode_swap_input(router: Address, input: &[u8], eth_value: U256) -> Option<ObservedSwap> {
if input.len() < 4 {
return None;
}
if let Ok(decoded) = swapExactETHForTokensCall::abi_decode(input) {
return Some(ObservedSwap {
router,
path: decoded.path,
amount_in: eth_value,
min_out: decoded.amountOutMin,
recipient: decoded.to,
router_kind: RouterKind::V2Like,
});
}
if let Ok(decoded) = swapExactTokensForETHCall::abi_decode(input) {
return Some(ObservedSwap {
router,
path: decoded.path,
amount_in: decoded.amountIn,
min_out: decoded.amountOutMin,
recipient: decoded.to,
router_kind: RouterKind::V2Like,
});
}
if let Ok(decoded) = swapExactTokensForTokensCall::abi_decode(input) {
return Some(ObservedSwap {
router,
path: decoded.path,
amount_in: decoded.amountIn,
min_out: decoded.amountOutMin,
recipient: decoded.to,
router_kind: RouterKind::V2Like,
});
}
if let Ok(decoded) = UniV3Router::exactInputSingleCall::abi_decode(input) {
let params = decoded.params;
return Some(ObservedSwap {
router,
path: vec![params.tokenIn, params.tokenOut],
amount_in: params.amountIn,
min_out: params.amountOutMinimum,
recipient: params.recipient,
router_kind: RouterKind::V3Like,
});
}
if let Ok(decoded) = UniV3Router::exactInputCall::abi_decode(input) {
let params = decoded.params;
if let Some(path) = Self::parse_v3_path(¶ms.path) {
return Some(ObservedSwap {
router,
path,
amount_in: params.amountIn,
min_out: params.amountOutMinimum,
recipient: params.recipient,
router_kind: RouterKind::V3Like,
});
}
}
None
}
fn target_token(path: &[Address], wrapped_native: Address) -> Option<Address> {
path.iter().copied().find(|addr| addr != &wrapped_native)
}
fn direction(observed: &ObservedSwap, wrapped_native: Address) -> SwapDirection {
let starts_with_native = observed.path.first().copied() == Some(wrapped_native);
let ends_with_native = observed.path.last().copied() == Some(wrapped_native);
if starts_with_native {
SwapDirection::BuyWithEth
} else if ends_with_native {
SwapDirection::SellForEth
} else {
SwapDirection::Other
}
}
fn parse_v3_path(path: &[u8]) -> Option<Vec<Address>> {
if path.len() < 43 {
return None;
}
let mut tokens = Vec::new();
let mut offset = 0;
tokens.push(Address::from_slice(&path[offset..offset + 20]));
offset += 20;
while offset < path.len() {
if path.len() < offset + 3 + 20 {
return None;
}
offset += 3; tokens.push(Address::from_slice(&path[offset..offset + 20]));
offset += 20;
}
Some(tokens)
}
fn is_common_token_call(input: &[u8]) -> bool {
if input.len() < 4 {
return false;
}
let selector = &input[..4];
const TRANSFER: [u8; 4] = [0xa9, 0x05, 0x9c, 0xbb];
const TRANSFER_FROM: [u8; 4] = [0x23, 0xb8, 0x72, 0xdd];
const APPROVE: [u8; 4] = [0x09, 0x5e, 0xa7, 0xb3];
const PERMIT: [u8; 4] = [0xd5, 0x05, 0xac, 0xcf]; selector == TRANSFER
|| selector == TRANSFER_FROM
|| selector == APPROVE
|| selector == PERMIT
}
async fn build_front_run_tx(
&self,
observed: &ObservedSwap,
max_fee_per_gas: u128,
max_priority_fee_per_gas: u128,
wallet_balance: U256,
gas_limit_hint: u64,
nonce: u64,
) -> Result<Option<FrontRunTx>, AppError> {
if wallet_balance.is_zero() {
return Ok(None);
}
let target_token = Self::target_token(&observed.path, self.wrapped_native)
.ok_or_else(|| AppError::Strategy("Unable to derive target token".into()))?;
let value = StrategyExecutor::dynamic_backrun_value(
observed.amount_in,
wallet_balance,
self.slippage_bps,
gas_limit_hint,
max_fee_per_gas,
)?;
let router_contract = UniV2Router::new(observed.router, self.http_provider.clone());
let path = vec![self.wrapped_native, target_token];
let quote_path = path.clone();
let quote_contract = router_contract.clone();
let quote_value = value;
let quote: Vec<U256> = retry_async(
move |_| {
let c = quote_contract.clone();
let p = quote_path.clone();
async move { c.getAmountsOut(quote_value, p.clone()).call().await }
},
3,
Duration::from_millis(100),
)
.await
.map_err(|e| AppError::Strategy(format!("Front-run quote failed: {}", e)))?;
let expected_tokens = *quote
.last()
.ok_or_else(|| AppError::Strategy("Front-run quote missing amounts".into()))?;
let min_out = expected_tokens
.saturating_mul(U256::from(10_000u64 - self.slippage_bps))
/ U256::from(10_000u64);
let deadline = U256::from((chrono::Utc::now().timestamp() as u64) + 300);
let calldata = router_contract
.swapExactETHForTokens(min_out, path, self.signer.address(), deadline)
.calldata()
.to_vec();
let mut gas_limit = gas_limit_hint
.saturating_mul(11)
.checked_div(10)
.unwrap_or(300_000);
if gas_limit < 160_000 {
gas_limit = 160_000;
}
let mut tx = TxEip1559 {
chain_id: self.chain_id,
nonce,
max_priority_fee_per_gas,
max_fee_per_gas,
gas_limit,
to: TxKind::Call(observed.router),
value,
access_list: Default::default(),
input: calldata.clone().into(),
};
let sig = TxSignerSync::sign_transaction_sync(&self.signer, &mut tx)
.map_err(|e| AppError::Strategy(format!("Sign front-run failed: {}", e)))?;
let signed: TxEnvelope = tx.into_signed(sig).into();
let raw = signed.encoded_2718();
let request = TransactionRequest {
from: Some(self.signer.address()),
to: Some(TxKind::Call(observed.router)),
max_fee_per_gas: Some(max_fee_per_gas),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
gas: Some(gas_limit),
value: Some(value),
input: TransactionInput::new(calldata.into()),
nonce: Some(nonce),
chain_id: Some(self.chain_id),
..Default::default()
};
Ok(Some(FrontRunTx {
raw,
hash: *signed.tx_hash(),
to: observed.router,
value,
request,
expected_tokens,
}))
}
async fn build_backrun_tx(
&self,
observed: &ObservedSwap,
max_fee_per_gas: u128,
max_priority_fee_per_gas: u128,
wallet_balance: U256,
gas_limit_hint: u64,
token_in_override: Option<U256>,
nonce: u64,
) -> Result<BackrunTx, AppError> {
let target_token = Self::target_token(&observed.path, self.wrapped_native)
.ok_or_else(|| AppError::Strategy("Unable to derive target token".into()))?;
if wallet_balance.is_zero() {
return Err(AppError::Strategy(
"No balance available for backrun".into(),
));
}
let router_contract = UniV2Router::new(observed.router, self.http_provider.clone());
let (value, expected_out, calldata) = if let Some(tokens_in) = token_in_override {
let sell_path = vec![target_token, self.wrapped_native];
let quote_path = sell_path.clone();
let quote_contract = router_contract.clone();
let sell_amount = tokens_in;
let quote: Vec<U256> = retry_async(
move |_| {
let c = quote_contract.clone();
let p = quote_path.clone();
async move { c.getAmountsOut(sell_amount, p.clone()).call().await }
},
3,
Duration::from_millis(100),
)
.await
.map_err(|e| AppError::Strategy(format!("Sell quote failed: {}", e)))?;
let expected_out = *quote
.last()
.ok_or_else(|| AppError::Strategy("Sell quote missing amounts".into()))?;
let min_out = expected_out
.saturating_mul(U256::from(10_000u64 - self.slippage_bps))
/ U256::from(10_000u64);
let deadline = U256::from((chrono::Utc::now().timestamp() as u64) + 300);
let calldata = router_contract
.swapExactTokensForETH(sell_amount, min_out, sell_path, self.signer.address(), deadline)
.calldata()
.to_vec();
(U256::ZERO, expected_out, calldata)
} else {
let value = StrategyExecutor::dynamic_backrun_value(
observed.amount_in,
wallet_balance,
self.slippage_bps,
gas_limit_hint,
max_fee_per_gas,
)?;
let buy_path = vec![self.wrapped_native, target_token, self.wrapped_native];
let quote_path = buy_path.clone();
let quote_contract = router_contract.clone();
let quote_value = value;
let quote: Vec<U256> = retry_async(
move |_| {
let c = quote_contract.clone();
let p = quote_path.clone();
async move { c.getAmountsOut(quote_value, p.clone()).call().await }
},
3,
Duration::from_millis(100),
)
.await
.map_err(|e| AppError::Strategy(format!("Quote failed: {}", e)))?;
let expected_out = *quote
.last()
.ok_or_else(|| AppError::Strategy("Quote missing amounts".into()))?;
let re_quote_contract = router_contract.clone();
let re_quote_path = buy_path.clone();
let second_quote: Vec<U256> = retry_async(
move |_| {
let c = re_quote_contract.clone();
let p = re_quote_path.clone();
async move { c.getAmountsOut(value, p.clone()).call().await }
},
3,
Duration::from_millis(100),
)
.await
.map_err(|e| AppError::Strategy(format!("Re-quote failed: {}", e)))?;
let second_out = *second_quote
.last()
.ok_or_else(|| AppError::Strategy("Re-quote missing amounts".into()))?;
let drift = if expected_out > second_out {
expected_out - second_out
} else {
second_out - expected_out
};
let tolerance = expected_out
.saturating_mul(U256::from(self.slippage_bps * 2))
/ U256::from(10_000u64);
if drift > tolerance {
return Err(AppError::Strategy("Quote drift too high, skip".into()));
}
let min_out = expected_out.saturating_mul(U256::from(10_000u64 - self.slippage_bps))
/ U256::from(10_000u64);
let deadline = U256::from((chrono::Utc::now().timestamp() as u64) + 300);
let calldata = router_contract
.swapExactETHForTokens(min_out, buy_path, self.signer.address(), deadline)
.calldata()
.to_vec();
(value, expected_out, calldata)
};
let mut gas_limit = gas_limit_hint
.saturating_mul(12)
.checked_div(10)
.unwrap_or(350_000);
if gas_limit < 200_000 {
gas_limit = 200_000;
}
let mut tx = TxEip1559 {
chain_id: self.chain_id,
nonce,
max_priority_fee_per_gas,
max_fee_per_gas,
gas_limit,
to: TxKind::Call(observed.router),
value,
access_list: Default::default(),
input: calldata.clone().into(),
};
let sig = TxSignerSync::sign_transaction_sync(&self.signer, &mut tx)
.map_err(|e| AppError::Strategy(format!("Sign backrun failed: {}", e)))?;
let signed: TxEnvelope = tx.into_signed(sig).into();
let raw = signed.encoded_2718();
let request = TransactionRequest {
from: Some(self.signer.address()),
to: Some(TxKind::Call(observed.router)),
max_fee_per_gas: Some(max_fee_per_gas),
max_priority_fee_per_gas: Some(max_priority_fee_per_gas),
gas: Some(gas_limit),
value: Some(value),
input: TransactionInput::new(calldata.into()),
nonce: Some(nonce),
chain_id: Some(self.chain_id),
..Default::default()
};
Ok(BackrunTx {
raw,
hash: *signed.tx_hash(),
to: observed.router,
value,
request,
expected_out,
})
}
async fn await_receipt(&self, hash: &B256) -> Result<(), AppError> {
for _ in 0..3 {
if let Ok(Some(rcpt)) = self.http_provider.get_transaction_receipt(*hash).await {
let block_num = rcpt.block_number;
let status = rcpt.status();
let _ = self.db.update_status(
&format!("{:#x}", hash),
block_num.map(|b| b as i64),
Some(status),
);
break;
}
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
Ok(())
}
}
fn wei_to_eth_f64(value: U256) -> f64 {
let wei_in_eth = 1_000_000_000_000_000_000u128;
let num: u128 = value.try_into().unwrap_or(u128::MAX);
(num as f64) / (wei_in_eth as f64)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::common::constants::WETH_MAINNET;
#[test]
fn decodes_eth_swap() {
let router = WETH_MAINNET;
let call = swapExactETHForTokensCall {
amountOutMin: U256::from(5u64),
path: vec![WETH_MAINNET, Address::from([2u8; 20])],
to: Address::from([3u8; 20]),
deadline: U256::from(100u64),
};
let data = call.abi_encode();
let decoded = StrategyExecutor::decode_swap_input(
router,
&data,
U256::from(1_000_000_000_000_000_000u128),
)
.expect("decode");
assert_eq!(decoded.path.len(), 2);
assert_eq!(decoded.min_out, U256::from(5u64));
}
#[test]
fn wei_to_eth_conversion() {
let two_eth = U256::from(2_000_000_000_000_000_000u128);
let eth = wei_to_eth_f64(two_eth);
assert!((eth - 2.0).abs() < 1e-9);
}
#[test]
fn decodes_uniswap_v3_exact_input_single() {
use alloy::primitives::{U160, aliases::U24};
let params = UniV3Router::ExactInputSingleParams {
tokenIn: WETH_MAINNET,
tokenOut: Address::from([2u8; 20]),
fee: U24::from(500u32),
recipient: Address::from([3u8; 20]),
deadline: U256::from(100u64),
amountIn: U256::from(1_000_000_000_000_000_000u128),
amountOutMinimum: U256::from(5u64),
sqrtPriceLimitX96: U160::ZERO,
};
let call = UniV3Router::exactInputSingleCall { params };
let data = call.abi_encode();
let decoded = StrategyExecutor::decode_swap_input(
WETH_MAINNET,
&data,
U256::from(0u64),
)
.expect("decode v3 single");
assert_eq!(decoded.router_kind, RouterKind::V3Like);
assert_eq!(decoded.path.len(), 2);
}
#[test]
fn parses_uniswap_v3_path() {
let mut path: Vec<u8> = Vec::new();
path.extend_from_slice(WETH_MAINNET.as_slice());
path.extend_from_slice(&[0u8, 1u8, 244u8]); let out = Address::from([9u8; 20]);
path.extend_from_slice(out.as_slice());
let parsed = StrategyExecutor::parse_v3_path(&path).expect("parse path");
assert_eq!(parsed.len(), 2);
assert_eq!(parsed[1], out);
}
#[test]
fn detects_token_calls() {
let transfer_selector = [0xa9, 0x05, 0x9c, 0xbb, 0u8];
assert!(StrategyExecutor::is_common_token_call(&transfer_selector));
let random = [0x12, 0x34, 0x56, 0x78];
assert!(!StrategyExecutor::is_common_token_call(&random));
}
#[test]
fn classifies_swap_direction() {
let buy = ObservedSwap {
router: Address::ZERO,
path: vec![WETH_MAINNET, Address::from([2u8; 20])],
amount_in: U256::from(1u64),
min_out: U256::ZERO,
recipient: Address::ZERO,
router_kind: RouterKind::V2Like,
};
assert_eq!(
StrategyExecutor::direction(&buy, WETH_MAINNET),
SwapDirection::BuyWithEth
);
let sell = ObservedSwap {
path: vec![Address::from([2u8; 20]), WETH_MAINNET],
..buy
};
assert_eq!(
StrategyExecutor::direction(&sell, WETH_MAINNET),
SwapDirection::SellForEth
);
}
}