1use std::time::Duration;
4
5use async_trait::async_trait;
6use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
7use reqwest::{Url, redirect::Policy};
8use serde::{Deserialize, Serialize};
9use serde_json::from_slice as json_from_slice;
10use sof_support::time_support::nonzero_duration_or;
11
12use super::{JitoSubmitConfig, JitoSubmitResponse, JitoSubmitTransport, SubmitTransportError};
13
/// Base URL used for the [`JitoBlockEngineEndpoint::Mainnet`] endpoint.
const DEFAULT_JITO_BLOCK_ENGINE_URL: &str = "https://mainnet.block-engine.jito.wtf";
/// Largest submit response body, in bytes, that the transport will buffer (64 KiB).
const MAX_JITO_SUBMIT_RESPONSE_BYTES: usize = 64 * 1024;
/// Request timeout used by [`JitoTransportConfig::default`] and as the fallback
/// passed to `nonzero_duration_or` when building the HTTP client.
const DEFAULT_JITO_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
20
/// Regional Jito mainnet block-engine hosts that can be addressed directly.
///
/// Each variant maps to a fixed `https://<prefix>.mainnet.block-engine.jito.wtf`
/// URL in [`JitoBlockEngineEndpoint::as_url`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum JitoBlockEngineRegion {
    /// Amsterdam (`amsterdam` host prefix).
    Amsterdam,
    /// Dublin (`dublin` host prefix).
    Dublin,
    /// Frankfurt (`frankfurt` host prefix).
    Frankfurt,
    /// London (`london` host prefix).
    London,
    /// New York (`ny` host prefix).
    NewYork,
    /// Salt Lake City (`slc` host prefix).
    SaltLakeCity,
    /// Singapore (`singapore` host prefix).
    Singapore,
    /// Tokyo (`tokyo` host prefix).
    Tokyo,
}
41
/// Selects which block-engine base URL a transport sends requests to.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum JitoBlockEngineEndpoint {
    /// The default mainnet block engine.
    Mainnet,
    /// A specific regional mainnet block engine.
    MainnetRegion(JitoBlockEngineRegion),
    /// A caller-supplied base URL.
    Custom(Url),
}
52
53impl JitoBlockEngineEndpoint {
54 #[must_use]
56 pub const fn mainnet() -> Self {
57 Self::Mainnet
58 }
59
60 #[must_use]
62 pub const fn mainnet_region(region: JitoBlockEngineRegion) -> Self {
63 Self::MainnetRegion(region)
64 }
65
66 #[must_use]
68 pub const fn custom(url: Url) -> Self {
69 Self::Custom(url)
70 }
71
72 #[must_use]
74 pub fn as_url(&self) -> &str {
75 match self {
76 Self::Mainnet => DEFAULT_JITO_BLOCK_ENGINE_URL,
77 Self::MainnetRegion(region) => match region {
78 JitoBlockEngineRegion::Amsterdam => {
79 "https://amsterdam.mainnet.block-engine.jito.wtf"
80 }
81 JitoBlockEngineRegion::Dublin => "https://dublin.mainnet.block-engine.jito.wtf",
82 JitoBlockEngineRegion::Frankfurt => {
83 "https://frankfurt.mainnet.block-engine.jito.wtf"
84 }
85 JitoBlockEngineRegion::London => "https://london.mainnet.block-engine.jito.wtf",
86 JitoBlockEngineRegion::NewYork => "https://ny.mainnet.block-engine.jito.wtf",
87 JitoBlockEngineRegion::SaltLakeCity => "https://slc.mainnet.block-engine.jito.wtf",
88 JitoBlockEngineRegion::Singapore => {
89 "https://singapore.mainnet.block-engine.jito.wtf"
90 }
91 JitoBlockEngineRegion::Tokyo => "https://tokyo.mainnet.block-engine.jito.wtf",
92 },
93 Self::Custom(url) => url.as_str(),
94 }
95 }
96}
97
98impl Default for JitoBlockEngineEndpoint {
99 fn default() -> Self {
100 Self::mainnet()
101 }
102}
103
/// Settings used to construct a [`JitoJsonRpcTransport`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct JitoTransportConfig {
    /// Block engine whose base URL requests are sent to.
    pub endpoint: JitoBlockEngineEndpoint,
    /// Timeout applied to both connection setup and the whole request.
    ///
    /// NOTE(review): the value is passed through `nonzero_duration_or` with the
    /// 10-second default, so a zero duration presumably falls back to that
    /// default — confirm against the helper.
    pub request_timeout: Duration,
}
112
113impl Default for JitoTransportConfig {
114 fn default() -> Self {
115 Self {
116 endpoint: JitoBlockEngineEndpoint::default(),
117 request_timeout: DEFAULT_JITO_REQUEST_TIMEOUT,
118 }
119 }
120}
121
/// Transport that submits transactions to a Jito block engine as JSON-RPC over HTTP.
#[derive(Debug, Clone)]
pub struct JitoJsonRpcTransport {
    /// HTTP client built once at construction and used for every submission.
    client: reqwest::Client,
    /// Configuration the transport was created with; its endpoint determines the request URL.
    transport_config: JitoTransportConfig,
}
130
131impl JitoJsonRpcTransport {
132 pub fn new() -> Result<Self, SubmitTransportError> {
138 Self::with_config(JitoTransportConfig::default())
139 }
140
141 pub fn with_endpoint(endpoint: JitoBlockEngineEndpoint) -> Result<Self, SubmitTransportError> {
147 Self::with_config(JitoTransportConfig {
148 endpoint,
149 ..JitoTransportConfig::default()
150 })
151 }
152
153 pub fn with_config(
159 transport_config: JitoTransportConfig,
160 ) -> Result<Self, SubmitTransportError> {
161 let request_timeout = nonzero_duration_or(
162 transport_config.request_timeout,
163 DEFAULT_JITO_REQUEST_TIMEOUT,
164 );
165 let client = reqwest::Client::builder()
166 .redirect(Policy::none())
167 .connect_timeout(request_timeout)
168 .timeout(request_timeout)
169 .build()
170 .map_err(|error| SubmitTransportError::Config {
171 message: error.to_string(),
172 })?;
173 Ok(Self {
174 client,
175 transport_config,
176 })
177 }
178
179 fn request_url(&self, config: &JitoSubmitConfig) -> String {
181 let mut url = self
182 .transport_config
183 .endpoint
184 .as_url()
185 .trim_end_matches('/')
186 .to_owned();
187 url.push_str("/api/v1/transactions");
188 if config.bundle_only {
189 url.push_str("?bundleOnly=true");
190 }
191 url
192 }
193}
194
/// The members of a JSON-RPC response body that this transport inspects.
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
    /// `result` member, if present; reported to callers as the transaction signature.
    result: Option<String>,
    /// `error` member, if present; turned into a transport failure.
    error: Option<JsonRpcError>,
}
203
/// Error object carried in a JSON-RPC response.
#[derive(Debug, Deserialize)]
struct JsonRpcError {
    /// Numeric error code reported by the server.
    code: i64,
    /// Error description reported by the server.
    message: String,
}
212
213#[async_trait]
214impl JitoSubmitTransport for JitoJsonRpcTransport {
215 async fn submit_jito(
216 &self,
217 tx_bytes: &[u8],
218 config: &JitoSubmitConfig,
219 ) -> Result<JitoSubmitResponse, SubmitTransportError> {
220 #[derive(Debug, Serialize)]
221 struct JitoRpcConfig<'config> {
222 encoding: &'config str,
224 }
225
226 let encoded_tx = BASE64_STANDARD.encode(tx_bytes);
227 let payload = serde_json::json!({
228 "jsonrpc": "2.0",
229 "id": 1,
230 "method": "sendTransaction",
231 "params": [
232 encoded_tx,
233 JitoRpcConfig { encoding: "base64" }
234 ]
235 });
236
237 let response = self
238 .client
239 .post(self.request_url(config))
240 .json(&payload)
241 .send()
242 .await
243 .map_err(|error| SubmitTransportError::Failure {
244 message: error.to_string(),
245 })?;
246 if response.status().is_redirection() {
247 return Err(SubmitTransportError::Failure {
248 message: format!("unexpected redirect response: {}", response.status()),
249 });
250 }
251
252 let response =
253 response
254 .error_for_status()
255 .map_err(|error| SubmitTransportError::Failure {
256 message: error.to_string(),
257 })?;
258
259 let response_body = read_http_response_bytes_bounded(response).await?;
260 let parsed: JsonRpcResponse =
261 json_from_slice(&response_body).map_err(|error| SubmitTransportError::Failure {
262 message: error.to_string(),
263 })?;
264
265 if let Some(signature) = parsed.result {
266 return Ok(JitoSubmitResponse {
267 transaction_signature: Some(signature),
268 bundle_id: None,
269 });
270 }
271 if let Some(error) = parsed.error {
272 return Err(SubmitTransportError::Failure {
273 message: format!("jito error {}: {}", error.code, error.message),
274 });
275 }
276
277 Err(SubmitTransportError::Failure {
278 message: "jito returned neither result nor error".to_owned(),
279 })
280 }
281}
282
283async fn read_http_response_bytes_bounded(
285 mut response: reqwest::Response,
286) -> Result<Vec<u8>, SubmitTransportError> {
287 if response
288 .content_length()
289 .is_some_and(|content_length| content_length > MAX_JITO_SUBMIT_RESPONSE_BYTES as u64)
290 {
291 return Err(SubmitTransportError::Failure {
292 message: format!(
293 "response body exceeded max size of {MAX_JITO_SUBMIT_RESPONSE_BYTES} bytes"
294 ),
295 });
296 }
297
298 let initial_capacity = response
299 .content_length()
300 .and_then(|content_length| usize::try_from(content_length).ok())
301 .unwrap_or(0)
302 .min(MAX_JITO_SUBMIT_RESPONSE_BYTES);
303 let mut body = Vec::with_capacity(initial_capacity);
304 while let Some(chunk) =
305 response
306 .chunk()
307 .await
308 .map_err(|error| SubmitTransportError::Failure {
309 message: error.to_string(),
310 })?
311 {
312 let remaining = MAX_JITO_SUBMIT_RESPONSE_BYTES.saturating_sub(body.len());
313 if chunk.len() > remaining {
314 return Err(SubmitTransportError::Failure {
315 message: format!(
316 "response body exceeded max size of {MAX_JITO_SUBMIT_RESPONSE_BYTES} bytes"
317 ),
318 });
319 }
320 body.extend_from_slice(&chunk);
321 }
322 Ok(body)
323}
324
#[cfg(test)]
#[allow(clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpListener,
    };

    /// Starts a local server that answers a single connection with `response`
    /// and returns the server's `http://` base URL.
    async fn spawn_http_response_server(response: String) -> String {
        let bound = TcpListener::bind("127.0.0.1:0").await;
        assert!(bound.is_ok());
        let listener = match bound {
            Ok(listener) => listener,
            Err(error) => panic!("{error}"),
        };
        let local_addr = listener.local_addr();
        assert!(local_addr.is_ok());
        let addr = match local_addr {
            Ok(addr) => addr,
            Err(error) => panic!("{error}"),
        };
        tokio::spawn(async move {
            let accepted = listener.accept().await;
            assert!(accepted.is_ok());
            let (mut stream, _) = match accepted {
                Ok(connection) => connection,
                Err(error) => panic!("{error}"),
            };
            // Read the request once before replying with the canned response.
            let mut buffer = [0_u8; 4096];
            let read = stream.read(&mut buffer).await;
            assert!(read.is_ok());
            let write = stream.write_all(response.as_bytes()).await;
            assert!(write.is_ok());
        });
        format!("http://{addr}")
    }

    /// Builds a transport aimed at the custom `endpoint`, failing the test if
    /// the URL does not parse or the transport cannot be created.
    fn custom_transport(endpoint: &str) -> JitoJsonRpcTransport {
        let parsed_url = Url::parse(endpoint);
        assert!(parsed_url.is_ok());
        let parsed_url = parsed_url.unwrap_or_else(|error| panic!("{error}"));
        let transport =
            JitoJsonRpcTransport::with_endpoint(JitoBlockEngineEndpoint::custom(parsed_url));
        assert!(transport.is_ok());
        transport.unwrap_or_else(|error| panic!("{error}"))
    }

    /// Submits a dummy transaction that is expected to fail and returns the
    /// error text; panics with `context` if the submission succeeds.
    async fn submit_failure_message(transport: &JitoJsonRpcTransport, context: &str) -> String {
        let outcome = transport
            .submit_jito(&[1, 2, 3], &JitoSubmitConfig::default())
            .await;
        assert!(outcome.is_err());
        match outcome {
            Ok(_response) => panic!("{context}"),
            Err(error) => error.to_string(),
        }
    }

    #[test]
    fn request_url_uses_transactions_path() {
        let transport_result = JitoJsonRpcTransport::new();
        assert!(transport_result.is_ok());
        let Ok(transport) = transport_result else {
            return;
        };
        let default_config = JitoSubmitConfig::default();

        assert_eq!(
            transport.request_url(&default_config),
            "https://mainnet.block-engine.jito.wtf/api/v1/transactions"
        );
    }

    #[test]
    fn request_url_appends_bundle_only_query() {
        // The trailing slash on the base URL must not produce a double slash.
        let transport = custom_transport("https://mainnet.block-engine.jito.wtf/");
        let bundle_only_config = JitoSubmitConfig { bundle_only: true };

        assert_eq!(
            transport.request_url(&bundle_only_config),
            "https://mainnet.block-engine.jito.wtf/api/v1/transactions?bundleOnly=true"
        );
    }

    #[test]
    fn transport_config_defaults_are_stable() {
        let JitoTransportConfig {
            endpoint,
            request_timeout,
        } = JitoTransportConfig::default();

        assert_eq!(endpoint, JitoBlockEngineEndpoint::Mainnet);
        assert_eq!(request_timeout, Duration::from_secs(10));
    }

    #[test]
    fn transport_accepts_zero_timeout_config() {
        let zero_timeout_config = JitoTransportConfig {
            request_timeout: Duration::ZERO,
            ..JitoTransportConfig::default()
        };

        assert!(JitoJsonRpcTransport::with_config(zero_timeout_config).is_ok());
    }

    #[test]
    fn regional_endpoint_uses_documented_slug() {
        let frankfurt = JitoBlockEngineEndpoint::MainnetRegion(JitoBlockEngineRegion::Frankfurt);

        assert_eq!(
            frankfurt.as_url(),
            "https://frankfurt.mainnet.block-engine.jito.wtf"
        );
    }

    #[tokio::test]
    async fn jito_transport_rejects_redirects() {
        let canned_response =
            "HTTP/1.1 307 Temporary Redirect\r\nlocation: http://127.0.0.1/\r\ncontent-length: 0\r\nconnection: close\r\n\r\n"
                .to_owned();
        let endpoint = spawn_http_response_server(canned_response).await;
        let transport = custom_transport(&endpoint);

        let message = submit_failure_message(&transport, "redirect should fail").await;

        assert!(message.contains("redirect"));
    }

    #[tokio::test]
    async fn jito_transport_rejects_oversized_responses() {
        // Only the declared length is oversized; no body bytes are sent.
        let canned_response = format!(
            "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
            MAX_JITO_SUBMIT_RESPONSE_BYTES.saturating_add(1)
        );
        let endpoint = spawn_http_response_server(canned_response).await;
        let transport = custom_transport(&endpoint);

        let message = submit_failure_message(&transport, "oversized body should fail").await;

        assert!(message.contains("exceeded max size"));
    }
}