oxidized_builder/core/
executor.rs

1use crate::common::error::AppError;
2use crate::network::provider::HttpProvider;
3use alloy::primitives::keccak256;
4use alloy::providers::Provider;
5use alloy::signers::local::PrivateKeySigner;
6use alloy::signers::SignerSync;
7use reqwest::header::HeaderValue;
8use serde::Serialize;
9use serde_json::json;
10use std::sync::Arc;
11use std::time::{SystemTime, UNIX_EPOCH};
12
13#[derive(Clone, Debug, Serialize)]
14#[serde(untagged)]
15pub enum BundleItem {
16    Hash { hash: String },
17    Tx { tx: String, #[serde(rename = "canRevert")] can_revert: bool },
18}
19
20/// Sends bundles or raw transactions to the network/relays.
21pub struct BundleSender {
22    provider: HttpProvider,
23    dry_run: bool,
24    relay_url: String,
25    signer: PrivateKeySigner,
26}
27
28impl BundleSender {
29    pub fn new(
30        provider: HttpProvider,
31        dry_run: bool,
32        relay_url: String,
33        signer: PrivateKeySigner,
34    ) -> Self {
35        Self {
36            provider,
37            dry_run,
38            relay_url,
39            signer,
40        }
41    }
42
43    /// Send a MEV-Share bundle that references tx hashes (instead of raw bytes).
44    pub async fn send_mev_share_bundle(&self, body: &[BundleItem]) -> Result<(), AppError> {
45        if self.dry_run {
46            tracing::info!(target: "executor", "Dry-run: would send mev_sendBundle with {} legs", body.len());
47            return Ok(());
48        }
49
50        let block_number = self.provider.get_block_number().await.map_err(|e| {
51            AppError::Connection(format!("Failed to fetch block number: {}", e))
52        })?;
53        let params = json!({
54            "version": "v0.1",
55            "inclusion": {
56                "block": format!("0x{:x}", block_number + 1),
57                "maxBlock": format!("0x{:x}", block_number + 4),
58            },
59            "body": body,
60            "privacy": {
61                "builders": ["flashbots"]
62            }
63        });
64
65        let payload = json!({
66            "jsonrpc": "2.0",
67            "id": 1,
68            "method": "mev_sendBundle",
69            "params": [params]
70        });
71
72        let body_bytes =
73            serde_json::to_vec(&payload).map_err(|e| AppError::Initialization(e.to_string()))?;
74        let sig_header = self.sign_request(&body_bytes)?;
75
76        let client = reqwest::Client::new();
77        let mut attempts = 0;
78        loop {
79            attempts += 1;
80            let resp = client
81                .post(&self.relay_url)
82                .header("Content-Type", "application/json")
83                .header(
84                    "X-Flashbots-Signature",
85                    HeaderValue::from_str(&sig_header).map_err(|e| {
86                        AppError::Connection(format!("Signature header invalid: {}", e))
87                    })?,
88                )
89                .body(body_bytes.clone())
90                .send()
91                .await
92                .map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
93
94            let status = resp.status();
95            let body_text = resp.text().await.unwrap_or_default();
96            if status.is_success() {
97                tracing::info!(target: "executor", relay=%self.relay_url, block=block_number + 1, legs=body.len(), body=%body_text, "MEV-Share bundle submitted");
98                break;
99            } else if attempts < 2 {
100                tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected mev_sendBundle, retrying");
101                continue;
102            } else {
103                return Err(AppError::Connection(format!(
104                    "Relay rejected mev_sendBundle: {} body={}",
105                    status, body_text
106                )));
107            }
108        }
109
110        Ok(())
111    }
112
113    /// Broadcast a list of raw transaction payloads (RLP encoded).
114    /// In dry-run mode this only logs.
115    pub async fn send_bundle(&self, raw_txs: &[Vec<u8>], chain_id: u64) -> Result<(), AppError> {
116        if self.dry_run {
117            tracing::info!("Dry-run: would send bundle with {} txs", raw_txs.len());
118            return Ok(());
119        }
120
121        if chain_id == 1 {
122            self.send_flashbots(raw_txs).await
123        } else {
124            self.send_direct(raw_txs).await
125        }
126    }
127
128    async fn send_direct(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
129        for raw in raw_txs {
130            let mut attempts = 0;
131            loop {
132                attempts += 1;
133                let res = self.provider.send_raw_transaction(raw.as_slice()).await;
134                match res {
135                    Ok(_) => break,
136                    Err(e) if attempts < 2 => {
137                        tracing::warn!(target: "executor", error=%e, attempt=attempts, "Retrying raw tx send");
138                        continue;
139                    }
140                    Err(e) => {
141                        return Err(AppError::Connection(format!("Bundle send failed: {}", e)))
142                    }
143                }
144            }
145        }
146        Ok(())
147    }
148
149    async fn send_flashbots(&self, raw_txs: &[Vec<u8>]) -> Result<(), AppError> {
150        let block_number =
151            self.provider.get_block_number().await.map_err(|e| {
152                AppError::Connection(format!("Failed to fetch block number: {}", e))
153            })?;
154        let target_block = block_number + 1;
155        let params = json!({
156            "txs": raw_txs.iter().map(|r| format!("0x{}", hex::encode(r))).collect::<Vec<_>>(),
157            "blockNumber": format!("0x{:x}", target_block),
158            "minTimestamp": current_unix(),
159        });
160
161        let body = json!({
162            "jsonrpc": "2.0",
163            "id": 1,
164            "method": "eth_sendBundle",
165            "params": [params]
166        });
167
168        let body_bytes =
169            serde_json::to_vec(&body).map_err(|e| AppError::Initialization(e.to_string()))?;
170        let sig_header = self.sign_request(&body_bytes)?;
171
172        let client = reqwest::Client::new();
173        let mut attempts = 0;
174        loop {
175            attempts += 1;
176            let resp = client
177                .post(&self.relay_url)
178                .header("Content-Type", "application/json")
179                .header(
180                    "X-Flashbots-Signature",
181                    HeaderValue::from_str(&sig_header).map_err(|e| {
182                        AppError::Connection(format!("Signature header invalid: {}", e))
183                    })?,
184                )
185                .body(body_bytes.clone())
186                .send()
187                .await
188                .map_err(|e| AppError::Connection(format!("Relay POST failed: {}", e)))?;
189
190            let status = resp.status();
191            let body_text = resp.text().await.unwrap_or_default();
192            if status.is_success() {
193                tracing::info!(target: "executor", relay=%self.relay_url, block=target_block, txs=raw_txs.len(), body=%body_text, "Flashbots bundle submitted");
194                break;
195            } else if attempts < 2 {
196                tracing::warn!(target: "executor", status=%status, body=%body_text, attempt=attempts, "Relay rejected bundle, retrying");
197                continue;
198            } else {
199                return Err(AppError::Connection(format!(
200                    "Relay rejected bundle: {} body={}",
201                    status, body_text
202                )));
203            }
204        }
205
206        Ok(())
207    }
208
209    fn sign_request(&self, body_bytes: &[u8]) -> Result<String, AppError> {
210        let hash = keccak256(body_bytes);
211        let sig = self
212            .signer
213            .sign_hash_sync(&hash)
214            .map_err(|e| AppError::Connection(format!("Bundle signing failed: {}", e)))?;
215        let sig_hex = format!("0x{}", hex::encode(sig.as_bytes()));
216        Ok(format!("{:#x}:{}", self.signer.address(), sig_hex))
217    }
218}
219
220pub type SharedBundleSender = Arc<BundleSender>;
221
222fn current_unix() -> u64 {
223    SystemTime::now()
224        .duration_since(UNIX_EPOCH)
225        .unwrap_or_default()
226        .as_secs()
227}