1use anyhow::{Context, anyhow, bail};
2use base64::prelude::*;
3use load_balancer::{LoadBalancer, interval::IntervalLoadBalancer};
4use reqwest::{Client, ClientBuilder, Proxy, Response, header::HeaderMap};
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use std::{net::IpAddr, sync::Arc, time::Duration};
8
9pub use load_balancer;
10pub use reqwest;
11
12pub struct JitoClientBuilder {
14 url: Vec<String>,
15 rate: u64,
16 multi_ipv4: bool,
17 multi_ipv6: bool,
18 multi_ip: bool,
19 broadcast: bool,
20 timeout: Option<Duration>,
21 proxy: Option<Proxy>,
22 headers: Option<HeaderMap>,
23 ip: Vec<IpAddr>,
24}
25
26impl JitoClientBuilder {
27 pub fn new() -> Self {
29 Self {
30 url: vec!["https://mainnet.block-engine.jito.wtf".to_string()],
31 rate: 0,
32 multi_ipv4: false,
33 multi_ipv6: false,
34 multi_ip: false,
35 broadcast: false,
36 timeout: None,
37 proxy: None,
38 headers: None,
39 ip: Vec::new(),
40 }
41 }
42
43 pub fn url<T: IntoIterator<Item = impl AsRef<str>>>(mut self, url: T) -> Self {
45 self.url = url.into_iter().map(|v| v.as_ref().to_string()).collect();
46 self
47 }
48
49 pub fn rate(mut self, rate: u64) -> Self {
51 self.rate = rate;
52 self
53 }
54
55 pub fn multi_ipv4(mut self, mutil_ip: bool) -> Self {
57 self.multi_ipv4 = mutil_ip;
58 self
59 }
60
61 pub fn multi_ipv6(mut self, mutil_ip: bool) -> Self {
63 self.multi_ipv6 = mutil_ip;
64 self
65 }
66
67 pub fn multi_ip(mut self, mutil_ip: bool) -> Self {
69 self.multi_ip = mutil_ip;
70 self
71 }
72
73 pub fn with_ip(mut self, ip: Vec<IpAddr>) -> Self {
75 self.ip = ip;
76 self
77 }
78
79 pub fn broadcast(mut self, broadcast: bool) -> Self {
81 self.broadcast = broadcast;
82 self
83 }
84
85 pub fn timeout(mut self, timeout: Duration) -> Self {
87 self.timeout = Some(timeout);
88 self
89 }
90
91 pub fn proxy(mut self, proxy: Proxy) -> Self {
93 self.proxy = Some(proxy);
94 self
95 }
96
97 pub fn headers(mut self, headers: HeaderMap) -> Self {
99 self.headers = Some(headers);
100 self
101 }
102
103 pub fn build(mut self) -> anyhow::Result<JitoClient> {
105 let interval = if self.rate == 0 {
106 Duration::ZERO
107 } else {
108 Duration::from_millis(1000).div_f64(self.rate as f64)
109 };
110
111 if self.multi_ipv4 {
112 self.ip.extend(load_balancer::ip::get_ipv4_list());
113 }
114
115 if self.multi_ipv6 {
116 self.ip.extend(load_balancer::ip::get_ipv6_list());
117 }
118
119 if self.multi_ip {
120 if self.multi_ipv4 || self.multi_ipv6 {
121 bail!("`mutil_ip` is mutually exclusive with `mutil_ipv4`/`mutil_ipv6`");
122 }
123
124 self.ip.extend(load_balancer::ip::get_ip_list());
125 }
126
127 let local = !self.multi_ipv4 && !self.multi_ipv6 && !self.multi_ip;
128
129 if !local {
130 if self.ip.is_empty() {
131 bail!("local ip is empty");
132 }
133 }
134
135 let inner = if self.broadcast {
136 let mut entries = Vec::new();
137
138 if local {
139 let mut cb = ClientBuilder::new();
140
141 if let Some(v) = self.timeout {
142 cb = cb.timeout(v);
143 }
144
145 if let Some(v) = self.proxy {
146 cb = cb.proxy(v);
147 }
148
149 if let Some(v) = self.headers {
150 cb = cb.default_headers(v);
151 }
152
153 entries.push((interval, Arc::new((self.url.clone(), cb.build()?))));
154 } else {
155 for ip in &self.ip {
156 let mut cb = ClientBuilder::new();
157
158 if let Some(v) = self.timeout {
159 cb = cb.timeout(v);
160 }
161
162 if let Some(v) = self.proxy.clone() {
163 cb = cb.proxy(v);
164 }
165
166 if let Some(v) = self.headers.clone() {
167 cb = cb.default_headers(v);
168 }
169
170 cb = cb.local_address(*ip);
171
172 entries.push((interval, Arc::new((self.url.clone(), cb.build()?))));
173 }
174 }
175
176 JitoClientRef::Broadcast(IntervalLoadBalancer::new(entries))
177 } else {
178 let mut entries = Vec::new();
179
180 if local {
181 let mut cb = ClientBuilder::new();
182
183 if let Some(v) = self.timeout {
184 cb = cb.timeout(v);
185 }
186
187 if let Some(v) = self.proxy {
188 cb = cb.proxy(v);
189 }
190
191 if let Some(v) = self.headers {
192 cb = cb.default_headers(v);
193 }
194
195 entries.push((interval, Arc::new((self.url[0].clone(), cb.build()?))));
196 } else {
197 for url in &self.url {
198 for ip in &self.ip {
199 let mut cb = ClientBuilder::new();
200
201 if let Some(v) = self.timeout {
202 cb = cb.timeout(v);
203 }
204
205 if let Some(v) = self.proxy.clone() {
206 cb = cb.proxy(v);
207 }
208
209 if let Some(v) = self.headers.clone() {
210 cb = cb.default_headers(v);
211 }
212
213 cb = cb.local_address(*ip);
214
215 entries.push((interval, Arc::new((url.clone(), cb.build()?))));
216 }
217 }
218 }
219
220 JitoClientRef::LB(IntervalLoadBalancer::new(entries))
221 };
222
223 Ok(JitoClient {
224 inner: inner.into(),
225 })
226 }
227}
228
229enum JitoClientRef {
230 Broadcast(IntervalLoadBalancer<Arc<(Vec<String>, Client)>>),
231 LB(IntervalLoadBalancer<Arc<(String, Client)>>),
232}
233
234#[derive(Clone)]
236pub struct JitoClient {
237 inner: Arc<JitoClientRef>,
238}
239
240impl JitoClient {
241 pub fn new() -> Self {
243 JitoClientBuilder::new().build().unwrap()
244 }
245
246 pub async fn send_transaction(&self, tx: impl Serialize) -> anyhow::Result<Response> {
248 let data = BASE64_STANDARD.encode(bincode::serialize(&tx)?);
249 let body = json!({
250 "id": 1,
251 "jsonrpc": "2.0",
252 "method": "sendTransaction",
253 "params": [
254 data, { "encoding": "base64" }
255 ]
256 });
257
258 match *self.inner {
259 JitoClientRef::Broadcast(ref v) => {
260 let (ref url, ref client) = *v.alloc().await;
261
262 Ok(futures::future::select_ok(url.iter().map(|v| {
263 client
264 .post(v)
265 .query(&["bundleOnly", "true"])
266 .json(&body)
267 .send()
268 }))
269 .await?
270 .0)
271 }
272 JitoClientRef::LB(ref v) => {
273 let (ref url, ref client) = *v.alloc().await;
274
275 Ok(client
276 .post(url)
277 .query(&["bundleOnly", "true"])
278 .json(&body)
279 .send()
280 .await?)
281 }
282 }
283 }
284
285 pub async fn send_transaction_bid(&self, tx: impl Serialize) -> anyhow::Result<String> {
287 Ok(self
288 .send_transaction(tx)
289 .await?
290 .headers()
291 .get("x-bundle-id")
292 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
293 .to_str()
294 .map_err(|e| anyhow!("invalid `x-bundle-id` header: {}", e))?
295 .to_string())
296 }
297
298 pub async fn send_transaction_no_bundle_only(
300 &self,
301 tx: impl Serialize,
302 ) -> anyhow::Result<Response> {
303 let data = BASE64_STANDARD.encode(bincode::serialize(&tx)?);
304 let body = json!({
305 "id": 1,
306 "jsonrpc": "2.0",
307 "method": "sendTransaction",
308 "params": [
309 data, { "encoding": "base64" }
310 ]
311 });
312
313 match *self.inner {
314 JitoClientRef::Broadcast(ref v) => {
315 let (ref url, ref client) = *v.alloc().await;
316
317 Ok(futures::future::select_ok(
318 url.iter().map(|v| client.post(v).json(&body).send()),
319 )
320 .await?
321 .0)
322 }
323 JitoClientRef::LB(ref v) => {
324 let (ref url, ref client) = *v.alloc().await;
325
326 Ok(client.post(url).json(&body).send().await?)
327 }
328 }
329 }
330
331 pub async fn send_bundle<T: IntoIterator<Item = impl Serialize>>(
333 &self,
334 tx: T,
335 ) -> anyhow::Result<Response> {
336 let data = tx
337 .into_iter()
338 .map(|tx| {
339 let raw = bincode::serialize(&tx)
340 .map_err(|e| anyhow::anyhow!("failed to serialize tx: {}", e))?;
341 Ok(BASE64_STANDARD.encode(raw))
342 })
343 .collect::<anyhow::Result<Vec<_>>>()?;
344
345 let body = json!({
346 "id": 1,
347 "jsonrpc": "2.0",
348 "method": "sendBundle",
349 "params": [ data, { "encoding": "base64" } ]
350 });
351
352 match *self.inner {
353 JitoClientRef::Broadcast(ref v) => {
354 let (ref urls, ref client) = *v.alloc().await;
355
356 Ok(futures::future::select_ok(urls.iter().map(|u| {
357 client
358 .post(&format!("{}/api/v1/bundles", u))
359 .json(&body)
360 .send()
361 }))
362 .await?
363 .0)
364 }
365 JitoClientRef::LB(ref v) => {
366 let (ref url, ref client) = *v.alloc().await;
367
368 Ok(client
369 .post(&format!("{}/api/v1/bundles", url))
370 .json(&body)
371 .send()
372 .await?)
373 }
374 }
375 }
376
377 pub async fn send_bundle_bid<T: IntoIterator<Item = impl Serialize>>(
379 &self,
380 tx: T,
381 ) -> anyhow::Result<String> {
382 self.send_bundle(tx)
383 .await?
384 .json::<serde_json::Value>()
385 .await?["result"]
386 .as_str()
387 .map(|s| s.to_string())
388 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
389 }
390}
391
392#[derive(Debug, Clone, Deserialize)]
394pub struct JitoTip {
395 pub landed_tips_25th_percentile: f64,
396 pub landed_tips_50th_percentile: f64,
397 pub landed_tips_75th_percentile: f64,
398 pub landed_tips_95th_percentile: f64,
399 pub landed_tips_99th_percentile: f64,
400 pub ema_landed_tips_50th_percentile: f64,
401}
402
403pub async fn get_jito_tip(client: Client) -> anyhow::Result<JitoTip> {
405 Ok(client
406 .get("https://bundles.jito.wtf/api/v1/bundles/tip_floor")
407 .send()
408 .await?
409 .json::<Vec<JitoTip>>()
410 .await?
411 .get(0)
412 .context("get_jito_tip: empty response")?
413 .clone())
414}
415
416#[derive(Debug, Deserialize)]
418pub struct BundleResult {
419 pub context: serde_json::Value,
420 pub value: Option<Vec<BundleStatus>>,
421}
422
423#[derive(Debug, Deserialize)]
424pub struct BundleStatus {
425 pub bundle_id: String,
426 pub transactions: Option<Vec<String>>,
427 pub slot: Option<u64>,
428 pub confirmation_status: Option<String>,
429 pub err: Option<serde_json::Value>,
430}
431
432pub async fn get_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
434 client: Client,
435 bundle: T,
436) -> anyhow::Result<BundleResult> {
437 #[derive(Debug, Deserialize)]
438 struct RpcResponse {
439 result: BundleResult,
440 }
441
442 let payload = json!({
443 "jsonrpc": "2.0",
444 "id": 1,
445 "method": "getBundleStatuses",
446 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
447 });
448
449 Ok(client
450 .post("https://mainnet.block-engine.jito.wtf/api/v1/getBundleStatuses")
451 .json(&payload)
452 .send()
453 .await?
454 .json::<RpcResponse>()
455 .await?
456 .result)
457}
458
459#[derive(Debug, Deserialize)]
461pub struct InflightBundleStatus {
462 pub bundle_id: String,
463 pub status: String,
464 pub landed_slot: Option<u64>,
465}
466
467#[derive(Debug, Deserialize)]
468pub struct InflightBundleResult {
469 pub context: serde_json::Value,
470 pub value: Option<Vec<InflightBundleStatus>>,
471}
472
473pub async fn get_inflight_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
475 client: Client,
476 bundle: T,
477) -> anyhow::Result<InflightBundleResult> {
478 #[derive(Debug, Deserialize)]
479 struct InflightRpcResponse {
480 result: InflightBundleResult,
481 }
482
483 let payload = json!({
484 "jsonrpc": "2.0",
485 "id": 1,
486 "method": "getInflightBundleStatuses",
487 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
488 });
489
490 Ok(client
491 .post("https://mainnet.block-engine.jito.wtf/api/v1/getInflightBundleStatuses")
492 .json(&payload)
493 .send()
494 .await?
495 .json::<InflightRpcResponse>()
496 .await?
497 .result)
498}