use crate::common::error::AppError;
use crate::network::provider::HttpProvider;
use alloy::primitives::keccak256;
use alloy::providers::Provider;
use alloy::signers::local::PrivateKeySigner;
use alloy::signers::SignerSync;
use reqwest::header::HeaderValue;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum BundleItem {
Hash { hash: String },
Tx { tx: String, #[serde(rename = "canRevert")] can_revert: bool },
}
pub struct BundleSender {
provider: HttpProvider,
dry_run: bool,
relay_url: String,
signer: PrivateKeySigner,
}
impl BundleSender {
pub fn new(
provider: HttpProvider,
dry_run: bool,
relay_url: String,
signer: PrivateKeySigner,
) -> Self {
Self {
provider,
dry_run,
relay_url,
signer,
}
}
pub async fn send_mev_share_bundle(&self, body: &[BundleItem]) -> Result<(), AppError> {
if self.dry_run {
tracing::info!(target: "executor", "Dry-run: would send mev_sendBundle with {} legs", body.len());
return Ok(());
}
let block_number = self.provider.get_block_number().await.map_err(|e| {
AppError::Connection(format!("Failed to fetch block number: {}", e))
})?;
let params = json!({
"version": "v0.1",
"inclusion": {
"block": format!("0x{:x}", block_number + 1),
"maxBlock": format!("0x{:x}", block_number + 4),
},
"body": body,
"privacy": {
"builders": ["flashbots"]
}
});
let payload = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "mev_sendBundle",
"params": [params]
});
let body_bytes =
serde_json::to_vec(&payload).map_err(|e| AppError::Initialization(e.to_string()))?;
let sig_header = self.sign_request(&body_bytes)?;
let client = reqwest::Client::new();
let mut attempts = 0;
loop {
attempts += 1;
let resp = client
.post(&self.relay_url)
.header("Content-Type", "application/json")
.header(
"X-Flashbots-Signature",
HeaderValue::from_str(&sig_header).map_err(|e| {
AppError::Connection(format!("Signature header invalid: {}", e))
})?,
)
.body(body_bytes.clone())
.send()
.await
.map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
let status = resp.status();
let body_text = resp.text().await.unwrap_or_default();
if status.is_success() {
tracing::info!(target: "executor", relay=%self.relay_url, block=block_number + 1, legs=body.len(), body=%body_text, "MEV-Share bundle submitted");
break;
} else if attempts < 2 {
tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected mev_sendBundle, retrying");
continue;
} else {
return Err(AppError::Connection(format!(
"Relay rejected mev_sendBundle: {} body={}",
status, body_text
)));
}
}
Ok(())
}
pub async fn send_bundle(&self, raw_txs: &[Vec<u8>], chain_id: u64) -> Result<(), AppError> {
if self.dry_run {
tracing::info!("Dry-run: would send bundle with {} txs", raw_txs.len());
return Ok(());
}
if chain_id == 1 {
self.send_flashbots(raw_txs).await
} else {
self.send_direct(raw_txs).await
}
}
async fn send_direct(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
for raw in raw_txs {
let mut attempts = 0;
loop {
attempts += 1;
let res = self.provider.send_raw_transaction(raw.as_slice()).await;
match res {
Ok(_) => break,
Err(e) if attempts < 2 => {
tracing::warn!(target: "executor", error=%e, attempt=attempts, "Retrying raw tx send");
continue;
}
Err(e) => {
return Err(AppError::Connection(format!("Bundle send failed: {}", e)))
}
}
}
}
Ok(())
}
async fn send_flashbots(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
let block_number =
self.provider.get_block_number().await.map_err(|e| {
AppError::Connection(format!("Failed to fetch block number: {}", e))
})?;
let target_block = block_number + 1;
let params = json!({
"txs": raw_txs.iter().map(|r| format!("0x{}", hex::encode(r))).collect::<Vec<_>>(),
"blockNumber": format!("0x{:x}", target_block),
"minTimestamp": current_unix(),
});
let body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "eth_sendBundle",
"params": [params]
});
let body_bytes =
serde_json::to_vec(&body).map_err(|e| AppError::Initialization(e.to_string()))?;
let sig_header = self.sign_request(&body_bytes)?;
let client = reqwest::Client::new();
let mut attempts = 0;
loop {
attempts += 1;
let resp = client
.post(&self.relay_url)
.header("Content-Type", "application/json")
.header(
"X-Flashbots-Signature",
HeaderValue::from_str(&sig_header).map_err(|e| {
AppError::Connection(format!("Signature header invalid: {}", e))
})?,
)
.body(body_bytes.clone())
.send()
.await
.map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
let status = resp.status();
let body_text = resp.text().await.unwrap_or_default();
if status.is_success() {
tracing::info!(target: "executor", relay=%self.relay_url, block=target_block, txs=raw_txs.len(), body=%body_text, "Flashbots bundle submitted");
break;
} else if attempts < 2 {
tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected bundle, retrying");
continue;
} else {
return Err(AppError::Connection(format!(
"Relay rejected bundle: {} body={}",
status, body_text
)));
}
}
Ok(())
}
fn sign_request(&self, body_bytes: &[u8]) -> Result<String, AppError> {
let hash = keccak256(body_bytes);
let sig = self
.signer
.sign_hash_sync(&hash)
.map_err(|e| AppError::Connection(format!("Bundle signing failed: {}", e)))?;
let sig_hex = format!("0x{}", hex::encode(sig.as_bytes()));
Ok(format!("{:#x}:{}", self.signer.address(), sig_hex))
}
}
pub type SharedBundleSender = Arc<BundleSender>;
fn current_unix() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}