blob_indexer/clients/beacon/
mod.rs

1use std::fmt::Debug;
2
3use anyhow::Context as AnyhowContext;
4use async_trait::async_trait;
5use backoff::ExponentialBackoff;
6
7use reqwest::{Client, Url};
8use reqwest_eventsource::EventSource;
9
10#[cfg(test)]
11use mockall::automock;
12use types::BlockHeader;
13
14use crate::{
15    clients::{
16        beacon::types::{BlockHeaderResponse, Spec, SpecResponse},
17        common::ClientResult,
18    },
19    json_get,
20};
21
22use self::types::{Blob, BlobsResponse, Block, BlockId, BlockResponse, Topic};
23
24pub mod types;
25
26#[derive(Debug, Clone)]
27pub struct BeaconClient {
28    base_url: Url,
29    client: Client,
30    exp_backoff: Option<ExponentialBackoff>,
31}
32
33pub struct Config {
34    pub base_url: String,
35    pub exp_backoff: Option<ExponentialBackoff>,
36}
37
38#[async_trait]
39#[cfg_attr(test, automock)]
40pub trait CommonBeaconClient: Send + Sync + Debug {
41    async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>>;
42    async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>>;
43    async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>>;
44    async fn get_spec(&self) -> ClientResult<Option<Spec>>;
45    fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource>;
46}
47
48impl BeaconClient {
49    pub fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
50        let base_url = Url::parse(&format!("{}/eth/", config.base_url))
51            .with_context(|| "Failed to parse base URL")?;
52        let exp_backoff = config.exp_backoff;
53
54        Ok(Self {
55            base_url,
56            client,
57            exp_backoff,
58        })
59    }
60}
61
62#[async_trait]
63impl CommonBeaconClient for BeaconClient {
64    async fn get_block(&self, block_id: BlockId) -> ClientResult<Option<Block>> {
65        let path = format!("v2/beacon/blocks/{}", { block_id.to_detailed_string() });
66        let url = self.base_url.join(path.as_str())?;
67
68        json_get!(&self.client, url, BlockResponse, self.exp_backoff.clone())
69            .map(|res| res.map(|r| r.into()))
70    }
71
72    async fn get_block_header(&self, block_id: BlockId) -> ClientResult<Option<BlockHeader>> {
73        let path = format!("v1/beacon/headers/{}", { block_id.to_detailed_string() });
74        let url = self.base_url.join(path.as_str())?;
75
76        json_get!(
77            &self.client,
78            url,
79            BlockHeaderResponse,
80            self.exp_backoff.clone()
81        )
82        .map(|res| res.map(|r| r.into()))
83    }
84
85    async fn get_blobs(&self, block_id: BlockId) -> ClientResult<Option<Vec<Blob>>> {
86        let path = format!("v1/beacon/blob_sidecars/{}", {
87            block_id.to_detailed_string()
88        });
89        let url = self.base_url.join(path.as_str())?;
90
91        json_get!(&self.client, url, BlobsResponse, self.exp_backoff.clone()).map(|res| match res {
92            Some(r) => Some(r.data),
93            None => None,
94        })
95    }
96
97    async fn get_spec(&self) -> ClientResult<Option<Spec>> {
98        let url = self.base_url.join("v1/config/spec")?;
99
100        json_get!(&self.client, url, SpecResponse, self.exp_backoff.clone()).map(|res| match res {
101            Some(r) => Some(r.data),
102            None => None,
103        })
104    }
105
106    fn subscribe_to_events(&self, topics: &[Topic]) -> ClientResult<EventSource> {
107        let topics = topics
108            .iter()
109            .map(|topic| topic.into())
110            .collect::<Vec<String>>()
111            .join(",");
112        let path = format!("v1/events?topics={topics}");
113        let url = self.base_url.join(&path)?;
114
115        Ok(EventSource::get(url))
116    }
117}