Skip to main content

sof_tx/submit/
jito.rs

1//! Jito block-engine submit transport implementation.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
7use reqwest::{Url, redirect::Policy};
8use serde::{Deserialize, Serialize};
9use serde_json::from_slice as json_from_slice;
10use sof_support::time_support::nonzero_duration_or;
11
12use super::{JitoSubmitConfig, JitoSubmitResponse, JitoSubmitTransport, SubmitTransportError};
13
/// Upper bound, in bytes, on the HTTP body this transport buffers from a submit response.
const MAX_JITO_SUBMIT_RESPONSE_BYTES: usize = 64 * 1024;
/// Timeout applied to Jito HTTP requests unless the transport config overrides it.
const DEFAULT_JITO_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// Base URL of the default (non-regional) Jito mainnet block engine.
const DEFAULT_JITO_BLOCK_ENGINE_URL: &str = "https://mainnet.block-engine.jito.wtf";
20
/// Geographic region of a Jito mainnet block engine.
///
/// Each variant selects one region-specific block-engine host.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JitoBlockEngineRegion {
    /// Block engine hosted in Amsterdam.
    Amsterdam,
    /// Block engine hosted in Dublin.
    Dublin,
    /// Block engine hosted in Frankfurt.
    Frankfurt,
    /// Block engine hosted in London.
    London,
    /// Block engine hosted in New York.
    NewYork,
    /// Block engine hosted in Salt Lake City.
    SaltLakeCity,
    /// Block engine hosted in Singapore.
    Singapore,
    /// Block engine hosted in Tokyo.
    Tokyo,
}
41
42/// Typed Jito block-engine endpoint.
43#[derive(Debug, Clone, Eq, PartialEq)]
44pub enum JitoBlockEngineEndpoint {
45    /// Default Jito mainnet block-engine endpoint.
46    Mainnet,
47    /// Region-specific Jito mainnet block-engine endpoint.
48    MainnetRegion(JitoBlockEngineRegion),
49    /// Custom parsed block-engine base URL.
50    Custom(Url),
51}
52
53impl JitoBlockEngineEndpoint {
54    /// Returns the default mainnet block-engine endpoint.
55    #[must_use]
56    pub const fn mainnet() -> Self {
57        Self::Mainnet
58    }
59
60    /// Returns one typed regional mainnet block-engine endpoint.
61    #[must_use]
62    pub const fn mainnet_region(region: JitoBlockEngineRegion) -> Self {
63        Self::MainnetRegion(region)
64    }
65
66    /// Creates a custom block-engine endpoint from a parsed URL.
67    #[must_use]
68    pub const fn custom(url: Url) -> Self {
69        Self::Custom(url)
70    }
71
72    /// Returns the base URL.
73    #[must_use]
74    pub fn as_url(&self) -> &str {
75        match self {
76            Self::Mainnet => DEFAULT_JITO_BLOCK_ENGINE_URL,
77            Self::MainnetRegion(region) => match region {
78                JitoBlockEngineRegion::Amsterdam => {
79                    "https://amsterdam.mainnet.block-engine.jito.wtf"
80                }
81                JitoBlockEngineRegion::Dublin => "https://dublin.mainnet.block-engine.jito.wtf",
82                JitoBlockEngineRegion::Frankfurt => {
83                    "https://frankfurt.mainnet.block-engine.jito.wtf"
84                }
85                JitoBlockEngineRegion::London => "https://london.mainnet.block-engine.jito.wtf",
86                JitoBlockEngineRegion::NewYork => "https://ny.mainnet.block-engine.jito.wtf",
87                JitoBlockEngineRegion::SaltLakeCity => "https://slc.mainnet.block-engine.jito.wtf",
88                JitoBlockEngineRegion::Singapore => {
89                    "https://singapore.mainnet.block-engine.jito.wtf"
90                }
91                JitoBlockEngineRegion::Tokyo => "https://tokyo.mainnet.block-engine.jito.wtf",
92            },
93            Self::Custom(url) => url.as_str(),
94        }
95    }
96}
97
98impl Default for JitoBlockEngineEndpoint {
99    fn default() -> Self {
100        Self::mainnet()
101    }
102}
103
104/// Transport-level Jito block-engine settings.
105#[derive(Debug, Clone, Eq, PartialEq)]
106pub struct JitoTransportConfig {
107    /// Target Jito block-engine endpoint.
108    pub endpoint: JitoBlockEngineEndpoint,
109    /// HTTP timeout applied to block-engine requests.
110    pub request_timeout: Duration,
111}
112
113impl Default for JitoTransportConfig {
114    fn default() -> Self {
115        Self {
116            endpoint: JitoBlockEngineEndpoint::default(),
117            request_timeout: DEFAULT_JITO_REQUEST_TIMEOUT,
118        }
119    }
120}
121
122/// Jito block-engine JSON-RPC transport using `/api/v1/transactions`.
123#[derive(Debug, Clone)]
124pub struct JitoJsonRpcTransport {
125    /// HTTP client used for block-engine calls.
126    client: reqwest::Client,
127    /// Transport-level request settings.
128    transport_config: JitoTransportConfig,
129}
130
131impl JitoJsonRpcTransport {
132    /// Creates a Jito block-engine transport.
133    ///
134    /// # Errors
135    ///
136    /// Returns [`SubmitTransportError::Config`] when HTTP client creation fails.
137    pub fn new() -> Result<Self, SubmitTransportError> {
138        Self::with_config(JitoTransportConfig::default())
139    }
140
141    /// Creates a Jito block-engine transport for one typed endpoint.
142    ///
143    /// # Errors
144    ///
145    /// Returns [`SubmitTransportError::Config`] when HTTP client creation fails.
146    pub fn with_endpoint(endpoint: JitoBlockEngineEndpoint) -> Result<Self, SubmitTransportError> {
147        Self::with_config(JitoTransportConfig {
148            endpoint,
149            ..JitoTransportConfig::default()
150        })
151    }
152
153    /// Creates a Jito block-engine transport with explicit transport settings.
154    ///
155    /// # Errors
156    ///
157    /// Returns [`SubmitTransportError::Config`] when HTTP client creation fails.
158    pub fn with_config(
159        transport_config: JitoTransportConfig,
160    ) -> Result<Self, SubmitTransportError> {
161        let request_timeout = nonzero_duration_or(
162            transport_config.request_timeout,
163            DEFAULT_JITO_REQUEST_TIMEOUT,
164        );
165        let client = reqwest::Client::builder()
166            .redirect(Policy::none())
167            .connect_timeout(request_timeout)
168            .timeout(request_timeout)
169            .build()
170            .map_err(|error| SubmitTransportError::Config {
171                message: error.to_string(),
172            })?;
173        Ok(Self {
174            client,
175            transport_config,
176        })
177    }
178
179    /// Builds the per-request endpoint URL with optional revert protection.
180    fn request_url(&self, config: &JitoSubmitConfig) -> String {
181        let mut url = self
182            .transport_config
183            .endpoint
184            .as_url()
185            .trim_end_matches('/')
186            .to_owned();
187        url.push_str("/api/v1/transactions");
188        if config.bundle_only {
189            url.push_str("?bundleOnly=true");
190        }
191        url
192    }
193}
194
195/// JSON-RPC envelope.
196#[derive(Debug, Deserialize)]
197struct JsonRpcResponse {
198    /// Result value for successful calls.
199    result: Option<String>,
200    /// Error payload for failed calls.
201    error: Option<JsonRpcError>,
202}
203
204/// JSON-RPC error object.
205#[derive(Debug, Deserialize)]
206struct JsonRpcError {
207    /// JSON-RPC error code.
208    code: i64,
209    /// Human-readable message.
210    message: String,
211}
212
213#[async_trait]
214impl JitoSubmitTransport for JitoJsonRpcTransport {
215    async fn submit_jito(
216        &self,
217        tx_bytes: &[u8],
218        config: &JitoSubmitConfig,
219    ) -> Result<JitoSubmitResponse, SubmitTransportError> {
220        #[derive(Debug, Serialize)]
221        struct JitoRpcConfig<'config> {
222            /// Transaction encoding format.
223            encoding: &'config str,
224        }
225
226        let encoded_tx = BASE64_STANDARD.encode(tx_bytes);
227        let payload = serde_json::json!({
228            "jsonrpc": "2.0",
229            "id": 1,
230            "method": "sendTransaction",
231            "params": [
232                encoded_tx,
233                JitoRpcConfig { encoding: "base64" }
234            ]
235        });
236
237        let response = self
238            .client
239            .post(self.request_url(config))
240            .json(&payload)
241            .send()
242            .await
243            .map_err(|error| SubmitTransportError::Failure {
244                message: error.to_string(),
245            })?;
246        if response.status().is_redirection() {
247            return Err(SubmitTransportError::Failure {
248                message: format!("unexpected redirect response: {}", response.status()),
249            });
250        }
251
252        let response =
253            response
254                .error_for_status()
255                .map_err(|error| SubmitTransportError::Failure {
256                    message: error.to_string(),
257                })?;
258
259        let response_body = read_http_response_bytes_bounded(response).await?;
260        let parsed: JsonRpcResponse =
261            json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
262                message: error.to_string(),
263            })?;
264
265        if let Some(signature) = parsed.result {
266            return Ok(JitoSubmitResponse {
267                transaction_signature: Some(signature),
268                bundle_id: None,
269            });
270        }
271        if let Some(error) = parsed.error {
272            return Err(SubmitTransportError::Failure {
273                message: format!("jito error {}: {}", error.code, error.message),
274            });
275        }
276
277        Err(SubmitTransportError::Failure {
278            message: "jito returned neither result nor error".to_owned(),
279        })
280    }
281}
282
283/// Reads one Jito submit response body while enforcing a fixed maximum byte budget.
284async fn read_http_response_bytes_bounded(
285    mut response: reqwest::Response,
286) -> Result<Vec<u8>, SubmitTransportError> {
287    if response
288        .content_length()
289        .is_some_and(|content_length| content_length > MAX_JITO_SUBMIT_RESPONSE_BYTES as u64)
290    {
291        return Err(SubmitTransportError::Failure {
292            message: format!(
293                "response body exceeded max size of {MAX_JITO_SUBMIT_RESPONSE_BYTES} bytes"
294            ),
295        });
296    }
297
298    let initial_capacity = response
299        .content_length()
300        .and_then(|content_length| usize::try_from(content_length).ok())
301        .unwrap_or(0)
302        .min(MAX_JITO_SUBMIT_RESPONSE_BYTES);
303    let mut body = Vec::with_capacity(initial_capacity);
304    while let Some(chunk) =
305        response
306            .chunk()
307            .await
308            .map_err(|error| SubmitTransportError::Failure {
309                message: error.to_string(),
310            })?
311    {
312        let remaining = MAX_JITO_SUBMIT_RESPONSE_BYTES.saturating_sub(body.len());
313        if chunk.len() > remaining {
314            return Err(SubmitTransportError::Failure {
315                message: format!(
316                    "response body exceeded max size of {MAX_JITO_SUBMIT_RESPONSE_BYTES} bytes"
317                ),
318            });
319        }
320        body.extend_from_slice(&chunk);
321    }
322    Ok(body)
323}
324
#[cfg(test)]
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Starts a one-shot local HTTP server that answers the first connection with
    /// `response` verbatim, and returns the server's `http://` base URL.
    async fn spawn_http_response_server(response: String) -> String {
        let listener = match TcpListener::bind("127.0.0.1:0").await {
            Ok(listener) => listener,
            Err(error) => panic!("{error}"),
        };
        let local_addr = match listener.local_addr() {
            Ok(local_addr) => local_addr,
            Err(error) => panic!("{error}"),
        };
        tokio::spawn(async move {
            let (mut connection, _peer) = match listener.accept().await {
                Ok(accepted) => accepted,
                Err(error) => panic!("{error}"),
            };
            // Read (part of) the request before answering.
            let mut scratch = [0_u8; 4096];
            assert!(connection.read(&mut scratch).await.is_ok());
            assert!(connection.write_all(response.as_bytes()).await.is_ok());
        });
        format!("http://{local_addr}")
    }

    /// Builds a transport that targets `base_url` as a custom endpoint, panicking when
    /// the URL does not parse or the transport cannot be created.
    fn custom_transport(base_url: &str) -> JitoJsonRpcTransport {
        let parsed_url = match Url::parse(base_url) {
            Ok(parsed_url) => parsed_url,
            Err(error) => panic!("{error}"),
        };
        match JitoJsonRpcTransport::with_endpoint(JitoBlockEngineEndpoint::custom(parsed_url)) {
            Ok(transport) => transport,
            Err(error) => panic!("{error}"),
        }
    }

    /// Submits a dummy transaction and returns the failure message, panicking with
    /// `unexpected_success` when the submission succeeds.
    async fn submit_failure_message(
        transport: &JitoJsonRpcTransport,
        unexpected_success: &str,
    ) -> String {
        let outcome = transport
            .submit_jito(&[1, 2, 3], &JitoSubmitConfig::default())
            .await;
        match outcome {
            Ok(_response) => panic!("{unexpected_success}"),
            Err(error) => error.to_string(),
        }
    }

    #[test]
    fn request_url_uses_transactions_path() {
        let transport = match JitoJsonRpcTransport::new() {
            Ok(transport) => transport,
            Err(error) => panic!("{error}"),
        };

        let url = transport.request_url(&JitoSubmitConfig::default());

        assert_eq!(
            url,
            "https://mainnet.block-engine.jito.wtf/api/v1/transactions"
        );
    }

    #[test]
    fn request_url_appends_bundle_only_query() {
        let transport = custom_transport("https://mainnet.block-engine.jito.wtf/");

        let url = transport.request_url(&JitoSubmitConfig { bundle_only: true });

        assert_eq!(
            url,
            "https://mainnet.block-engine.jito.wtf/api/v1/transactions?bundleOnly=true"
        );
    }

    #[test]
    fn transport_config_defaults_are_stable() {
        let JitoTransportConfig {
            endpoint,
            request_timeout,
        } = JitoTransportConfig::default();

        assert_eq!(endpoint, JitoBlockEngineEndpoint::mainnet());
        assert_eq!(request_timeout, Duration::from_secs(10));
    }

    #[test]
    fn transport_accepts_zero_timeout_config() {
        let config = JitoTransportConfig {
            endpoint: JitoBlockEngineEndpoint::default(),
            request_timeout: Duration::ZERO,
        };

        assert!(JitoJsonRpcTransport::with_config(config).is_ok());
    }

    #[test]
    fn regional_endpoint_uses_documented_slug() {
        let frankfurt = JitoBlockEngineEndpoint::mainnet_region(JitoBlockEngineRegion::Frankfurt);

        assert_eq!(
            frankfurt.as_url(),
            "https://frankfurt.mainnet.block-engine.jito.wtf"
        );
    }

    #[tokio::test]
    async fn jito_transport_rejects_redirects() {
        let base_url = spawn_http_response_server(
            "HTTP/1.1 307 Temporary Redirect\r\nlocation: http://127.0.0.1/\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
                .to_owned(),
        )
        .await;
        let transport = custom_transport(&base_url);

        let message = submit_failure_message(&transport, "redirect should fail").await;

        assert!(message.contains("redirect"));
    }

    #[tokio::test]
    async fn jito_transport_rejects_oversized_responses() {
        let base_url = spawn_http_response_server(format!(
            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
            MAX_JITO_SUBMIT_RESPONSE_BYTES.saturating_add(1)
        ))
        .await;
        let transport = custom_transport(&base_url);

        let message = submit_failure_message(&transport, "oversized body should fail").await;

        assert!(message.contains("exceeded max size"));
    }
}