blob_indexer/clients/blobscan/
mod.rs1use std::fmt::Debug;
2
3use alloy::primitives::B256;
4use async_trait::async_trait;
5use backoff::ExponentialBackoff;
6use chrono::TimeDelta;
7use reqwest::{Client, Url};
8
9#[cfg(test)]
10use mockall::automock;
11use types::{BlobscanBlock, ReorgedBlocksRequestBody};
12
13use crate::{clients::common::ClientResult, json_get, json_put};
14
15use self::{
16 jwt_manager::{Config as JWTManagerConfig, JWTManager},
17 types::{
18 Blob, Block, BlockchainSyncState, BlockchainSyncStateRequest, BlockchainSyncStateResponse,
19 IndexRequest, Transaction,
20 },
21};
22
23mod jwt_manager;
24
25pub mod types;
26
27#[async_trait]
28#[cfg_attr(test, automock)]
29pub trait CommonBlobscanClient: Send + Sync + Debug {
30 fn try_with_client(client: Client, config: Config) -> ClientResult<Self>
31 where
32 Self: Sized;
33 async fn index(
34 &self,
35 block: Block,
36 transactions: Vec<Transaction>,
37 blobs: Vec<Blob>,
38 ) -> ClientResult<()>;
39 async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>>;
40 async fn handle_reorg(
41 &self,
42 rewinded_blocks: Vec<B256>,
43 forwarded_blocks: Vec<B256>,
44 ) -> ClientResult<()>;
45 async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()>;
46 async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>>;
47}
48
49#[derive(Debug, Clone)]
50pub struct BlobscanClient {
51 base_url: Url,
52 client: reqwest::Client,
53 jwt_manager: JWTManager,
54 exp_backoff: Option<ExponentialBackoff>,
55}
56
57pub struct Config {
58 pub base_url: String,
59 pub secret_key: String,
60 pub exp_backoff: Option<ExponentialBackoff>,
61}
62
63#[async_trait]
64
65impl CommonBlobscanClient for BlobscanClient {
66 fn try_with_client(client: Client, config: Config) -> ClientResult<Self> {
67 let base_url = Url::parse(&format!("{}/", config.base_url))?;
68 let jwt_manager = JWTManager::new(JWTManagerConfig {
69 secret_key: config.secret_key,
70 refresh_interval: TimeDelta::try_hours(1).unwrap(),
71 safety_magin: None,
72 });
73 let exp_backoff = config.exp_backoff;
74
75 Ok(Self {
76 base_url,
77 client,
78 jwt_manager,
79 exp_backoff,
80 })
81 }
82
83 async fn index(
84 &self,
85 block: Block,
86 transactions: Vec<Transaction>,
87 blobs: Vec<Blob>,
88 ) -> ClientResult<()> {
89 let url = self.base_url.join("indexer/block-txs-blobs")?;
90 let token = self.jwt_manager.get_token()?;
91 let req = IndexRequest {
92 block,
93 transactions,
94 blobs,
95 };
96
97 json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
98 }
99
100 async fn get_block(&self, slot: u32) -> ClientResult<Option<BlobscanBlock>> {
101 let url = self.base_url.join(&format!("slots/{}", slot))?;
102
103 json_get!(&self.client, url, BlobscanBlock, self.exp_backoff.clone())
104 }
105
106 async fn handle_reorg(
107 &self,
108 rewinded_blocks: Vec<B256>,
109 forwarded_blocks: Vec<B256>,
110 ) -> ClientResult<()> {
111 let url = self.base_url.join("indexer/reorged-blocks")?;
112 let token = self.jwt_manager.get_token()?;
113
114 let req = ReorgedBlocksRequestBody {
115 forwarded_blocks,
116 rewinded_blocks,
117 };
118
119 json_put!(&self.client, url, ReorgedBlocksRequestBody, token, &req).map(|_| ())
120 }
121
122 async fn update_sync_state(&self, sync_state: BlockchainSyncState) -> ClientResult<()> {
123 let url = self.base_url.join("blockchain-sync-state")?;
124 let token = self.jwt_manager.get_token()?;
125 let req: BlockchainSyncStateRequest = sync_state.into();
126
127 json_put!(&self.client, url, token, &req).map(|_: Option<()>| ())
128 }
129
130 async fn get_sync_state(&self) -> ClientResult<Option<BlockchainSyncState>> {
131 let url = self.base_url.join("blockchain-sync-state")?;
132 json_get!(
133 &self.client,
134 url,
135 BlockchainSyncStateResponse,
136 self.exp_backoff.clone()
137 )
138 .map(|res: Option<BlockchainSyncStateResponse>| Some(res.unwrap().into()))
139 }
140}