jito_client/
lib.rs

1use anyhow::{Context, anyhow};
2use base64::prelude::*;
3use futures::future::{join_all, select_ok};
4use load_balancer::{LoadBalancer, interval::IntervalLoadBalancer};
5use reqwest::{Client, ClientBuilder, Response, StatusCode};
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use std::{net::IpAddr, sync::Arc, time::Duration};
9use tokio::spawn;
10use tokio::sync::Semaphore;
11
12pub use get_if_addrs::get_if_addrs;
13pub use load_balancer;
14pub use reqwest;
15pub use reqwest::Proxy;
16pub use reqwest::header::HeaderMap;
17pub use serde_json;
18
/// Builder for configuring and creating a `JitoClient`.
///
/// Every field has a matching chainable setter; `build` consumes the builder
/// and produces the client.
pub struct JitoClientBuilder {
    /// Target base URLs. Defaults to the mainnet block-engine endpoint.
    url: Vec<String>,
    /// When `true`, every load-balancer entry carries the full URL list;
    /// otherwise each entry carries exactly one URL.
    broadcast: bool,
    /// Interval handed to each load-balancer entry (`Duration::ZERO` = unlimited).
    interval: Duration,
    /// Optional request timeout applied to every HTTP client.
    timeout: Option<Duration>,
    /// Optional proxy applied to every HTTP client.
    proxy: Option<Proxy>,
    /// Optional default headers applied to every HTTP client.
    headers: Option<HeaderMap>,
    /// Local IP addresses to bind outgoing requests to; empty means no explicit binding.
    ip: Vec<IpAddr>,
    /// Caller-supplied semaphore; when absent, `build` creates one sized from `interval`.
    semaphore: Option<Arc<Semaphore>>,
    /// Status code considered successful when racing requests in broadcast mode.
    broadcast_status: Option<StatusCode>,
}
31
32impl JitoClientBuilder {
33    /// Creates a new `JitoClientBuilder` with default settings.
34    pub fn new() -> Self {
35        Self {
36            url: vec!["https://mainnet.block-engine.jito.wtf".to_string()],
37            broadcast: false,
38            interval: Duration::ZERO,
39            timeout: None,
40            proxy: None,
41            headers: None,
42            ip: Vec::new(),
43            semaphore: None,
44            broadcast_status: None,
45        }
46    }
47
48    /// Sets the target URLs for the client.
49    pub fn url<T: IntoIterator<Item = impl AsRef<str>>>(mut self, url: T) -> Self {
50        self.url = url.into_iter().map(|v| v.as_ref().to_string()).collect();
51        self
52    }
53
54    /// Sets the interval duration between requests (0 = unlimited)
55    /// For example, 5 requests per second = 200 ms interval.
56    pub fn interval(mut self, interval: Duration) -> Self {
57        self.interval = interval;
58        self
59    }
60
61    /// Sets the local IP addresses to bind outgoing requests to.
62    pub fn ip(mut self, ip: Vec<IpAddr>) -> Self {
63        self.ip = ip;
64        self
65    }
66
67    /// Broadcast each request to all configured URLs.
68    pub fn broadcast(mut self, broadcast: bool) -> Self {
69        self.broadcast = broadcast;
70        self
71    }
72
73    /// Sets a timeout duration for requests.
74    pub fn timeout(mut self, timeout: Duration) -> Self {
75        self.timeout = Some(timeout);
76        self
77    }
78
79    /// Sets a proxy for the client.
80    pub fn proxy(mut self, proxy: Proxy) -> Self {
81        self.proxy = Some(proxy);
82        self
83    }
84
85    /// Sets headers for the client.
86    pub fn headers(mut self, headers: HeaderMap) -> Self {
87        self.headers = Some(headers);
88        self
89    }
90
91    /// Sets semaphore for the client.
92    pub fn semaphore(mut self, semaphore: Arc<Semaphore>) -> Self {
93        self.semaphore = Some(semaphore);
94        self
95    }
96
97    /// Sets the status code considered successful in broadcast mode for select_ok.
98    pub fn broadcast_status(mut self, broadcast_status: StatusCode) -> Self {
99        self.broadcast_status = Some(broadcast_status);
100        self
101    }
102
103    /// Builds the `JitoClient` with the configured options.
104    pub fn build(self) -> anyhow::Result<JitoClient> {
105        let semaphore = self.semaphore.unwrap_or(
106            Semaphore::new(if self.interval == Duration::ZERO {
107                usize::MAX
108            } else {
109                Duration::from_secs(1).div_duration_f64(self.interval) as usize
110            })
111            .into(),
112        );
113
114        let default_ip = self.ip.is_empty();
115
116        let inner = if self.broadcast {
117            let mut entries = Vec::new();
118
119            if default_ip {
120                let mut cb = ClientBuilder::new();
121
122                if let Some(v) = self.timeout {
123                    cb = cb.timeout(v);
124                }
125
126                if let Some(v) = self.proxy {
127                    cb = cb.proxy(v);
128                }
129
130                if let Some(v) = self.headers {
131                    cb = cb.default_headers(v);
132                }
133
134                entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
135            } else {
136                for ip in &self.ip {
137                    let mut cb = ClientBuilder::new();
138
139                    if let Some(v) = self.timeout {
140                        cb = cb.timeout(v);
141                    }
142
143                    if let Some(v) = self.proxy.clone() {
144                        cb = cb.proxy(v);
145                    }
146
147                    if let Some(v) = self.headers.clone() {
148                        cb = cb.default_headers(v);
149                    }
150
151                    cb = cb.local_address(*ip);
152
153                    entries.push((self.interval, Arc::new((self.url.clone(), cb.build()?))));
154                }
155            }
156
157            JitoClientRef {
158                semaphore,
159                lb: IntervalLoadBalancer::new(entries),
160                broadcast_status: self.broadcast_status,
161            }
162        } else {
163            let mut entries = Vec::new();
164
165            if default_ip {
166                for url in &self.url {
167                    let mut cb = ClientBuilder::new();
168
169                    if let Some(v) = self.timeout {
170                        cb = cb.timeout(v);
171                    }
172
173                    if let Some(v) = self.proxy.clone() {
174                        cb = cb.proxy(v);
175                    }
176
177                    if let Some(v) = self.headers.clone() {
178                        cb = cb.default_headers(v);
179                    }
180
181                    entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
182                }
183            } else {
184                for url in &self.url {
185                    for ip in &self.ip {
186                        let mut cb = ClientBuilder::new();
187
188                        if let Some(v) = self.timeout {
189                            cb = cb.timeout(v);
190                        }
191
192                        if let Some(v) = self.proxy.clone() {
193                            cb = cb.proxy(v);
194                        }
195
196                        if let Some(v) = self.headers.clone() {
197                            cb = cb.default_headers(v);
198                        }
199
200                        cb = cb.local_address(*ip);
201
202                        entries.push((self.interval, Arc::new((vec![url.clone()], cb.build()?))));
203                    }
204                }
205            }
206
207            JitoClientRef {
208                semaphore,
209                lb: IntervalLoadBalancer::new(entries),
210                broadcast_status: self.broadcast_status,
211            }
212        };
213
214        Ok(JitoClient {
215            inner: inner.into(),
216        })
217    }
218}
219
/// Shared state behind a `JitoClient`, held in an `Arc` so clones are cheap.
struct JitoClientRef {
    /// Semaphore supplied by the builder, or created by it when none was given.
    semaphore: Arc<Semaphore>,
    /// Load balancer whose entries pair a list of target URLs with the HTTP client used for them.
    lb: IntervalLoadBalancer<Arc<(Vec<String>, Client)>>,
    /// Status code a response must have to count as a success when racing several URLs.
    broadcast_status: Option<StatusCode>,
}
225
/// Jito client for sending transactions and bundles.
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct JitoClient {
    /// Reference-counted shared state (load balancer, semaphore, broadcast settings).
    inner: Arc<JitoClientRef>,
}
231
232impl JitoClient {
233    /// Creates a new client with default settings.
234    pub fn new() -> Self {
235        JitoClientBuilder::new().build().unwrap()
236    }
237
238    /// Sends a raw request.
239    pub async fn raw_send(&self, body: &serde_json::Value) -> anyhow::Result<Response> {
240        let (ref url, ref client) = *self.inner.lb.alloc().await;
241
242        if url.len() > 1 {
243            Ok(select_ok(url.iter().map(|v| {
244                Box::pin(async move {
245                    let response = client.post(v).json(&body).send().await?;
246
247                    if let Some(v) = self.inner.broadcast_status {
248                        if response.status() == v {
249                            Ok(response)
250                        } else {
251                            Err(anyhow!(
252                                "Status code mismatch: expected {}, found {}",
253                                v,
254                                response.status()
255                            ))
256                        }
257                    } else {
258                        Ok(response)
259                    }
260                })
261            }))
262            .await?
263            .0)
264        } else {
265            Ok(client.post(&url[0]).json(body).send().await?)
266        }
267    }
268
269    /// Sends a raw request, use base_url + api_url.
270    pub async fn raw_send_api(
271        &self,
272        api_url: impl AsRef<str>,
273        body: &serde_json::Value,
274    ) -> anyhow::Result<Response> {
275        let (ref url, ref client) = *self.inner.lb.alloc().await;
276        let api_url = api_url.as_ref();
277
278        if url.len() > 1 {
279            Ok(select_ok(url.iter().map(|v| {
280                Box::pin(async move {
281                    let response = client
282                        .post(&format!("{}{}", v, api_url))
283                        .json(&body)
284                        .send()
285                        .await?;
286
287                    if let Some(v) = self.inner.broadcast_status {
288                        if response.status() == v {
289                            Ok(response)
290                        } else {
291                            Err(anyhow!(
292                                "Status code mismatch: expected {}, found {}",
293                                v,
294                                response.status()
295                            ))
296                        }
297                    } else {
298                        Ok(response)
299                    }
300                })
301            }))
302            .await?
303            .0)
304        } else {
305            Ok(client
306                .post(&format!("{}{}", url[0], api_url))
307                .json(body)
308                .send()
309                .await?)
310        }
311    }
312
313    /// Sends a raw request, with lazy body construction.
314    pub async fn raw_send_lazy(
315        &self,
316        body: impl Future<Output = anyhow::Result<serde_json::Value>>,
317    ) -> anyhow::Result<Response> {
318        let (ref url, ref client) = *self.inner.lb.alloc().await;
319        let body = &body.await?;
320
321        if url.len() > 1 {
322            Ok(select_ok(url.iter().map(|v| {
323                Box::pin(async move {
324                    let response = client.post(v).json(body).send().await?;
325
326                    if let Some(v) = self.inner.broadcast_status {
327                        if response.status() == v {
328                            Ok(response)
329                        } else {
330                            Err(anyhow!(
331                                "Status code mismatch: expected {}, found {}",
332                                v,
333                                response.status()
334                            ))
335                        }
336                    } else {
337                        Ok(response)
338                    }
339                })
340            }))
341            .await?
342            .0)
343        } else {
344            Ok(client.post(&url[0]).json(body).send().await?)
345        }
346    }
347
348    /// Sends a raw request, use base_url + api_url, with lazy body construction.
349    pub async fn raw_send_api_lazy(
350        &self,
351        api_url: impl AsRef<str>,
352        body: impl Future<Output = anyhow::Result<serde_json::Value>>,
353    ) -> anyhow::Result<Response> {
354        let (ref url, ref client) = *self.inner.lb.alloc().await;
355        let api_url = api_url.as_ref();
356        let body = &body.await?;
357
358        if url.len() > 1 {
359            Ok(select_ok(url.iter().map(|v| {
360                Box::pin(async move {
361                    let response = client
362                        .post(&format!("{}{}", v, api_url))
363                        .json(&body)
364                        .send()
365                        .await?;
366
367                    if let Some(v) = self.inner.broadcast_status {
368                        if response.status() == v {
369                            Ok(response)
370                        } else {
371                            Err(anyhow!(
372                                "Status code mismatch: expected {}, found {}",
373                                v,
374                                response.status()
375                            ))
376                        }
377                    } else {
378                        Ok(response)
379                    }
380                })
381            }))
382            .await?
383            .0)
384        } else {
385            Ok(client
386                .post(&format!("{}{}", url[0], api_url))
387                .json(&body)
388                .send()
389                .await?)
390        }
391    }
392
393    /// Sends a raw request, with lazy function to build the body.
394    pub async fn raw_send_lazy_fn<F>(&self, callback: F) -> anyhow::Result<Response>
395    where
396        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<serde_json::Value>,
397    {
398        let (ref url, ref client) = *self.inner.lb.alloc().await;
399        let body = &callback(url, client).await?;
400
401        if url.len() > 1 {
402            Ok(select_ok(url.iter().map(|v| {
403                Box::pin(async move {
404                    let response = client.post(v).json(body).send().await?;
405
406                    if let Some(v) = self.inner.broadcast_status {
407                        if response.status() == v {
408                            Ok(response)
409                        } else {
410                            Err(anyhow!(
411                                "Status code mismatch: expected {}, found {}",
412                                v,
413                                response.status()
414                            ))
415                        }
416                    } else {
417                        Ok(response)
418                    }
419                })
420            }))
421            .await?
422            .0)
423        } else {
424            Ok(client.post(&url[0]).json(body).send().await?)
425        }
426    }
427
428    /// Sends a raw request, use base_url + api_url, with lazy function to build the body.
429    pub async fn raw_send_api_lazy_fn<F>(
430        &self,
431        api_url: impl AsRef<str>,
432        callback: F,
433    ) -> anyhow::Result<Response>
434    where
435        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<serde_json::Value>,
436    {
437        let (ref url, ref client) = *self.inner.lb.alloc().await;
438        let api_url = api_url.as_ref();
439        let body = &callback(url, client).await?;
440
441        if url.len() > 1 {
442            Ok(select_ok(url.iter().map(|v| {
443                Box::pin(async move {
444                    let response = client
445                        .post(&format!("{}{}", v, api_url))
446                        .json(body)
447                        .send()
448                        .await?;
449
450                    if let Some(v) = self.inner.broadcast_status {
451                        if response.status() == v {
452                            Ok(response)
453                        } else {
454                            Err(anyhow!(
455                                "Status code mismatch: expected {}, found {}",
456                                v,
457                                response.status()
458                            ))
459                        }
460                    } else {
461                        Ok(response)
462                    }
463                })
464            }))
465            .await?
466            .0)
467        } else {
468            Ok(client
469                .post(&format!("{}{}", url[0], api_url))
470                .json(body)
471                .send()
472                .await?)
473        }
474    }
475
476    /// Sends a single transaction and returns the HTTP response.
477    pub async fn send_transaction(&self, tx: impl Serialize) -> anyhow::Result<Response> {
478        let data = serialize_tx(tx)?;
479
480        let body = &json!({
481            "id": 1,
482            "jsonrpc": "2.0",
483            "method": "sendTransaction",
484            "params": [
485                data, { "encoding": "base64" }
486            ]
487        });
488
489        let (ref url, ref client) = *self.inner.lb.alloc().await;
490
491        if url.len() > 1 {
492            Ok(select_ok(url.iter().map(|v| {
493                Box::pin(async move {
494                    let response = client
495                        .post(&format!("{}/api/v1/transactions", v))
496                        .query(&[("bundleOnly", "true")])
497                        .json(body)
498                        .send()
499                        .await?;
500
501                    if let Some(v) = self.inner.broadcast_status {
502                        if response.status() == v {
503                            Ok(response)
504                        } else {
505                            Err(anyhow!(
506                                "Status code mismatch: expected {}, found {}",
507                                v,
508                                response.status()
509                            ))
510                        }
511                    } else {
512                        Ok(response)
513                    }
514                })
515            }))
516            .await?
517            .0)
518        } else {
519            Ok(client
520                .post(&format!("{}/api/v1/transactions", url[0]))
521                .query(&[("bundleOnly", "true")])
522                .json(body)
523                .send()
524                .await?)
525        }
526    }
527
528    /// Sends a transaction and returns the bundle ID from the response headers.
529    pub async fn send_transaction_bid(&self, tx: impl Serialize) -> anyhow::Result<String> {
530        Ok(self
531            .send_transaction(tx)
532            .await?
533            .error_for_status()?
534            .headers()
535            .get("x-bundle-id")
536            .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
537            .to_str()
538            .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
539            .to_string())
540    }
541
542    /// Sends a transaction without `bundleOnly` flag.
543    pub async fn send_transaction_no_bundle_only(
544        &self,
545        tx: impl Serialize,
546    ) -> anyhow::Result<Response> {
547        let data = serialize_tx(tx)?;
548        let body = &json!({
549            "id": 1,
550            "jsonrpc": "2.0",
551            "method": "sendTransaction",
552            "params": [
553                data, { "encoding": "base64" }
554            ]
555        });
556
557        let (ref url, ref client) = *self.inner.lb.alloc().await;
558
559        if url.len() > 1 {
560            Ok(select_ok(url.iter().map(|v| {
561                Box::pin(async move {
562                    let response = client
563                        .post(&format!("{}/api/v1/transactions", v))
564                        .json(body)
565                        .send()
566                        .await?;
567
568                    if let Some(v) = self.inner.broadcast_status {
569                        if response.status() == v {
570                            Ok(response)
571                        } else {
572                            Err(anyhow!(
573                                "Status code mismatch: expected {}, found {}",
574                                v,
575                                response.status()
576                            ))
577                        }
578                    } else {
579                        Ok(response)
580                    }
581                })
582            }))
583            .await?
584            .0)
585        } else {
586            Ok(client
587                .post(&format!("{}/api/v1/transactions", url[0]))
588                .json(body)
589                .send()
590                .await?)
591        }
592    }
593
594    /// Sends multiple transactions as a bundle.
595    pub async fn send_bundle<T: IntoIterator<Item = impl Serialize>>(
596        &self,
597        tx: T,
598    ) -> anyhow::Result<Response> {
599        let data = serialize_tx_vec(tx)?;
600
601        let body = &json!({
602            "id": 1,
603            "jsonrpc": "2.0",
604            "method": "sendBundle",
605            "params": [ data, { "encoding": "base64" } ]
606        });
607
608        let (ref url, ref client) = *self.inner.lb.alloc().await;
609
610        if url.len() > 1 {
611            Ok(select_ok(url.iter().map(|v| {
612                Box::pin(async move {
613                    let response = client
614                        .post(&format!("{}/api/v1/bundles", v))
615                        .json(body)
616                        .send()
617                        .await?;
618
619                    if let Some(v) = self.inner.broadcast_status {
620                        if response.status() == v {
621                            Ok(response)
622                        } else {
623                            Err(anyhow!(
624                                "Status code mismatch: expected {}, found {}",
625                                v,
626                                response.status()
627                            ))
628                        }
629                    } else {
630                        Ok(response)
631                    }
632                })
633            }))
634            .await?
635            .0)
636        } else {
637            Ok(client
638                .post(&format!("{}/api/v1/bundles", url[0]))
639                .json(body)
640                .send()
641                .await?)
642        }
643    }
644
645    /// Sends a bundle and returns its bundle ID from the JSON response.
646    pub async fn send_bundle_bid<T: IntoIterator<Item = impl Serialize>>(
647        &self,
648        tx: T,
649    ) -> anyhow::Result<String> {
650        self.send_bundle(tx)
651            .await?
652            .error_for_status()?
653            .json::<serde_json::Value>()
654            .await?["result"]
655            .as_str()
656            .map(|v| v.to_string())
657            .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
658    }
659
660    /// Sends a single transaction and returns the HTTP response, with lazy serialization.
661    pub async fn send_transaction_lazy<T>(
662        &self,
663        tx: impl Future<Output = anyhow::Result<T>>,
664    ) -> anyhow::Result<Response>
665    where
666        T: Serialize,
667    {
668        let (ref url, ref client) = *self.inner.lb.alloc().await;
669
670        let data = serialize_tx(tx.await?)?;
671
672        let body = &json!({
673            "id": 1,
674            "jsonrpc": "2.0",
675            "method": "sendTransaction",
676            "params": [
677                data, { "encoding": "base64" }
678            ]
679        });
680
681        if url.len() > 1 {
682            Ok(select_ok(url.iter().map(|v| {
683                Box::pin(async move {
684                    let response = client
685                        .post(&format!("{}/api/v1/transactions", v))
686                        .query(&[("bundleOnly", "true")])
687                        .json(body)
688                        .send()
689                        .await?;
690
691                    if let Some(v) = self.inner.broadcast_status {
692                        if response.status() == v {
693                            Ok(response)
694                        } else {
695                            Err(anyhow!(
696                                "Status code mismatch: expected {}, found {}",
697                                v,
698                                response.status()
699                            ))
700                        }
701                    } else {
702                        Ok(response)
703                    }
704                })
705            }))
706            .await?
707            .0)
708        } else {
709            Ok(client
710                .post(&format!("{}/api/v1/transactions", url[0]))
711                .query(&[("bundleOnly", "true")])
712                .json(body)
713                .send()
714                .await?)
715        }
716    }
717
718    /// Sends a transaction and returns the bundle ID from the response headers, with lazy serialization.
719    pub async fn send_transaction_bid_lazy<T>(
720        &self,
721        tx: impl Future<Output = anyhow::Result<T>>,
722    ) -> anyhow::Result<String>
723    where
724        T: Serialize,
725    {
726        Ok(self
727            .send_transaction_lazy(tx)
728            .await?
729            .error_for_status()?
730            .headers()
731            .get("x-bundle-id")
732            .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
733            .to_str()
734            .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
735            .to_string())
736    }
737
738    /// Sends a transaction without `bundleOnly` flag, with lazy serialization.
739    pub async fn send_transaction_no_bundle_only_lazy<T>(
740        &self,
741        tx: impl Future<Output = anyhow::Result<T>>,
742    ) -> anyhow::Result<Response>
743    where
744        T: Serialize,
745    {
746        let (ref url, ref client) = *self.inner.lb.alloc().await;
747
748        let data = serialize_tx(tx.await?)?;
749
750        let body = &json!({
751            "id": 1,
752            "jsonrpc": "2.0",
753            "method": "sendTransaction",
754            "params": [
755                data, { "encoding": "base64" }
756            ]
757        });
758
759        if url.len() > 1 {
760            Ok(select_ok(url.iter().map(|v| {
761                Box::pin(async move {
762                    let response = client
763                        .post(&format!("{}/api/v1/transactions", v))
764                        .json(body)
765                        .send()
766                        .await?;
767
768                    if let Some(v) = self.inner.broadcast_status {
769                        if response.status() == v {
770                            Ok(response)
771                        } else {
772                            Err(anyhow!(
773                                "Status code mismatch: expected {}, found {}",
774                                v,
775                                response.status()
776                            ))
777                        }
778                    } else {
779                        Ok(response)
780                    }
781                })
782            }))
783            .await?
784            .0)
785        } else {
786            Ok(client
787                .post(&format!("{}/api/v1/transactions", url[0]))
788                .json(body)
789                .send()
790                .await?)
791        }
792    }
793
794    /// Sends multiple transactions as a bundle, with lazy serialization.
795    pub async fn send_bundle_lazy<T, S>(
796        &self,
797        tx: impl Future<Output = anyhow::Result<T>>,
798    ) -> anyhow::Result<Response>
799    where
800        T: IntoIterator<Item = S>,
801        S: Serialize,
802    {
803        let (ref url, ref client) = *self.inner.lb.alloc().await;
804
805        let data = serialize_tx_vec(tx.await?)?;
806
807        let body = &json!({
808            "id": 1,
809            "jsonrpc": "2.0",
810            "method": "sendBundle",
811            "params": [ data, { "encoding": "base64" } ]
812        });
813
814        if url.len() > 1 {
815            Ok(select_ok(url.iter().map(|v| {
816                Box::pin(async move {
817                    let response = client
818                        .post(&format!("{}/api/v1/bundles", v))
819                        .json(body)
820                        .send()
821                        .await?;
822
823                    if let Some(v) = self.inner.broadcast_status {
824                        if response.status() == v {
825                            Ok(response)
826                        } else {
827                            Err(anyhow!(
828                                "Status code mismatch: expected {}, found {}",
829                                v,
830                                response.status()
831                            ))
832                        }
833                    } else {
834                        Ok(response)
835                    }
836                })
837            }))
838            .await?
839            .0)
840        } else {
841            Ok(client
842                .post(&format!("{}/api/v1/bundles", url[0]))
843                .json(body)
844                .send()
845                .await?)
846        }
847    }
848
849    /// Sends a bundle and returns its bundle ID from the JSON response, with lazy serialization.
850    pub async fn send_bundle_bid_lazy<T, S>(
851        &self,
852        tx: impl Future<Output = anyhow::Result<T>>,
853    ) -> anyhow::Result<String>
854    where
855        T: IntoIterator<Item = S>,
856        S: Serialize,
857    {
858        self.send_bundle_lazy(tx)
859            .await?
860            .error_for_status()?
861            .json::<serde_json::Value>()
862            .await?["result"]
863            .as_str()
864            .map(|v| v.to_string())
865            .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
866    }
867
868    /// Sends a single transaction and returns the HTTP response, with lazy serialization.
869    pub async fn send_transaction_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<Response>
870    where
871        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
872        T: Serialize,
873    {
874        let (ref url, ref client) = *self.inner.lb.alloc().await;
875
876        let data = serialize_tx(callback(url, client).await?)?;
877
878        let body = &json!({
879            "id": 1,
880            "jsonrpc": "2.0",
881            "method": "sendTransaction",
882            "params": [
883                data, { "encoding": "base64" }
884            ]
885        });
886
887        if url.len() > 1 {
888            Ok(select_ok(url.iter().map(|v| {
889                Box::pin(async move {
890                    let response = client
891                        .post(&format!("{}/api/v1/transactions", v))
892                        .query(&[("bundleOnly", "true")])
893                        .json(body)
894                        .send()
895                        .await?;
896
897                    if let Some(v) = self.inner.broadcast_status {
898                        if response.status() == v {
899                            Ok(response)
900                        } else {
901                            Err(anyhow!(
902                                "Status code mismatch: expected {}, found {}",
903                                v,
904                                response.status()
905                            ))
906                        }
907                    } else {
908                        Ok(response)
909                    }
910                })
911            }))
912            .await?
913            .0)
914        } else {
915            Ok(client
916                .post(&format!("{}/api/v1/transactions", url[0]))
917                .query(&[("bundleOnly", "true")])
918                .json(body)
919                .send()
920                .await?)
921        }
922    }
923
924    /// Sends a transaction and returns the bundle ID from the response headers, with lazy serialization.
925    pub async fn send_transaction_bid_lazy_fn<F, T>(&self, callback: F) -> anyhow::Result<String>
926    where
927        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
928        T: Serialize,
929    {
930        Ok(self
931            .send_transaction_lazy_fn(callback)
932            .await?
933            .error_for_status()?
934            .headers()
935            .get("x-bundle-id")
936            .ok_or_else(|| anyhow!("missing `x-bundle-id` header"))?
937            .to_str()
938            .map_err(|v| anyhow!("invalid `x-bundle-id` header: {}", v))?
939            .to_string())
940    }
941
942    /// Sends a transaction without `bundleOnly` flag, with lazy serialization.
943    pub async fn send_transaction_no_bundle_only_lazy_fn<F, T>(
944        &self,
945        callback: F,
946    ) -> anyhow::Result<Response>
947    where
948        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
949        T: Serialize,
950    {
951        let (ref url, ref client) = *self.inner.lb.alloc().await;
952
953        let data = serialize_tx(callback(url, client).await?)?;
954
955        let body = &json!({
956            "id": 1,
957            "jsonrpc": "2.0",
958            "method": "sendTransaction",
959            "params": [
960                data, { "encoding": "base64" }
961            ]
962        });
963
964        if url.len() > 1 {
965            Ok(select_ok(url.iter().map(|v| {
966                Box::pin(async move {
967                    let response = client
968                        .post(&format!("{}/api/v1/transactions", v))
969                        .json(body)
970                        .send()
971                        .await?;
972
973                    if let Some(v) = self.inner.broadcast_status {
974                        if response.status() == v {
975                            Ok(response)
976                        } else {
977                            Err(anyhow!(
978                                "Status code mismatch: expected {}, found {}",
979                                v,
980                                response.status()
981                            ))
982                        }
983                    } else {
984                        Ok(response)
985                    }
986                })
987            }))
988            .await?
989            .0)
990        } else {
991            Ok(client
992                .post(&format!("{}/api/v1/transactions", url[0]))
993                .json(body)
994                .send()
995                .await?)
996        }
997    }
998
999    /// Sends multiple transactions as a bundle, with lazy serialization.
1000    pub async fn send_bundle_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<Response>
1001    where
1002        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
1003        T: IntoIterator<Item = S>,
1004        S: Serialize,
1005    {
1006        let (ref url, ref client) = *self.inner.lb.alloc().await;
1007
1008        let data = serialize_tx_vec(callback(url, client).await?)?;
1009
1010        let body = &json!({
1011            "id": 1,
1012            "jsonrpc": "2.0",
1013            "method": "sendBundle",
1014            "params": [ data, { "encoding": "base64" } ]
1015        });
1016
1017        if url.len() > 1 {
1018            Ok(select_ok(url.iter().map(|v| {
1019                Box::pin(async move {
1020                    let response = client
1021                        .post(&format!("{}/api/v1/bundles", v))
1022                        .json(body)
1023                        .send()
1024                        .await?;
1025
1026                    if let Some(v) = self.inner.broadcast_status {
1027                        if response.status() == v {
1028                            Ok(response)
1029                        } else {
1030                            Err(anyhow!(
1031                                "Status code mismatch: expected {}, found {}",
1032                                v,
1033                                response.status()
1034                            ))
1035                        }
1036                    } else {
1037                        Ok(response)
1038                    }
1039                })
1040            }))
1041            .await?
1042            .0)
1043        } else {
1044            Ok(client
1045                .post(&format!("{}/api/v1/bundles", url[0]))
1046                .json(body)
1047                .send()
1048                .await?)
1049        }
1050    }
1051
1052    /// Sends a bundle and returns its bundle ID from the JSON response, with lazy serialization.
1053    pub async fn send_bundle_bid_lazy_fn<F, T, S>(&self, callback: F) -> anyhow::Result<String>
1054    where
1055        F: AsyncFnOnce(&Vec<String>, &Client) -> anyhow::Result<T>,
1056        T: IntoIterator<Item = S>,
1057        S: Serialize,
1058    {
1059        self.send_bundle_lazy_fn(callback)
1060            .await?
1061            .error_for_status()?
1062            .json::<serde_json::Value>()
1063            .await?["result"]
1064            .as_str()
1065            .map(|v| v.to_string())
1066            .ok_or_else(|| anyhow::anyhow!("missing bundle result"))
1067    }
1068
1069    /// Spawns a new asynchronous task that respects the internal semaphore.
1070    ///
1071    /// The task will only start executing once a permit is acquired from the semaphore.
1072    /// When the task completes, the permit is automatically released.
1073    pub async fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
1074    where
1075        F: Future + Send + 'static,
1076        F::Output: Send + 'static,
1077    {
1078        let permit = self.inner.semaphore.clone().acquire_owned().await.unwrap();
1079
1080        spawn(async move {
1081            let result = future.await;
1082
1083            drop(permit);
1084            
1085            result
1086        })
1087    }
1088}
1089
1090/// Represents Jito tip data.
1091#[derive(Debug, Clone, Deserialize)]
1092pub struct JitoTip {
1093    pub landed_tips_25th_percentile: f64,
1094    pub landed_tips_50th_percentile: f64,
1095    pub landed_tips_75th_percentile: f64,
1096    pub landed_tips_95th_percentile: f64,
1097    pub landed_tips_99th_percentile: f64,
1098    pub ema_landed_tips_50th_percentile: f64,
1099}
1100
1101/// Fetches the current Jito tip from the public API.
1102pub async fn get_jito_tip(client: Client) -> anyhow::Result<JitoTip> {
1103    Ok(client
1104        .get("https://bundles.jito.wtf/api/v1/bundles/tip_floor")
1105        .send()
1106        .await?
1107        .json::<Vec<JitoTip>>()
1108        .await?
1109        .get(0)
1110        .context("get_jito_tip: empty response")?
1111        .clone())
1112}
1113
1114/// Represents the result of querying bundle statuses.
1115#[derive(Debug, Deserialize)]
1116pub struct BundleResult {
1117    pub context: serde_json::Value,
1118    pub value: Option<Vec<BundleStatus>>,
1119}
1120
1121#[derive(Debug, Deserialize)]
1122pub struct BundleStatus {
1123    pub bundle_id: String,
1124    pub transactions: Option<Vec<String>>,
1125    pub slot: Option<u64>,
1126    pub confirmation_status: Option<String>,
1127    pub err: Option<serde_json::Value>,
1128}
1129
1130/// Fetches statuses of multiple bundles.
1131pub async fn get_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
1132    client: Client,
1133    bundle: T,
1134) -> anyhow::Result<BundleResult> {
1135    #[derive(Debug, Deserialize)]
1136    struct RpcResponse {
1137        result: BundleResult,
1138    }
1139
1140    let payload = json!({
1141        "jsonrpc": "2.0",
1142        "id": 1,
1143        "method": "getBundleStatuses",
1144        "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
1145    });
1146
1147    Ok(client
1148        .post("https://mainnet.block-engine.jito.wtf/api/v1/getBundleStatuses")
1149        .json(&payload)
1150        .send()
1151        .await?
1152        .json::<RpcResponse>()
1153        .await?
1154        .result)
1155}
1156
1157/// Represents in-flight bundle status.
1158#[derive(Debug, Deserialize)]
1159pub struct InflightBundleStatus {
1160    pub bundle_id: String,
1161    pub status: String,
1162    pub landed_slot: Option<u64>,
1163}
1164
1165#[derive(Debug, Deserialize)]
1166pub struct InflightBundleResult {
1167    pub context: serde_json::Value,
1168    pub value: Option<Vec<InflightBundleStatus>>,
1169}
1170
1171/// Fetches statuses of in-flight bundles.
1172pub async fn get_inflight_bundle_statuses<T: IntoIterator<Item = impl AsRef<str>>>(
1173    client: Client,
1174    bundle: T,
1175) -> anyhow::Result<InflightBundleResult> {
1176    #[derive(Debug, Deserialize)]
1177    struct InflightRpcResponse {
1178        result: InflightBundleResult,
1179    }
1180
1181    let payload = json!({
1182        "jsonrpc": "2.0",
1183        "id": 1,
1184        "method": "getInflightBundleStatuses",
1185        "params": [bundle.into_iter().map(|v| v.as_ref().to_string()).collect::<Vec<_>>()],
1186    });
1187
1188    Ok(client
1189        .post("https://mainnet.block-engine.jito.wtf/api/v1/getInflightBundleStatuses")
1190        .json(&payload)
1191        .send()
1192        .await?
1193        .json::<InflightRpcResponse>()
1194        .await?
1195        .result)
1196}
1197
1198/// Get all non-loopback IP addresses of the machine.
1199pub fn get_ip_list() -> anyhow::Result<Vec<IpAddr>> {
1200    Ok(get_if_addrs()?
1201        .into_iter()
1202        .filter(|v| !v.is_loopback())
1203        .map(|v| v.ip())
1204        .collect::<Vec<_>>())
1205}
1206
1207/// Get all non-loopback IPv4 addresses of the machine.
1208pub fn get_ipv4_list() -> anyhow::Result<Vec<IpAddr>> {
1209    Ok(get_if_addrs()?
1210        .into_iter()
1211        .filter(|v| !v.is_loopback() && v.ip().is_ipv4())
1212        .map(|v| v.ip())
1213        .collect::<Vec<_>>())
1214}
1215
1216/// Get all non-loopback IPv6 addresses of the machine.
1217pub fn get_ipv6_list() -> anyhow::Result<Vec<IpAddr>> {
1218    Ok(get_if_addrs()?
1219        .into_iter()
1220        .filter(|v| !v.is_loopback() && v.ip().is_ipv6())
1221        .map(|v| v.ip())
1222        .collect::<Vec<_>>())
1223}
1224
1225pub async fn test_ip(ip: IpAddr) -> anyhow::Result<IpAddr> {
1226    reqwest::ClientBuilder::new()
1227        .timeout(Duration::from_secs(3))
1228        .local_address(ip)
1229        .build()?
1230        .get("https://apple.com")
1231        .send()
1232        .await?;
1233
1234    Ok(ip)
1235}
1236
1237pub async fn test_all_ip() -> Vec<anyhow::Result<IpAddr>> {
1238    match get_ip_list() {
1239        Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1240        Err(_) => Vec::new(),
1241    }
1242}
1243
1244pub async fn test_all_ipv4() -> Vec<anyhow::Result<IpAddr>> {
1245    match get_ipv4_list() {
1246        Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1247        Err(_) => Vec::new(),
1248    }
1249}
1250
1251pub async fn test_all_ipv6() -> Vec<anyhow::Result<IpAddr>> {
1252    match get_ipv6_list() {
1253        Ok(v) => join_all(v.into_iter().map(|v| test_ip(v))).await,
1254        Err(_) => Vec::new(),
1255    }
1256}
1257
1258pub fn serialize_tx(tx: impl Serialize) -> anyhow::Result<String> {
1259    Ok(BASE64_STANDARD.encode(bincode::serialize(&tx)?))
1260}
1261
1262pub fn serialize_tx_vec<T: IntoIterator<Item = impl Serialize>>(
1263    tx: T,
1264) -> anyhow::Result<Vec<String>> {
1265    tx.into_iter()
1266        .map(|tx| {
1267            Ok(BASE64_STANDARD.encode(
1268                bincode::serialize(&tx)
1269                    .map_err(|v| anyhow::anyhow!("failed to serialize tx: {}", v))?,
1270            ))
1271        })
1272        .collect::<anyhow::Result<Vec<_>>>()
1273}