1use anyhow::{Context, anyhow};
2use base64::prelude::*;
3use load_balancer::{LoadBalancer, interval::IntervalLoadBalancer};
4use reqwest::{Client, ClientBuilder, Response};
5use serde::{Deserialize, Serialize};
6use serde_json::json;
7use std::{net::IpAddr, sync::Arc, time::Duration};
8
9pub use load_balancer;
10pub use load_balancer::get_if_addrs;
11pub use load_balancer::ip::{get_ip_list, get_ipv4_list, get_ipv6_list};
12pub use reqwest;
13pub use reqwest::Proxy;
14pub use reqwest::header::HeaderMap;
15
/// Builder that collects the configuration for a [`JitoClient`].
///
/// Start from [`JitoClientBuilder::new`], chain the setters, and finish with
/// [`JitoClientBuilder::build`].
pub struct JitoClientBuilder {
    /// Base URLs of the endpoints that requests are posted to.
    url: Vec<String>,
    /// When `true`, every request is sent to all configured URLs and the first
    /// successful response is returned; when `false`, one URL is used per request.
    broadcast: bool,
    /// Interval attached to every entry handed to the `IntervalLoadBalancer`.
    interval: Duration,
    /// Optional request timeout applied to each underlying HTTP client.
    timeout: Option<Duration>,
    /// Optional proxy applied to each underlying HTTP client.
    proxy: Option<Proxy>,
    /// Optional default headers applied to each underlying HTTP client.
    headers: Option<HeaderMap>,
    /// Local addresses to bind outgoing connections to; when empty, no local
    /// address is set on the HTTP client.
    ip: Vec<IpAddr>,
}
26
27impl JitoClientBuilder {
28 pub fn new() -> Self {
30 Self {
31 url: vec!["https://mainnet.block-engine.jito.wtf".to_string()],
32 broadcast: false,
33 interval: Duration::ZERO,
34 timeout: None,
35 proxy: None,
36 headers: None,
37 ip: Vec::new(),
38 }
39 }
40
41 pub fn url<T: IntoIterator<Item = impl AsRef<str>>>(mut self, url: T) -> Self {
43 self.url = url.into_iter().map(|v| v.as_ref().to_string()).collect();
44 self
45 }
46
47 pub fn interval(mut self, interval: Duration) -> Self {
50 self.interval = interval;
51 self
52 }
53
54 pub fn ip(mut self, ip: Vec<IpAddr>) -> Self {
56 self.ip = ip;
57 self
58 }
59
60 pub fn broadcast(mut self, broadcast: bool) -> Self {
62 self.broadcast = broadcast;
63 self
64 }
65
66 pub fn timeout(mut self, timeout: Duration) -> Self {
68 self.timeout = Some(timeout);
69 self
70 }
71
72 pub fn proxy(mut self, proxy: Proxy) -> Self {
74 self.proxy = Some(proxy);
75 self
76 }
77
78 pub fn headers(mut self, headers: HeaderMap) -> Self {
80 self.headers = Some(headers);
81 self
82 }
83
84 pub fn build(self) -> anyhow::Result<JitoClient> {
86 let default_ip = self.ip.is_empty();
87
88 let inner = if self.broadcast {
89 let mut entries = Vec::new();
90
91 if default_ip {
92 let mut cb = ClientBuilder::new();
93
94 if let Some(v) = self.timeout {
95 cb = cb.timeout(v);
96 }
97
98 if let Some(v) = self.proxy {
99 cb = cb.proxy(v);
100 }
101
102 if let Some(v) = self.headers {
103 cb = cb.default_headers(v);
104 }
105
106 entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
107 } else {
108 for ip in &self.ip {
109 let mut cb = ClientBuilder::new();
110
111 if let Some(v) = self.timeout {
112 cb = cb.timeout(v);
113 }
114
115 if let Some(v) = self.proxy.clone() {
116 cb = cb.proxy(v);
117 }
118
119 if let Some(v) = self.headers.clone() {
120 cb = cb.default_headers(v);
121 }
122
123 cb = cb.local_address(*ip);
124
125 entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
126 }
127 }
128
129 JitoClientRef {
130 broadcast: true,
131 lb: IntervalLoadBalancer::new(entries),
132 }
133 } else {
134 let mut entries = Vec::new();
135
136 if default_ip {
137 for url in &self.url {
138 let mut cb = ClientBuilder::new();
139
140 if let Some(v) = self.timeout {
141 cb = cb.timeout(v);
142 }
143
144 if let Some(v) = self.proxy.clone() {
145 cb = cb.proxy(v);
146 }
147
148 if let Some(v) = self.headers.clone() {
149 cb = cb.default_headers(v);
150 }
151
152 entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
153 }
154 } else {
155 for url in &self.url {
156 for ip in &self.ip {
157 let mut cb = ClientBuilder::new();
158
159 if let Some(v) = self.timeout {
160 cb = cb.timeout(v);
161 }
162
163 if let Some(v) = self.proxy.clone() {
164 cb = cb.proxy(v);
165 }
166
167 if let Some(v) = self.headers.clone() {
168 cb = cb.default_headers(v);
169 }
170
171 cb = cb.local_address(*ip);
172
173 entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
174 }
175 }
176 }
177
178 JitoClientRef {
179 broadcast: false,
180 lb: IntervalLoadBalancer::new(entries),
181 }
182 };
183
184 Ok(JitoClient {
185 inner: inner.into(),
186 })
187 }
188}
189
/// Shared state behind a [`JitoClient`].
struct JitoClientRef {
    /// Whether requests are sent to every URL of the allocated entry.
    broadcast: bool,
    /// Load balancer over the entries; each entry pairs a list of endpoint base
    /// URLs with the HTTP client used to reach them.
    lb: IntervalLoadBalancer<Arc<(Vec<String>, Client)>>,
}
194
/// Client for sending transactions and bundles to the configured endpoints.
///
/// Cloning is cheap: clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct JitoClient {
    /// Reference-counted shared state (broadcast flag and load balancer).
    inner: Arc<JitoClientRef>,
}
200
201impl JitoClient {
202 pub fn new() -> Self {
204 JitoClientBuilder::new().build().unwrap()
205 }
206
207 pub async fn send_transaction(&self, tx: impl Serialize) -> anyhow::Result<Response> {
209 let data = BASE64_STANDARD.encode(bincode::serialize(&tx)?);
210 let body = json!({
211 "id": 1,
212 "jsonrpc": "2.0",
213 "method": "sendTransaction",
214 "params": [
215 data, { "encoding": "base64" }
216 ]
217 });
218
219 let (ref url, ref client) = *self.inner.lb.alloc().await;
220
221 if self.inner.broadcast {
222 Ok(futures::future::select_ok(url.iter().map(|v| {
223 client
224 .post(&format!("{}/api/v1/transactions", v))
225 .query(&["bundleOnly", "true"])
226 .json(&body)
227 .send()
228 }))
229 .await?
230 .0)
231 } else {
232 Ok(client
233 .post(&format!("{}/api/v1/transactions", url[0]))
234 .query(&["bundleOnly", "true"])
235 .json(&body)
236 .send()
237 .await?)
238 }
239 }
240
241 pub async fn send_transaction_bid(&self, tx: impl Serialize) -> anyhow::Result<String> {
243 Ok(self
244 .send_transaction(tx)
245 .await?
246 .error_for_status()?
247 .headers()
248 .get("x-bundle-id")
249 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
250 .to_str()
251 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
252 .to_string())
253 }
254
255 pub async fn send_transaction_no_bundle_only(
257 &self,
258 tx: impl Serialize,
259 ) -> anyhow::Result<Response> {
260 let data = BASE64_STANDARD.encode(bincode::serialize(&tx)?);
261 let body = json!({
262 "id": 1,
263 "jsonrpc": "2.0",
264 "method": "sendTransaction",
265 "params": [
266 data, { "encoding": "base64" }
267 ]
268 });
269
270 let (ref url, ref client) = *self.inner.lb.alloc().await;
271
272 if self.inner.broadcast {
273 Ok(futures::future::select_ok(url.iter().map(|v| {
274 client
275 .post(&format!("{}/api/v1/transactions", v))
276 .json(&body)
277 .send()
278 }))
279 .await?
280 .0)
281 } else {
282 Ok(client
283 .post(&format!("{}/api/v1/transactions", url[0]))
284 .json(&body)
285 .send()
286 .await?)
287 }
288 }
289
290 pub async fn send_bundle<T: IntoIterator<Item = impl Serialize>>(
292 &self,
293 tx: T,
294 ) -> anyhow::Result<Response> {
295 let data = tx
296 .into_iter()
297 .map(|tx| {
298 Ok(BASE64_STANDARD.encode(
299 bincode::serialize(&tx)
300 .map_err(|v| anyhow::anyhow!("failed to serialize tx: {}", v))?,
301 ))
302 })
303 .collect::<anyhow::Result<Vec<_>>>()?;
304
305 let body = json!({
306 "id": 1,
307 "jsonrpc": "2.0",
308 "method": "sendBundle",
309 "params": [ data, { "encoding": "base64" } ]
310 });
311
312 let (ref url, ref client) = *self.inner.lb.alloc().await;
313
314 if self.inner.broadcast {
315 Ok(futures::future::select_ok(url.iter().map(|v| {
316 client
317 .post(&format!("{}/api/v1/bundles", v))
318 .json(&body)
319 .send()
320 }))
321 .await?
322 .0)
323 } else {
324 Ok(client
325 .post(&format!("{}/api/v1/bundles", url[0]))
326 .json(&body)
327 .send()
328 .await?)
329 }
330 }
331
332 pub async fn send_bundle_bid<T: IntoIterator<Item = impl Serialize>>(
334 &self,
335 tx: T,
336 ) -> anyhow::Result<String> {
337 self.send_bundle(tx)
338 .await?
339 .error_for_status()?
340 .json::<serde_json::Value>()
341 .await?["result"]
342 .as_str()
343 .map(|v| v.to_string())
344 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
345 }
346
347 pub async fn send_transaction_lazy<T>(
349 &self,
350 tx: impl Future<Output = anyhow::Result<T>>,
351 ) -> anyhow::Result<Response>
352 where
353 T: Serialize,
354 {
355 let (ref url, ref client) = *self.inner.lb.alloc().await;
356
357 let data = BASE64_STANDARD.encode(bincode::serialize(&tx.await?)?);
358
359 let body = json!({
360 "id": 1,
361 "jsonrpc": "2.0",
362 "method": "sendTransaction",
363 "params": [
364 data, { "encoding": "base64" }
365 ]
366 });
367
368 if self.inner.broadcast {
369 Ok(futures::future::select_ok(url.iter().map(|v| {
370 client
371 .post(&format!("{}/api/v1/transactions", v))
372 .query(&["bundleOnly", "true"])
373 .json(&body)
374 .send()
375 }))
376 .await?
377 .0)
378 } else {
379 Ok(client
380 .post(&format!("{}/api/v1/transactions", url[0]))
381 .query(&["bundleOnly", "true"])
382 .json(&body)
383 .send()
384 .await?)
385 }
386 }
387
388 pub async fn send_transaction_bid_lazy<T>(
390 &self,
391 tx: impl Future<Output = anyhow::Result<T>>,
392 ) -> anyhow::Result<String>
393 where
394 T: Serialize,
395 {
396 Ok(self
397 .send_transaction_lazy(tx)
398 .await?
399 .error_for_status()?
400 .headers()
401 .get("x-bundle-id")
402 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
403 .to_str()
404 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
405 .to_string())
406 }
407
408 pub async fn send_transaction_no_bundle_only_lazy<T>(
410 &self,
411 tx: impl Future<Output = anyhow::Result<T>>,
412 ) -> anyhow::Result<Response>
413 where
414 T: Serialize,
415 {
416 let (ref url, ref client) = *self.inner.lb.alloc().await;
417
418 let data = BASE64_STANDARD.encode(bincode::serialize(&tx.await?)?);
419
420 let body = json!({
421 "id": 1,
422 "jsonrpc": "2.0",
423 "method": "sendTransaction",
424 "params": [
425 data, { "encoding": "base64" }
426 ]
427 });
428
429 if self.inner.broadcast {
430 Ok(futures::future::select_ok(url.iter().map(|v| {
431 client
432 .post(&format!("{}/api/v1/transactions", v))
433 .json(&body)
434 .send()
435 }))
436 .await?
437 .0)
438 } else {
439 Ok(client
440 .post(&format!("{}/api/v1/transactions", url[0]))
441 .json(&body)
442 .send()
443 .await?)
444 }
445 }
446
447 pub async fn send_bundle_lazy<T, S>(
449 &self,
450 tx: impl Future<Output = anyhow::Result<T>>,
451 ) -> anyhow::Result<Response>
452 where
453 T: IntoIterator<Item = S>,
454 S: Serialize,
455 {
456 let (ref url, ref client) = *self.inner.lb.alloc().await;
457
458 let data = tx
459 .await?
460 .into_iter()
461 .map(|tx| {
462 Ok(BASE64_STANDARD.encode(
463 bincode::serialize(&tx)
464 .map_err(|v| anyhow::anyhow!("failed to serialize tx: {}", v))?,
465 ))
466 })
467 .collect::<anyhow::Result<Vec<_>>>()?;
468
469 let body = json!({
470 "id": 1,
471 "jsonrpc": "2.0",
472 "method": "sendBundle",
473 "params": [ data, { "encoding": "base64" } ]
474 });
475
476 if self.inner.broadcast {
477 Ok(futures::future::select_ok(url.iter().map(|v| {
478 client
479 .post(&format!("{}/api/v1/bundles", v))
480 .json(&body)
481 .send()
482 }))
483 .await?
484 .0)
485 } else {
486 Ok(client
487 .post(&format!("{}/api/v1/bundles", url[0]))
488 .json(&body)
489 .send()
490 .await?)
491 }
492 }
493
494 pub async fn send_bundle_bid_lazy<T, S>(
496 &self,
497 tx: impl Future<Output = anyhow::Result<T>>,
498 ) -> anyhow::Result<String>
499 where
500 T: IntoIterator<Item = S>,
501 S: Serialize,
502 {
503 self.send_bundle_lazy(tx)
504 .await?
505 .error_for_status()?
506 .json::<serde_json::Value>()
507 .await?["result"]
508 .as_str()
509 .map(|v| v.to_string())
510 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
511 }
512
513 #[cfg(rustc_version_1_85_0)]
515 pub async fn send_transaction_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<Response>
516 where
517 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
518 T: Serialize,
519 {
520 let (ref url, ref client) = *self.inner.lb.alloc().await;
521
522 let data = BASE64_STANDARD.encode(bincode::serialize(&callback(url, client).await?)?);
523
524 let body = json!({
525 "id": 1,
526 "jsonrpc": "2.0",
527 "method": "sendTransaction",
528 "params": [
529 data, { "encoding": "base64" }
530 ]
531 });
532
533 if self.inner.broadcast {
534 Ok(futures::future::select_ok(url.iter().map(|v| {
535 client
536 .post(&format!("{}/api/v1/transactions", v))
537 .query(&["bundleOnly", "true"])
538 .json(&body)
539 .send()
540 }))
541 .await?
542 .0)
543 } else {
544 Ok(client
545 .post(&format!("{}/api/v1/transactions", url[0]))
546 .query(&["bundleOnly", "true"])
547 .json(&body)
548 .send()
549 .await?)
550 }
551 }
552
553 #[cfg(rustc_version_1_85_0)]
555 pub async fn send_transaction_bid_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<String>
556 where
557 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
558 T: Serialize,
559 {
560 Ok(self
561 .send_transaction_lazy_fn(callback)
562 .await?
563 .error_for_status()?
564 .headers()
565 .get("x-bundle-id")
566 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
567 .to_str()
568 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
569 .to_string())
570 }
571
572 #[cfg(rustc_version_1_85_0)]
574 pub async fn send_transaction_no_bundle_only_lazy_fn<F, T>(
575 &self,
576 callback: F,
577 ) -> anyhow::Result<Response>
578 where
579 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
580 T: Serialize,
581 {
582 let (ref url, ref client) = *self.inner.lb.alloc().await;
583
584 let data = BASE64_STANDARD.encode(bincode::serialize(&callback(url, client).await?)?);
585
586 let body = json!({
587 "id": 1,
588 "jsonrpc": "2.0",
589 "method": "sendTransaction",
590 "params": [
591 data, { "encoding": "base64" }
592 ]
593 });
594
595 if self.inner.broadcast {
596 Ok(futures::future::select_ok(url.iter().map(|v| {
597 client
598 .post(&format!("{}/api/v1/transactions", v))
599 .json(&body)
600 .send()
601 }))
602 .await?
603 .0)
604 } else {
605 Ok(client
606 .post(&format!("{}/api/v1/transactions", url[0]))
607 .json(&body)
608 .send()
609 .await?)
610 }
611 }
612
613 #[cfg(rustc_version_1_85_0)]
615 pub async fn send_bundle_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<Response>
616 where
617 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
618 T: IntoIterator<Item = S>,
619 S: Serialize,
620 {
621 let (ref url, ref client) = *self.inner.lb.alloc().await;
622
623 let data = callback(url, client)
624 .await?
625 .into_iter()
626 .map(|tx| {
627 Ok(BASE64_STANDARD.encode(
628 bincode::serialize(&tx)
629 .map_err(|v| anyhow::anyhow!("failed to serialize tx: {}", v))?,
630 ))
631 })
632 .collect::<anyhow::Result<Vec<_>>>()?;
633
634 let body = json!({
635 "id": 1,
636 "jsonrpc": "2.0",
637 "method": "sendBundle",
638 "params": [ data, { "encoding": "base64" } ]
639 });
640
641 if self.inner.broadcast {
642 Ok(futures::future::select_ok(url.iter().map(|v| {
643 client
644 .post(&format!("{}/api/v1/bundles", v))
645 .json(&body)
646 .send()
647 }))
648 .await?
649 .0)
650 } else {
651 Ok(client
652 .post(&format!("{}/api/v1/bundles", url[0]))
653 .json(&body)
654 .send()
655 .await?)
656 }
657 }
658
659 #[cfg(rustc_version_1_85_0)]
661 pub async fn send_bundle_bid_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<String>
662 where
663 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
664 T: IntoIterator<Item = S>,
665 S: Serialize,
666 {
667 self.send_bundle_lazy_fn(callback)
668 .await?
669 .error_for_status()?
670 .json::<serde_json::Value>()
671 .await?["result"]
672 .as_str()
673 .map(|v| v.to_string())
674 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
675 }
676}
677
/// One tip-floor record as deserialized by [`get_jito_tip`].
///
/// Field names mirror the JSON keys of the response.
// NOTE(review): the unit of the values is not visible here (presumably SOL) — confirm.
#[derive(Debug, Clone, Deserialize)]
pub struct JitoTip {
    /// Value of the `landed_tips_25th_percentile` key.
    pub landed_tips_25th_percentile: f64,
    /// Value of the `landed_tips_50th_percentile` key.
    pub landed_tips_50th_percentile: f64,
    /// Value of the `landed_tips_75th_percentile` key.
    pub landed_tips_75th_percentile: f64,
    /// Value of the `landed_tips_95th_percentile` key.
    pub landed_tips_95th_percentile: f64,
    /// Value of the `landed_tips_99th_percentile` key.
    pub landed_tips_99th_percentile: f64,
    /// Value of the `ema_landed_tips_50th_percentile` key.
    pub ema_landed_tips_50th_percentile: f64,
}
688
689pub async fn get_jito_tip(client: Client) -> anyhow::Result<JitoTip> {
691 Ok(client
692 .get("https://bundles.jito.wtf/api/v1/bundles/tip_floor")
693 .send()
694 .await?
695 .json::<Vec<JitoTip>>()
696 .await?
697 .get(0)
698 .context("get_jito_tip: empty response")?
699 .clone())
700}
701
/// `result` object of a `getBundleStatuses` response, as returned by
/// [`get_bundle_statuses`].
#[derive(Debug, Deserialize)]
pub struct BundleResult {
    /// Raw `context` value of the response, kept as untyped JSON.
    pub context: serde_json::Value,
    /// Bundle statuses from the `value` key; `None` when it is absent or null.
    pub value: Option<Vec<BundleStatus>>,
}
708
/// Status entry of a single bundle inside a [`BundleResult`].
///
/// Field names mirror the JSON keys of the response.
#[derive(Debug, Deserialize)]
pub struct BundleStatus {
    /// Identifier of the bundle.
    pub bundle_id: String,
    /// Value of the `transactions` key, if present.
    // NOTE(review): presumably transaction signatures — confirm against the API docs.
    pub transactions: Option<Vec<String>>,
    /// Value of the `slot` key, if present.
    pub slot: Option<u64>,
    /// Value of the `confirmation_status` key, if present.
    pub confirmation_status: Option<String>,
    /// Raw `err` value of the entry, kept as untyped JSON.
    pub err: Option<serde_json::Value>,
}
717
718pub async fn get_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
720 client: Client,
721 bundle: T,
722) -> anyhow::Result<BundleResult> {
723 #[derive(Debug, Deserialize)]
724 struct RpcResponse {
725 result: BundleResult,
726 }
727
728 let payload = json!({
729 "jsonrpc": "2.0",
730 "id": 1,
731 "method": "getBundleStatuses",
732 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
733 });
734
735 Ok(client
736 .post("https://mainnet.block-engine.jito.wtf/api/v1/getBundleStatuses")
737 .json(&payload)
738 .send()
739 .await?
740 .json::<RpcResponse>()
741 .await?
742 .result)
743}
744
/// Status entry of a single bundle inside an [`InflightBundleResult`].
///
/// Field names mirror the JSON keys of the response.
#[derive(Debug, Deserialize)]
pub struct InflightBundleStatus {
    /// Identifier of the bundle.
    pub bundle_id: String,
    /// Value of the `status` key, kept as the raw string.
    pub status: String,
    /// Value of the `landed_slot` key, if present.
    pub landed_slot: Option<u64>,
}
752
/// `result` object of a `getInflightBundleStatuses` response, as returned by
/// [`get_inflight_bundle_statuses`].
#[derive(Debug, Deserialize)]
pub struct InflightBundleResult {
    /// Raw `context` value of the response, kept as untyped JSON.
    pub context: serde_json::Value,
    /// Bundle statuses from the `value` key; `None` when it is absent or null.
    pub value: Option<Vec<InflightBundleStatus>>,
}
758
759pub async fn get_inflight_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
761 client: Client,
762 bundle: T,
763) -> anyhow::Result<InflightBundleResult> {
764 #[derive(Debug, Deserialize)]
765 struct InflightRpcResponse {
766 result: InflightBundleResult,
767 }
768
769 let payload = json!({
770 "jsonrpc": "2.0",
771 "id": 1,
772 "method": "getInflightBundleStatuses",
773 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
774 });
775
776 Ok(client
777 .post("https://mainnet.block-engine.jito.wtf/api/v1/getInflightBundleStatuses")
778 .json(&payload)
779 .send()
780 .await?
781 .json::<InflightRpcResponse>()
782 .await?
783 .result)
784}
785
786pub async fn test_ip(ip: IpAddr) -> anyhow::Result<IpAddr> {
787 reqwest::ClientBuilder::new()
788 .timeout(Duration::from_secs(3))
789 .local_address(ip)
790 .build()?
791 .get("https://crates.io")
792 .send()
793 .await?;
794
795 Ok(ip)
796}
797
798pub async fn test_all_ip() -> Vec<anyhow::Result<IpAddr>> {
799 match get_ip_list() {
800 Ok(v) => futures::future::join_all(v.into_iter().map(|v| test_ip(v))).await,
801 Err(_) => Vec::new(),
802 }
803}
804
805pub async fn test_all_ipv4() -> Vec<anyhow::Result<IpAddr>> {
806 match get_ipv4_list() {
807 Ok(v) => futures::future::join_all(v.into_iter().map(|v| test_ip(v))).await,
808 Err(_) => Vec::new(),
809 }
810}
811
812pub async fn test_all_ipv6() -> Vec<anyhow::Result<IpAddr>> {
813 match get_ipv6_list() {
814 Ok(v) => futures::future::join_all(v.into_iter().map(|v| test_ip(v))).await,
815 Err(_) => Vec::new(),
816 }
817}