1use crate::common::error::AppError;
2use crate::network::provider::HttpProvider;
3use alloy::primitives::keccak256;
4use alloy::providers::Provider;
5use alloy::signers::local::PrivateKeySigner;
6use alloy::signers::SignerSync;
7use reqwest::header::HeaderValue;
8use serde::Serialize;
9use serde_json::json;
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// One element of the `body` array that `send_mev_share_bundle` places in its
/// `mev_sendBundle` request.
///
/// The enum is `#[serde(untagged)]`, so no variant name is written: each value
/// serializes as a JSON object holding only that variant's fields.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum BundleItem {
    /// Serializes as `{"hash": "<value>"}`.
    // NOTE(review): presumably the hash of a pending transaction to bundle against — confirm with callers.
    Hash { hash: String },
    /// Serializes as `{"tx": "<value>", "canRevert": <bool>}`; the Rust field
    /// `can_revert` is renamed to `canRevert` on the wire.
    // NOTE(review): `tx` is presumably a 0x-prefixed signed raw transaction — confirm with callers.
    Tx { tx: String, #[serde(rename = "canRevert")] can_revert: bool },
}
19
/// Submits transaction bundles either to a relay over HTTP (signed JSON-RPC
/// requests) or directly through the node provider.
pub struct BundleSender {
    /// Node provider used to read the latest block number and to broadcast raw
    /// transactions on the direct (non-relay) path.
    provider: HttpProvider,
    /// When `true`, the public send methods only log what they would send and
    /// return `Ok(())` without any network traffic.
    dry_run: bool,
    /// URL that relay JSON-RPC requests are POSTed to.
    relay_url: String,
    /// Key used to produce the `X-Flashbots-Signature` header; its address is
    /// the first half of that header.
    signer: PrivateKeySigner,
}
27
28impl BundleSender {
29 pub fn new(
30 provider: HttpProvider,
31 dry_run: bool,
32 relay_url: String,
33 signer: PrivateKeySigner,
34 ) -> Self {
35 Self {
36 provider,
37 dry_run,
38 relay_url,
39 signer,
40 }
41 }
42
43 pub async fn send_mev_share_bundle(&self, body: &[BundleItem]) -> Result<(), AppError> {
45 if self.dry_run {
46 tracing::info!(target: "executor", "Dry-run: would send mev_sendBundle with {} legs", body.len());
47 return Ok(());
48 }
49
50 let block_number = self.provider.get_block_number().await.map_err(|e| {
51 AppError::Connection(format!("Failed to fetch block number: {}", e))
52 })?;
53 let params = json!({
54 "version": "v0.1",
55 "inclusion": {
56 "block": format!("0x{:x}", block_number + 1),
57 "maxBlock": format!("0x{:x}", block_number + 4),
58 },
59 "body": body,
60 "privacy": {
61 "builders": ["flashbots"]
62 }
63 });
64
65 let payload = json!({
66 "jsonrpc": "2.0",
67 "id": 1,
68 "method": "mev_sendBundle",
69 "params": [params]
70 });
71
72 let body_bytes =
73 serde_json::to_vec(&payload).map_err(|e| AppError::Initialization(e.to_string()))?;
74 let sig_header = self.sign_request(&body_bytes)?;
75
76 let client = reqwest::Client::new();
77 let mut attempts = 0;
78 loop {
79 attempts += 1;
80 let resp = client
81 .post(&self.relay_url)
82 .header("Content-Type", "application/json")
83 .header(
84 "X-Flashbots-Signature",
85 HeaderValue::from_str(&sig_header).map_err(|e| {
86 AppError::Connection(format!("Signature header invalid: {}", e))
87 })?,
88 )
89 .body(body_bytes.clone())
90 .send()
91 .await
92 .map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
93
94 let status = resp.status();
95 let body_text = resp.text().await.unwrap_or_default();
96 if status.is_success() {
97 tracing::info!(target: "executor", relay=%self.relay_url, block=block_number + 1, legs=body.len(), body=%body_text, "MEV-Share bundle submitted");
98 break;
99 } else if attempts < 2 {
100 tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected mev_sendBundle, retrying");
101 continue;
102 } else {
103 return Err(AppError::Connection(format!(
104 "Relay rejected mev_sendBundle: {} body={}",
105 status, body_text
106 )));
107 }
108 }
109
110 Ok(())
111 }
112
113 pub async fn send_bundle(&self, raw_txs: &[Vec<u8>], chain_id: u64) -> Result<(), AppError> {
116 if self.dry_run {
117 tracing::info!("Dry-run: would send bundle with {} txs", raw_txs.len());
118 return Ok(());
119 }
120
121 if chain_id == 1 {
122 self.send_flashbots(raw_txs).await
123 } else {
124 self.send_direct(raw_txs).await
125 }
126 }
127
128 async fn send_direct(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
129 for raw in raw_txs {
130 let mut attempts = 0;
131 loop {
132 attempts += 1;
133 let res = self.provider.send_raw_transaction(raw.as_slice()).await;
134 match res {
135 Ok(_) => break,
136 Err(e) if attempts < 2 => {
137 tracing::warn!(target: "executor", error=%e, attempt=attempts, "Retrying raw tx send");
138 continue;
139 }
140 Err(e) => {
141 return Err(AppError::Connection(format!("Bundle send failed: {}", e)))
142 }
143 }
144 }
145 }
146 Ok(())
147 }
148
149 async fn send_flashbots(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
150 let block_number =
151 self.provider.get_block_number().await.map_err(|e| {
152 AppError::Connection(format!("Failed to fetch block number: {}", e))
153 })?;
154 let target_block = block_number + 1;
155 let params = json!({
156 "txs": raw_txs.iter().map(|r| format!("0x{}", hex::encode(r))).collect::<Vec<_>>(),
157 "blockNumber": format!("0x{:x}", target_block),
158 "minTimestamp": current_unix(),
159 });
160
161 let body = json!({
162 "jsonrpc": "2.0",
163 "id": 1,
164 "method": "eth_sendBundle",
165 "params": [params]
166 });
167
168 let body_bytes =
169 serde_json::to_vec(&body).map_err(|e| AppError::Initialization(e.to_string()))?;
170 let sig_header = self.sign_request(&body_bytes)?;
171
172 let client = reqwest::Client::new();
173 let mut attempts = 0;
174 loop {
175 attempts += 1;
176 let resp = client
177 .post(&self.relay_url)
178 .header("Content-Type", "application/json")
179 .header(
180 "X-Flashbots-Signature",
181 HeaderValue::from_str(&sig_header).map_err(|e| {
182 AppError::Connection(format!("Signature header invalid: {}", e))
183 })?,
184 )
185 .body(body_bytes.clone())
186 .send()
187 .await
188 .map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
189
190 let status = resp.status();
191 let body_text = resp.text().await.unwrap_or_default();
192 if status.is_success() {
193 tracing::info!(target: "executor", relay=%self.relay_url, block=target_block, txs=raw_txs.len(), body=%body_text, "Flashbots bundle submitted");
194 break;
195 } else if attempts < 2 {
196 tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected bundle, retrying");
197 continue;
198 } else {
199 return Err(AppError::Connection(format!(
200 "Relay rejected bundle: {} body={}",
201 status, body_text
202 )));
203 }
204 }
205
206 Ok(())
207 }
208
209 fn sign_request(&self, body_bytes: &[u8]) -> Result<String, AppError> {
210 let hash = keccak256(body_bytes);
211 let sig = self
212 .signer
213 .sign_hash_sync(&hash)
214 .map_err(|e| AppError::Connection(format!("Bundle signing failed: {}", e)))?;
215 let sig_hex = format!("0x{}", hex::encode(sig.as_bytes()));
216 Ok(format!("{:#x}:{}", self.signer.address(), sig_hex))
217 }
218}
219
/// Reference-counted handle to a [`BundleSender`], for sharing one sender
/// between multiple owners.
pub type SharedBundleSender = Arc<BundleSender>;
221
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
fn current_unix() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}