1use anyhow::{Context, anyhow};
2use base64::prelude::*;
3use futures::future::{join_all, select_ok};
4use load_balancer::{LoadBalancer, interval::IntervalLoadBalancer};
5use reqwest::{Client, ClientBuilder, Response, StatusCode};
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::{net::IpAddr, sync::Arc, time::Duration};
9use tokio::spawn;
10use tokio::sync::Semaphore;
11
12pub use get_if_addrs::get_if_addrs;
13pub use load_balancer;
14pub use reqwest;
15pub use reqwest::Proxy;
16pub use reqwest::header::HeaderMap;
17pub use serde_json;
18
19pub struct JitoClientBuilder {
21 url: Vec<String>,
22 broadcast: bool,
23 interval: Duration,
24 timeout: Option<Duration>,
25 proxy: Option<Proxy>,
26 headers: Option<HeaderMap>,
27 ip: Vec<IpAddr>,
28 semaphore: Option<Arc<Semaphore>>,
29 broadcast_status: Option<StatusCode>,
30}
31
32impl JitoClientBuilder {
33 pub fn new() -> Self {
35 Self {
36 url: vec!["https://mainnet.block-engine.jito.wtf".to_string()],
37 broadcast: false,
38 interval: Duration::ZERO,
39 timeout: None,
40 proxy: None,
41 headers: None,
42 ip: Vec::new(),
43 semaphore: None,
44 broadcast_status: None,
45 }
46 }
47
48 pub fn url<T: IntoIterator<Item = impl AsRef<str>>>(mut self, url: T) -> Self {
50 self.url = url.into_iter().map(|v| v.as_ref().to_string()).collect();
51 self
52 }
53
54 pub fn interval(mut self, interval: Duration) -> Self {
57 self.interval = interval;
58 self
59 }
60
61 pub fn ip(mut self, ip: Vec<IpAddr>) -> Self {
63 self.ip = ip;
64 self
65 }
66
67 pub fn broadcast(mut self, broadcast: bool) -> Self {
69 self.broadcast = broadcast;
70 self
71 }
72
73 pub fn timeout(mut self, timeout: Duration) -> Self {
75 self.timeout = Some(timeout);
76 self
77 }
78
79 pub fn proxy(mut self, proxy: Proxy) -> Self {
81 self.proxy = Some(proxy);
82 self
83 }
84
85 pub fn headers(mut self, headers: HeaderMap) -> Self {
87 self.headers = Some(headers);
88 self
89 }
90
91 pub fn semaphore(mut self, semaphore: Arc<Semaphore>) -> Self {
93 self.semaphore = Some(semaphore);
94 self
95 }
96
97 pub fn broadcast_status(mut self, broadcast_status: StatusCode) -> Self {
99 self.broadcast_status = Some(broadcast_status);
100 self
101 }
102
103 pub fn build(self) -> anyhow::Result<JitoClient> {
105 let semaphore = self.semaphore.unwrap_or(
106 Semaphore::new(if self.interval == Duration::ZERO {
107 usize::MAX
108 } else {
109 Duration::from_secs(1).div_duration_f64(self.interval) as usize
110 })
111 .into(),
112 );
113
114 let default_ip = self.ip.is_empty();
115
116 let inner = if self.broadcast {
117 let mut entries = Vec::new();
118
119 if default_ip {
120 let mut cb = ClientBuilder::new();
121
122 if let Some(v) = self.timeout {
123 cb = cb.timeout(v);
124 }
125
126 if let Some(v) = self.proxy {
127 cb = cb.proxy(v);
128 }
129
130 if let Some(v) = self.headers {
131 cb = cb.default_headers(v);
132 }
133
134 entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
135 } else {
136 for ip in &self.ip {
137 let mut cb = ClientBuilder::new();
138
139 if let Some(v) = self.timeout {
140 cb = cb.timeout(v);
141 }
142
143 if let Some(v) = self.proxy.clone() {
144 cb = cb.proxy(v);
145 }
146
147 if let Some(v) = self.headers.clone() {
148 cb = cb.default_headers(v);
149 }
150
151 cb = cb.local_address(*ip);
152
153 entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
154 }
155 }
156
157 JitoClientRef {
158 semaphore,
159 lb: IntervalLoadBalancer::new(entries),
160 broadcast_status: self.broadcast_status,
161 }
162 } else {
163 let mut entries = Vec::new();
164
165 if default_ip {
166 for url in &self.url {
167 let mut cb = ClientBuilder::new();
168
169 if let Some(v) = self.timeout {
170 cb = cb.timeout(v);
171 }
172
173 if let Some(v) = self.proxy.clone() {
174 cb = cb.proxy(v);
175 }
176
177 if let Some(v) = self.headers.clone() {
178 cb = cb.default_headers(v);
179 }
180
181 entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
182 }
183 } else {
184 for url in &self.url {
185 for ip in &self.ip {
186 let mut cb = ClientBuilder::new();
187
188 if let Some(v) = self.timeout {
189 cb = cb.timeout(v);
190 }
191
192 if let Some(v) = self.proxy.clone() {
193 cb = cb.proxy(v);
194 }
195
196 if let Some(v) = self.headers.clone() {
197 cb = cb.default_headers(v);
198 }
199
200 cb = cb.local_address(*ip);
201
202 entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
203 }
204 }
205 }
206
207 JitoClientRef {
208 semaphore,
209 lb: IntervalLoadBalancer::new(entries),
210 broadcast_status: self.broadcast_status,
211 }
212 };
213
214 Ok(JitoClient {
215 inner: inner.into(),
216 })
217 }
218}
219
220struct JitoClientRef {
221 semaphore: Arc<Semaphore>,
222 lb: IntervalLoadBalancer<Arc<(Vec<String>, Client)>>,
223 broadcast_status: Option<StatusCode>,
224}
225
/// Client for posting transactions and bundles to the configured endpoints.
///
/// Cloning is cheap: clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct JitoClient {
    // Shared load balancer, semaphore and broadcast settings.
    inner: Arc<JitoClientRef>,
}
231
232impl JitoClient {
233 pub fn new() -> Self {
235 JitoClientBuilder::new().build().unwrap()
236 }
237
238 pub async fn raw_send(&self, body: &serde_json::Value) -> anyhow::Result<Response> {
240 let (ref url, ref client) = *self.inner.lb.alloc().await;
241
242 if url.len() > 1 {
243 Ok(select_ok(url.iter().map(|v| {
244 Box::pin(async move {
245 let response = client.post(v).json(&body).send().await?;
246
247 if let Some(v) = self.inner.broadcast_status {
248 if response.status() == v {
249 Ok(response)
250 } else {
251 Err(anyhow!(
252 "Status code mismatch: expected {}, found {}",
253 v,
254 response.status()
255 ))
256 }
257 } else {
258 Ok(response)
259 }
260 })
261 }))
262 .await?
263 .0)
264 } else {
265 Ok(client.post(&url[0]).json(body).send().await?)
266 }
267 }
268
269 pub async fn raw_send_api(
271 &self,
272 api_url: impl AsRef<str>,
273 body: &serde_json::Value,
274 ) -> anyhow::Result<Response> {
275 let (ref url, ref client) = *self.inner.lb.alloc().await;
276 let api_url = api_url.as_ref();
277
278 if url.len() > 1 {
279 Ok(select_ok(url.iter().map(|v| {
280 Box::pin(async move {
281 let response = client
282 .post(&format!("{}{}", v, api_url))
283 .json(&body)
284 .send()
285 .await?;
286
287 if let Some(v) = self.inner.broadcast_status {
288 if response.status() == v {
289 Ok(response)
290 } else {
291 Err(anyhow!(
292 "Status code mismatch: expected {}, found {}",
293 v,
294 response.status()
295 ))
296 }
297 } else {
298 Ok(response)
299 }
300 })
301 }))
302 .await?
303 .0)
304 } else {
305 Ok(client
306 .post(&format!("{}{}", url[0], api_url))
307 .json(body)
308 .send()
309 .await?)
310 }
311 }
312
313 pub async fn raw_send_lazy(
315 &self,
316 body: impl Future<Output = anyhow::Result<serde_json::Value>>,
317 ) -> anyhow::Result<Response> {
318 let (ref url, ref client) = *self.inner.lb.alloc().await;
319 let body = &body.await?;
320
321 if url.len() > 1 {
322 Ok(select_ok(url.iter().map(|v| {
323 Box::pin(async move {
324 let response = client.post(v).json(body).send().await?;
325
326 if let Some(v) = self.inner.broadcast_status {
327 if response.status() == v {
328 Ok(response)
329 } else {
330 Err(anyhow!(
331 "Status code mismatch: expected {}, found {}",
332 v,
333 response.status()
334 ))
335 }
336 } else {
337 Ok(response)
338 }
339 })
340 }))
341 .await?
342 .0)
343 } else {
344 Ok(client.post(&url[0]).json(body).send().await?)
345 }
346 }
347
348 pub async fn raw_send_api_lazy(
350 &self,
351 api_url: impl AsRef<str>,
352 body: impl Future<Output = anyhow::Result<serde_json::Value>>,
353 ) -> anyhow::Result<Response> {
354 let (ref url, ref client) = *self.inner.lb.alloc().await;
355 let api_url = api_url.as_ref();
356 let body = &body.await?;
357
358 if url.len() > 1 {
359 Ok(select_ok(url.iter().map(|v| {
360 Box::pin(async move {
361 let response = client
362 .post(&format!("{}{}", v, api_url))
363 .json(&body)
364 .send()
365 .await?;
366
367 if let Some(v) = self.inner.broadcast_status {
368 if response.status() == v {
369 Ok(response)
370 } else {
371 Err(anyhow!(
372 "Status code mismatch: expected {}, found {}",
373 v,
374 response.status()
375 ))
376 }
377 } else {
378 Ok(response)
379 }
380 })
381 }))
382 .await?
383 .0)
384 } else {
385 Ok(client
386 .post(&format!("{}{}", url[0], api_url))
387 .json(&body)
388 .send()
389 .await?)
390 }
391 }
392
393 pub async fn raw_send_lazy_fn<F>(&self, callback: F) -> anyhow::Result<Response>
395 where
396 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<serde_json::Value>,
397 {
398 let (ref url, ref client) = *self.inner.lb.alloc().await;
399 let body = &callback(url, client).await?;
400
401 if url.len() > 1 {
402 Ok(select_ok(url.iter().map(|v| {
403 Box::pin(async move {
404 let response = client.post(v).json(body).send().await?;
405
406 if let Some(v) = self.inner.broadcast_status {
407 if response.status() == v {
408 Ok(response)
409 } else {
410 Err(anyhow!(
411 "Status code mismatch: expected {}, found {}",
412 v,
413 response.status()
414 ))
415 }
416 } else {
417 Ok(response)
418 }
419 })
420 }))
421 .await?
422 .0)
423 } else {
424 Ok(client.post(&url[0]).json(body).send().await?)
425 }
426 }
427
428 pub async fn raw_send_api_lazy_fn<F>(
430 &self,
431 api_url: impl AsRef<str>,
432 callback: F,
433 ) -> anyhow::Result<Response>
434 where
435 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<serde_json::Value>,
436 {
437 let (ref url, ref client) = *self.inner.lb.alloc().await;
438 let api_url = api_url.as_ref();
439 let body = &callback(url, client).await?;
440
441 if url.len() > 1 {
442 Ok(select_ok(url.iter().map(|v| {
443 Box::pin(async move {
444 let response = client
445 .post(&format!("{}{}", v, api_url))
446 .json(body)
447 .send()
448 .await?;
449
450 if let Some(v) = self.inner.broadcast_status {
451 if response.status() == v {
452 Ok(response)
453 } else {
454 Err(anyhow!(
455 "Status code mismatch: expected {}, found {}",
456 v,
457 response.status()
458 ))
459 }
460 } else {
461 Ok(response)
462 }
463 })
464 }))
465 .await?
466 .0)
467 } else {
468 Ok(client
469 .post(&format!("{}{}", url[0], api_url))
470 .json(body)
471 .send()
472 .await?)
473 }
474 }
475
476 pub async fn send_transaction(&self, tx: impl Serialize) -> anyhow::Result<Response> {
478 let data = serialize_tx(tx)?;
479
480 let body = &json!({
481 "id": 1,
482 "jsonrpc": "2.0",
483 "method": "sendTransaction",
484 "params": [
485 data, { "encoding": "base64" }
486 ]
487 });
488
489 let (ref url, ref client) = *self.inner.lb.alloc().await;
490
491 if url.len() > 1 {
492 Ok(select_ok(url.iter().map(|v| {
493 Box::pin(async move {
494 let response = client
495 .post(&format!("{}/api/v1/transactions", v))
496 .query(&[("bundleOnly", "true")])
497 .json(body)
498 .send()
499 .await?;
500
501 if let Some(v) = self.inner.broadcast_status {
502 if response.status() == v {
503 Ok(response)
504 } else {
505 Err(anyhow!(
506 "Status code mismatch: expected {}, found {}",
507 v,
508 response.status()
509 ))
510 }
511 } else {
512 Ok(response)
513 }
514 })
515 }))
516 .await?
517 .0)
518 } else {
519 Ok(client
520 .post(&format!("{}/api/v1/transactions", url[0]))
521 .query(&[("bundleOnly", "true")])
522 .json(body)
523 .send()
524 .await?)
525 }
526 }
527
528 pub async fn send_transaction_bid(&self, tx: impl Serialize) -> anyhow::Result<String> {
530 Ok(self
531 .send_transaction(tx)
532 .await?
533 .error_for_status()?
534 .headers()
535 .get("x-bundle-id")
536 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
537 .to_str()
538 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
539 .to_string())
540 }
541
542 pub async fn send_transaction_no_bundle_only(
544 &self,
545 tx: impl Serialize,
546 ) -> anyhow::Result<Response> {
547 let data = serialize_tx(tx)?;
548 let body = &json!({
549 "id": 1,
550 "jsonrpc": "2.0",
551 "method": "sendTransaction",
552 "params": [
553 data, { "encoding": "base64" }
554 ]
555 });
556
557 let (ref url, ref client) = *self.inner.lb.alloc().await;
558
559 if url.len() > 1 {
560 Ok(select_ok(url.iter().map(|v| {
561 Box::pin(async move {
562 let response = client
563 .post(&format!("{}/api/v1/transactions", v))
564 .json(body)
565 .send()
566 .await?;
567
568 if let Some(v) = self.inner.broadcast_status {
569 if response.status() == v {
570 Ok(response)
571 } else {
572 Err(anyhow!(
573 "Status code mismatch: expected {}, found {}",
574 v,
575 response.status()
576 ))
577 }
578 } else {
579 Ok(response)
580 }
581 })
582 }))
583 .await?
584 .0)
585 } else {
586 Ok(client
587 .post(&format!("{}/api/v1/transactions", url[0]))
588 .json(body)
589 .send()
590 .await?)
591 }
592 }
593
594 pub async fn send_bundle<T: IntoIterator<Item = impl Serialize>>(
596 &self,
597 tx: T,
598 ) -> anyhow::Result<Response> {
599 let data = serialize_tx_vec(tx)?;
600
601 let body = &json!({
602 "id": 1,
603 "jsonrpc": "2.0",
604 "method": "sendBundle",
605 "params": [ data, { "encoding": "base64" } ]
606 });
607
608 let (ref url, ref client) = *self.inner.lb.alloc().await;
609
610 if url.len() > 1 {
611 Ok(select_ok(url.iter().map(|v| {
612 Box::pin(async move {
613 let response = client
614 .post(&format!("{}/api/v1/bundles", v))
615 .json(body)
616 .send()
617 .await?;
618
619 if let Some(v) = self.inner.broadcast_status {
620 if response.status() == v {
621 Ok(response)
622 } else {
623 Err(anyhow!(
624 "Status code mismatch: expected {}, found {}",
625 v,
626 response.status()
627 ))
628 }
629 } else {
630 Ok(response)
631 }
632 })
633 }))
634 .await?
635 .0)
636 } else {
637 Ok(client
638 .post(&format!("{}/api/v1/bundles", url[0]))
639 .json(body)
640 .send()
641 .await?)
642 }
643 }
644
645 pub async fn send_bundle_bid<T: IntoIterator<Item = impl Serialize>>(
647 &self,
648 tx: T,
649 ) -> anyhow::Result<String> {
650 self.send_bundle(tx)
651 .await?
652 .error_for_status()?
653 .json::<serde_json::Value>()
654 .await?["result"]
655 .as_str()
656 .map(|v| v.to_string())
657 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
658 }
659
660 pub async fn send_transaction_lazy<T>(
662 &self,
663 tx: impl Future<Output = anyhow::Result<T>>,
664 ) -> anyhow::Result<Response>
665 where
666 T: Serialize,
667 {
668 let (ref url, ref client) = *self.inner.lb.alloc().await;
669
670 let data = serialize_tx(tx.await?)?;
671
672 let body = &json!({
673 "id": 1,
674 "jsonrpc": "2.0",
675 "method": "sendTransaction",
676 "params": [
677 data, { "encoding": "base64" }
678 ]
679 });
680
681 if url.len() > 1 {
682 Ok(select_ok(url.iter().map(|v| {
683 Box::pin(async move {
684 let response = client
685 .post(&format!("{}/api/v1/transactions", v))
686 .query(&[("bundleOnly", "true")])
687 .json(body)
688 .send()
689 .await?;
690
691 if let Some(v) = self.inner.broadcast_status {
692 if response.status() == v {
693 Ok(response)
694 } else {
695 Err(anyhow!(
696 "Status code mismatch: expected {}, found {}",
697 v,
698 response.status()
699 ))
700 }
701 } else {
702 Ok(response)
703 }
704 })
705 }))
706 .await?
707 .0)
708 } else {
709 Ok(client
710 .post(&format!("{}/api/v1/transactions", url[0]))
711 .query(&[("bundleOnly", "true")])
712 .json(body)
713 .send()
714 .await?)
715 }
716 }
717
718 pub async fn send_transaction_bid_lazy<T>(
720 &self,
721 tx: impl Future<Output = anyhow::Result<T>>,
722 ) -> anyhow::Result<String>
723 where
724 T: Serialize,
725 {
726 Ok(self
727 .send_transaction_lazy(tx)
728 .await?
729 .error_for_status()?
730 .headers()
731 .get("x-bundle-id")
732 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
733 .to_str()
734 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
735 .to_string())
736 }
737
738 pub async fn send_transaction_no_bundle_only_lazy<T>(
740 &self,
741 tx: impl Future<Output = anyhow::Result<T>>,
742 ) -> anyhow::Result<Response>
743 where
744 T: Serialize,
745 {
746 let (ref url, ref client) = *self.inner.lb.alloc().await;
747
748 let data = serialize_tx(tx.await?)?;
749
750 let body = &json!({
751 "id": 1,
752 "jsonrpc": "2.0",
753 "method": "sendTransaction",
754 "params": [
755 data, { "encoding": "base64" }
756 ]
757 });
758
759 if url.len() > 1 {
760 Ok(select_ok(url.iter().map(|v| {
761 Box::pin(async move {
762 let response = client
763 .post(&format!("{}/api/v1/transactions", v))
764 .json(body)
765 .send()
766 .await?;
767
768 if let Some(v) = self.inner.broadcast_status {
769 if response.status() == v {
770 Ok(response)
771 } else {
772 Err(anyhow!(
773 "Status code mismatch: expected {}, found {}",
774 v,
775 response.status()
776 ))
777 }
778 } else {
779 Ok(response)
780 }
781 })
782 }))
783 .await?
784 .0)
785 } else {
786 Ok(client
787 .post(&format!("{}/api/v1/transactions", url[0]))
788 .json(body)
789 .send()
790 .await?)
791 }
792 }
793
794 pub async fn send_bundle_lazy<T, S>(
796 &self,
797 tx: impl Future<Output = anyhow::Result<T>>,
798 ) -> anyhow::Result<Response>
799 where
800 T: IntoIterator<Item = S>,
801 S: Serialize,
802 {
803 let (ref url, ref client) = *self.inner.lb.alloc().await;
804
805 let data = serialize_tx_vec(tx.await?)?;
806
807 let body = &json!({
808 "id": 1,
809 "jsonrpc": "2.0",
810 "method": "sendBundle",
811 "params": [ data, { "encoding": "base64" } ]
812 });
813
814 if url.len() > 1 {
815 Ok(select_ok(url.iter().map(|v| {
816 Box::pin(async move {
817 let response = client
818 .post(&format!("{}/api/v1/bundles", v))
819 .json(body)
820 .send()
821 .await?;
822
823 if let Some(v) = self.inner.broadcast_status {
824 if response.status() == v {
825 Ok(response)
826 } else {
827 Err(anyhow!(
828 "Status code mismatch: expected {}, found {}",
829 v,
830 response.status()
831 ))
832 }
833 } else {
834 Ok(response)
835 }
836 })
837 }))
838 .await?
839 .0)
840 } else {
841 Ok(client
842 .post(&format!("{}/api/v1/bundles", url[0]))
843 .json(body)
844 .send()
845 .await?)
846 }
847 }
848
849 pub async fn send_bundle_bid_lazy<T, S>(
851 &self,
852 tx: impl Future<Output = anyhow::Result<T>>,
853 ) -> anyhow::Result<String>
854 where
855 T: IntoIterator<Item = S>,
856 S: Serialize,
857 {
858 self.send_bundle_lazy(tx)
859 .await?
860 .error_for_status()?
861 .json::<serde_json::Value>()
862 .await?["result"]
863 .as_str()
864 .map(|v| v.to_string())
865 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
866 }
867
868 pub async fn send_transaction_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<Response>
870 where
871 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
872 T: Serialize,
873 {
874 let (ref url, ref client) = *self.inner.lb.alloc().await;
875
876 let data = serialize_tx(callback(url, client).await?)?;
877
878 let body = &json!({
879 "id": 1,
880 "jsonrpc": "2.0",
881 "method": "sendTransaction",
882 "params": [
883 data, { "encoding": "base64" }
884 ]
885 });
886
887 if url.len() > 1 {
888 Ok(select_ok(url.iter().map(|v| {
889 Box::pin(async move {
890 let response = client
891 .post(&format!("{}/api/v1/transactions", v))
892 .query(&[("bundleOnly", "true")])
893 .json(body)
894 .send()
895 .await?;
896
897 if let Some(v) = self.inner.broadcast_status {
898 if response.status() == v {
899 Ok(response)
900 } else {
901 Err(anyhow!(
902 "Status code mismatch: expected {}, found {}",
903 v,
904 response.status()
905 ))
906 }
907 } else {
908 Ok(response)
909 }
910 })
911 }))
912 .await?
913 .0)
914 } else {
915 Ok(client
916 .post(&format!("{}/api/v1/transactions", url[0]))
917 .query(&[("bundleOnly", "true")])
918 .json(body)
919 .send()
920 .await?)
921 }
922 }
923
924 pub async fn send_transaction_bid_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<String>
926 where
927 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
928 T: Serialize,
929 {
930 Ok(self
931 .send_transaction_lazy_fn(callback)
932 .await?
933 .error_for_status()?
934 .headers()
935 .get("x-bundle-id")
936 .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
937 .to_str()
938 .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
939 .to_string())
940 }
941
942 pub async fn send_transaction_no_bundle_only_lazy_fn<F, T>(
944 &self,
945 callback: F,
946 ) -> anyhow::Result<Response>
947 where
948 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
949 T: Serialize,
950 {
951 let (ref url, ref client) = *self.inner.lb.alloc().await;
952
953 let data = serialize_tx(callback(url, client).await?)?;
954
955 let body = &json!({
956 "id": 1,
957 "jsonrpc": "2.0",
958 "method": "sendTransaction",
959 "params": [
960 data, { "encoding": "base64" }
961 ]
962 });
963
964 if url.len() > 1 {
965 Ok(select_ok(url.iter().map(|v| {
966 Box::pin(async move {
967 let response = client
968 .post(&format!("{}/api/v1/transactions", v))
969 .json(body)
970 .send()
971 .await?;
972
973 if let Some(v) = self.inner.broadcast_status {
974 if response.status() == v {
975 Ok(response)
976 } else {
977 Err(anyhow!(
978 "Status code mismatch: expected {}, found {}",
979 v,
980 response.status()
981 ))
982 }
983 } else {
984 Ok(response)
985 }
986 })
987 }))
988 .await?
989 .0)
990 } else {
991 Ok(client
992 .post(&format!("{}/api/v1/transactions", url[0]))
993 .json(body)
994 .send()
995 .await?)
996 }
997 }
998
999 pub async fn send_bundle_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<Response>
1001 where
1002 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
1003 T: IntoIterator<Item = S>,
1004 S: Serialize,
1005 {
1006 let (ref url, ref client) = *self.inner.lb.alloc().await;
1007
1008 let data = serialize_tx_vec(callback(url, client).await?)?;
1009
1010 let body = &json!({
1011 "id": 1,
1012 "jsonrpc": "2.0",
1013 "method": "sendBundle",
1014 "params": [ data, { "encoding": "base64" } ]
1015 });
1016
1017 if url.len() > 1 {
1018 Ok(select_ok(url.iter().map(|v| {
1019 Box::pin(async move {
1020 let response = client
1021 .post(&format!("{}/api/v1/bundles", v))
1022 .json(body)
1023 .send()
1024 .await?;
1025
1026 if let Some(v) = self.inner.broadcast_status {
1027 if response.status() == v {
1028 Ok(response)
1029 } else {
1030 Err(anyhow!(
1031 "Status code mismatch: expected {}, found {}",
1032 v,
1033 response.status()
1034 ))
1035 }
1036 } else {
1037 Ok(response)
1038 }
1039 })
1040 }))
1041 .await?
1042 .0)
1043 } else {
1044 Ok(client
1045 .post(&format!("{}/api/v1/bundles", url[0]))
1046 .json(body)
1047 .send()
1048 .await?)
1049 }
1050 }
1051
1052 pub async fn send_bundle_bid_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<String>
1054 where
1055 F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
1056 T: IntoIterator<Item = S>,
1057 S: Serialize,
1058 {
1059 self.send_bundle_lazy_fn(callback)
1060 .await?
1061 .error_for_status()?
1062 .json::<serde_json::Value>()
1063 .await?["result"]
1064 .as_str()
1065 .map(|v| v.to_string())
1066 .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
1067 }
1068
1069 pub async fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
1074 where
1075 F: Future + Send + 'static,
1076 F::Output: Send + 'static,
1077 {
1078 let permit = self.inner.semaphore.clone().acquire_owned().await.unwrap();
1079
1080 spawn(async move {
1081 let result = future.await;
1082
1083 drop(permit);
1084
1085 result
1086 })
1087 }
1088}
1089
/// One entry of the tip-floor response fetched by [`get_jito_tip`].
///
/// Field names match the JSON keys of the response; every value is decoded
/// as `f64`.
// NOTE(review): units are not visible here (presumably SOL) — confirm against the API docs.
#[derive(Debug, Clone, Deserialize)]
pub struct JitoTip {
    pub landed_tips_25th_percentile: f64,
    pub landed_tips_50th_percentile: f64,
    pub landed_tips_75th_percentile: f64,
    pub landed_tips_95th_percentile: f64,
    pub landed_tips_99th_percentile: f64,
    pub ema_landed_tips_50th_percentile: f64,
}
1100
1101pub async fn get_jito_tip(client: Client) -> anyhow::Result<JitoTip> {
1103 Ok(client
1104 .get("https://bundles.jito.wtf/api/v1/bundles/tip_floor")
1105 .send()
1106 .await?
1107 .json::<Vec<JitoTip>>()
1108 .await?
1109 .get(0)
1110 .context("get_jito_tip: empty response")?
1111 .clone())
1112}
1113
/// `result` object of a `getBundleStatuses` response, as returned by
/// [`get_bundle_statuses`].
#[derive(Debug, Deserialize)]
pub struct BundleResult {
    /// Raw `context` value, kept as untyped JSON.
    pub context: serde_json::Value,
    /// Bundle statuses; `None` when the field is absent or `null`.
    pub value: Option<Vec<BundleStatus>>,
}
1120
/// Status of a single bundle inside a [`BundleResult`].
///
/// Only `bundle_id` is required; the other fields are optional in the
/// decoded JSON.
#[derive(Debug, Deserialize)]
pub struct BundleStatus {
    pub bundle_id: String,
    // NOTE(review): presumably the signatures of the bundle's transactions — confirm.
    pub transactions: Option<Vec<String>>,
    pub slot: Option<u64>,
    pub confirmation_status: Option<String>,
    /// Raw error value, kept as untyped JSON.
    pub err: Option<serde_json::Value>,
}
1129
1130pub async fn get_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
1132 client: Client,
1133 bundle: T,
1134) -> anyhow::Result<BundleResult> {
1135 #[derive(Debug, Deserialize)]
1136 struct RpcResponse {
1137 result: BundleResult,
1138 }
1139
1140 let payload = json!({
1141 "jsonrpc": "2.0",
1142 "id": 1,
1143 "method": "getBundleStatuses",
1144 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
1145 });
1146
1147 Ok(client
1148 .post("https://mainnet.block-engine.jito.wtf/api/v1/getBundleStatuses")
1149 .json(&payload)
1150 .send()
1151 .await?
1152 .json::<RpcResponse>()
1153 .await?
1154 .result)
1155}
1156
/// Status of a single bundle inside an [`InflightBundleResult`].
#[derive(Debug, Deserialize)]
pub struct InflightBundleStatus {
    pub bundle_id: String,
    /// Status string exactly as returned by the server.
    pub status: String,
    pub landed_slot: Option<u64>,
}
1164
/// `result` object of a `getInflightBundleStatuses` response, as returned by
/// [`get_inflight_bundle_statuses`].
#[derive(Debug, Deserialize)]
pub struct InflightBundleResult {
    /// Raw `context` value, kept as untyped JSON.
    pub context: serde_json::Value,
    /// In-flight bundle statuses; `None` when the field is absent or `null`.
    pub value: Option<Vec<InflightBundleStatus>>,
}
1170
1171pub async fn get_inflight_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
1173 client: Client,
1174 bundle: T,
1175) -> anyhow::Result<InflightBundleResult> {
1176 #[derive(Debug, Deserialize)]
1177 struct InflightRpcResponse {
1178 result: InflightBundleResult,
1179 }
1180
1181 let payload = json!({
1182 "jsonrpc": "2.0",
1183 "id": 1,
1184 "method": "getInflightBundleStatuses",
1185 "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
1186 });
1187
1188 Ok(client
1189 .post("https://mainnet.block-engine.jito.wtf/api/v1/getInflightBundleStatuses")
1190 .json(&payload)
1191 .send()
1192 .await?
1193 .json::<InflightRpcResponse>()
1194 .await?
1195 .result)
1196}
1197
1198pub fn get_ip_list() -> anyhow::Result<Vec<IpAddr>> {
1200 Ok(get_if_addrs()?
1201 .into_iter()
1202 .filter(|v| !v.is_loopback())
1203 .map(|v| v.ip())
1204 .collect::<Vec<_>>())
1205}
1206
1207pub fn get_ipv4_list() -> anyhow::Result<Vec<IpAddr>> {
1209 Ok(get_if_addrs()?
1210 .into_iter()
1211 .filter(|v| !v.is_loopback() && v.ip().is_ipv4())
1212 .map(|v| v.ip())
1213 .collect::<Vec<_>>())
1214}
1215
1216pub fn get_ipv6_list() -> anyhow::Result<Vec<IpAddr>> {
1218 Ok(get_if_addrs()?
1219 .into_iter()
1220 .filter(|v| !v.is_loopback() && v.ip().is_ipv6())
1221 .map(|v| v.ip())
1222 .collect::<Vec<_>>())
1223}
1224
1225pub async fn test_ip(ip: IpAddr) -> anyhow::Result<IpAddr> {
1226 reqwest::ClientBuilder::new()
1227 .timeout(Duration::from_secs(3))
1228 .local_address(ip)
1229 .build()?
1230 .get("https://apple.com")
1231 .send()
1232 .await?;
1233
1234 Ok(ip)
1235}
1236
1237pub async fn test_all_ip() -> Vec<anyhow::Result<IpAddr>> {
1238 match get_ip_list() {
1239 Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1240 Err(_) => Vec::new(),
1241 }
1242}
1243
1244pub async fn test_all_ipv4() -> Vec<anyhow::Result<IpAddr>> {
1245 match get_ipv4_list() {
1246 Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1247 Err(_) => Vec::new(),
1248 }
1249}
1250
1251pub async fn test_all_ipv6() -> Vec<anyhow::Result<IpAddr>> {
1252 match get_ipv6_list() {
1253 Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1254 Err(_) => Vec::new(),
1255 }
1256}
1257
1258pub fn serialize_tx(tx: impl Serialize) -> anyhow::Result<String> {
1259 Ok(BASE64_STANDARD.encode(bincode::serialize(&tx)?))
1260}
1261
1262pub fn serialize_tx_vec<T: IntoIterator<Item = impl Serialize>>(
1263 tx: T,
1264) -> anyhow::Result<Vec<String>> {
1265 tx.into_iter()
1266 .map(|tx| {
1267 Ok(BASE64_STANDARD.encode(
1268 bincode::serialize(&tx)
1269 .map_err(|v| anyhow::anyhow!("failed to serialize tx: {}", v))?,
1270 ))
1271 })
1272 .collect::<anyhow::Result<Vec<_>>>()
1273}