blob_indexer/clients/beacon/
mod.rs1use std::fmt::Debug;
2
3use anyhow::Context as AnyhowContext;
4use async_trait::async_trait;
5use backoff::ExponentialBackoff;
6
7use reqwest::{Client, Url};
8use reqwest_eventsource::EventSource;
9
10#[cfg(test)]
11use mockall::automock;
12use types::BlockHeader;
13
14use crate::{
15 clients::{
16 beacon::types::{BlockHeaderResponse, Spec, SpecResponse},
17 common::ClientResult,
18 },
19 json_get,
20};
21
22use self::types::{Blob, BlobsResponse, Block, BlockId, BlockResponse, Topic};
23
24pub mod types;
25
26#[derive(Debug, Clone)]
27pub struct BeaconClient {
28 base_url: Url,
29 client: Client,
30 exp_backoff: Option<ExponentialBackoff>,
31}
32
33pub struct Config {
34 pub base_url: String,
35 pub exp_backoff: Option<ExponentialBackoff>,
36}
37
38#[async_trait]
39#[cfg_attr(test, automock)]
40pub trait CommonBeaconClient: Send + Sync + Debug {
41 async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>>;
42 async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>>;
43 async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>>;
44 async fn get_spec(&self) -> ClientResult<Option<Spec>>;
45 fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
46}
47
48impl BeaconClient {
49 pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
50 let base_url = Url::parse(&format!("{}/eth/", config.base_url))
51 .with_context(|| "Failed to parse base URL")?;
52 let exp_backoff = config.exp_backoff;
53
54 Ok(Self {
55 base_url,
56 client,
57 exp_backoff,
58 })
59 }
60}
61
62#[async_trait]
63impl CommonBeaconClient for BeaconClient {
64 async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>> {
65 let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
66 let url = self.base_url.join(path.as_str())?;
67
68 json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone())
69 .map(|res| res.map(|r| r.into()))
70 }
71
72 async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>> {
73 let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
74 let url = self.base_url.join(path.as_str())?;
75
76 json_get!(
77 &self.client,
78 url,
79 BlockHeaderResponse,
80 self.exp_backoff.clone()
81 )
82 .map(|res| res.map(|r| r.into()))
83 }
84
85 async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>> {
86 let path = format!("v1/beacon/blob_sidecars/{}", {
87 block_id.to_detailed_string()
88 });
89 let url = self.base_url.join(path.as_str())?;
90
91 json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
92 Some(r) => Some(r.data),
93 None => None,
94 })
95 }
96
97 async fn get_spec(&self) -> ClientResult<Option<Spec>> {
98 let url = self.base_url.join("v1/config/spec")?;
99
100 json_get!(&self.client, url, SpecResponse, self.exp_backoff.clone()).map(|res| match res {
101 Some(r) => Some(r.data),
102 None => None,
103 })
104 }
105
106 fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
107 let topics = topics
108 .iter()
109 .map(|topic| topic.into())
110 .collect::<Vec<String>>()
111 .join(",");
112 let path = format!("v1/events?topics={topics}");
113 let url = self.base_url.join(&path)?;
114
115 Ok(EventSource::get(url))
116 }
117}