blob_indexer/clients/blobscan/
mod.rs

1use std::fmt::Debug;
2
3use alloy::primitives::B256;
4use async_trait::async_trait;
5use backoff::ExponentialBackoff;
6use chrono::TimeDelta;
7use reqwest::{Client, Url};
8
9#[cfg(test)]
10use mockall::automock;
11use types::{BlobscanBlock, ReorgedBlocksRequestBody};
12
13use crate::{clients::common::ClientResult, json_get, json_put};
14
15use self::{
16    jwt_manager::{Config as JWTManagerConfig, JWTManager},
17    types::{
18        Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
19        IndexRequest, Transaction,
20    },
21};
22
23mod jwt_manager;
24
25pub mod types;
26
27#[async_trait]
28#[cfg_attr(test, automock)]
29pub trait CommonBlobscanClient: Send + Sync + Debug {
30    fn try_with_client(client: Client, config: Config) -> ClientResult<Self>
31    where
32        Self: Sized;
33    async fn index(
34        &self,
35        block: Block,
36        transactions: Vec<Transaction>,
37        blobs: Vec<Blob>,
38    ) -> ClientResult<()>;
39    async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>>;
40    async fn handle_reorg(
41        &self,
42        rewinded_blocks: Vec<B256>,
43        forwarded_blocks: Vec<B256>,
44    ) -> ClientResult<()>;
45    async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
46    async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
47}
48
49#[derive(Debug, Clone)]
50pub struct BlobscanClient {
51    base_url: Url,
52    client: reqwest::Client,
53    jwt_manager: JWTManager,
54    exp_backoff: Option<ExponentialBackoff>,
55}
56
57pub struct Config {
58    pub base_url: String,
59    pub secret_key: String,
60    pub exp_backoff: Option<ExponentialBackoff>,
61}
62
63#[async_trait]
64
65impl CommonBlobscanClient for BlobscanClient {
66    fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
67        let base_url = Url::parse(&format!("{}/", config.base_url))?;
68        let jwt_manager = JWTManager::new(JWTManagerConfig {
69            secret_key: config.secret_key,
70            refresh_interval: TimeDelta::try_hours(1).unwrap(),
71            safety_magin: None,
72        });
73        let exp_backoff = config.exp_backoff;
74
75        Ok(Self {
76            base_url,
77            client,
78            jwt_manager,
79            exp_backoff,
80        })
81    }
82
83    async fn index(
84        &self,
85        block: Block,
86        transactions: Vec<Transaction>,
87        blobs: Vec<Blob>,
88    ) -> ClientResult<()> {
89        let url = self.base_url.join("indexer/block-txs-blobs")?;
90        let token = self.jwt_manager.get_token()?;
91        let req = IndexRequest {
92            block,
93            transactions,
94            blobs,
95        };
96
97        json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
98    }
99
100    async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
101        let url = self.base_url.join(&format!("slots/{}", slot))?;
102
103        json_get!(&self.client, url, BlobscanBlock, self.exp_backoff.clone())
104    }
105
106    async fn handle_reorg(
107        &self,
108        rewinded_blocks: Vec<B256>,
109        forwarded_blocks: Vec<B256>,
110    ) -> ClientResult<()> {
111        let url = self.base_url.join("indexer/reorged-blocks")?;
112        let token = self.jwt_manager.get_token()?;
113
114        let req = ReorgedBlocksRequestBody {
115            forwarded_blocks,
116            rewinded_blocks,
117        };
118
119        json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
120    }
121
122    async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
123        let url = self.base_url.join("blockchain-sync-state")?;
124        let token = self.jwt_manager.get_token()?;
125        let req: BlockchainSyncStateRequest = sync_state.into();
126
127        json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
128    }
129
130    async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
131        let url = self.base_url.join("blockchain-sync-state")?;
132        json_get!(
133            &self.client,
134            url,
135            BlockchainSyncStateResponse,
136            self.exp_backoff.clone()
137        )
138        .map(|res: Option<BlockchainSyncStateResponse>| Some(res.unwrap().into()))
139    }
140}