tycho_core/blockchain_rpc/
providers.rs1use std::num::NonZeroU64;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use tycho_types::models::BlockId;
8
9use crate::proto::blockchain::ArchiveInfo;
10use crate::storage::{
11 ArchiveId, BlockStorage, CoreStorage, PersistentStateInfo, PersistentStateKind,
12};
13
14#[async_trait]
16pub trait RpcDataProvider: Send + Sync + 'static {
17 async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo>;
24
25 async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes>;
27
28 async fn get_persistent_state_info(
30 &self,
31 block_id: &BlockId,
32 kind: PersistentStateKind,
33 ) -> Result<Option<PersistentStateInfo>>;
34
35 async fn get_persistent_state_chunk(
37 &self,
38 block_id: &BlockId,
39 offset: u64,
40 kind: PersistentStateKind,
41 ) -> Result<Option<Bytes>>;
42}
43
44pub struct StorageRpcDataProvider {
47 storage: CoreStorage,
48}
49
50impl StorageRpcDataProvider {
51 pub fn new(storage: CoreStorage) -> Self {
52 Self { storage }
53 }
54}
55
56#[async_trait]
57impl RpcDataProvider for StorageRpcDataProvider {
58 async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
59 let node_state = self.storage.node_state();
60
61 match node_state.load_last_mc_block_id() {
62 Some(last_applied_mc_block) => {
63 if mc_seqno > last_applied_mc_block.seqno {
64 return Ok(ArchiveInfo::TooNew);
65 }
66
67 let block_storage = self.storage.block_storage();
68
69 let id = block_storage.get_archive_id(mc_seqno);
70 let size_res = match id {
71 ArchiveId::Found(id) => block_storage.get_archive_size(id),
72 ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
73 };
74
75 Ok(match (id, size_res) {
76 (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
77 id: id as u64,
78 size: NonZeroU64::new(size as _).unwrap(),
79 chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
80 },
81 (ArchiveId::Found(_) | ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
82 _ => ArchiveInfo::NotFound,
83 })
84 }
85 None => {
86 anyhow::bail!("get_archive_id failed: no blocks applied")
87 }
88 }
89 }
90
91 async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
92 self.storage
93 .block_storage()
94 .get_archive_chunk(archive_id, offset)
95 .await
96 }
97
98 async fn get_persistent_state_info(
99 &self,
100 block_id: &BlockId,
101 kind: PersistentStateKind,
102 ) -> Result<Option<PersistentStateInfo>> {
103 let persistent_state_storage = self.storage.persistent_state_storage();
104 Ok(persistent_state_storage.get_state_info(block_id, kind))
105 }
106
107 async fn get_persistent_state_chunk(
108 &self,
109 block_id: &BlockId,
110 offset: u64,
111 kind: PersistentStateKind,
112 ) -> Result<Option<Bytes>> {
113 let persistent_state_storage = self.storage.persistent_state_storage();
114 Ok(persistent_state_storage
115 .read_state_part(block_id, offset, kind)
116 .await
117 .map(Bytes::from))
118 }
119}
120
121#[cfg(feature = "s3")]
124pub use s3_impl::S3RpcDataProvider;
125
126#[cfg(feature = "s3")]
127mod s3_impl {
128 use std::num::NonZeroU32;
129
130 use governor::clock::DefaultClock;
131 use governor::state::{InMemoryState, NotKeyed};
132 use governor::{Quota, RateLimiter};
133 use object_store::ObjectStoreExt;
134
135 use super::*;
136 use crate::blockchain_rpc::S3ProxyConfig;
137 use crate::s3::S3Client;
138
139 type S3RateLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock>;
140
141 pub struct S3RpcDataProvider {
142 client: S3Client,
143 storage: CoreStorage,
144 chunk_size: NonZeroU32,
145 rate_limiter: S3RateLimiter,
146 bandwidth_limiter: Option<S3RateLimiter>,
147 }
148
149 impl S3RpcDataProvider {
150 pub fn new(client: S3Client, storage: CoreStorage, config: &S3ProxyConfig) -> Self {
151 let chunk_size = client.chunk_size();
152 let rate_limiter = RateLimiter::direct(Quota::per_second(config.rate_limit));
153
154 let bandwidth_limiter =
155 NonZeroU32::new(config.bandwidth_limit.as_u64() as u32).map(|bytes_per_sec| {
156 let burst = bytes_per_sec.get().max(chunk_size.get());
157 RateLimiter::direct(
158 Quota::per_second(bytes_per_sec)
159 .allow_burst(NonZeroU32::new(burst).unwrap()),
160 )
161 });
162
163 Self {
164 client,
165 storage,
166 chunk_size,
167 rate_limiter,
168 bandwidth_limiter,
169 }
170 }
171 }
172
173 #[async_trait]
174 impl RpcDataProvider for S3RpcDataProvider {
175 async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
176 self.check_rate_limit()?;
177
178 let archive_id = self.storage.block_storage().estimate_archive_id(mc_seqno);
179
180 match self.client.get_archive_info(archive_id).await {
181 Ok(Some(info)) => Ok(ArchiveInfo::Found {
182 id: info.archive_id as u64,
183 size: info.size,
184 chunk_size: self.chunk_size,
185 }),
186 Ok(None) => Ok(ArchiveInfo::NotFound),
187 Err(e) => {
188 tracing::warn!(archive_id, "failed to get archive info from S3: {e:?}");
189 Ok(ArchiveInfo::NotFound)
190 }
191 }
192 }
193
194 async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
195 let chunk_size = self.chunk_size.get() as u64;
196
197 anyhow::ensure!(
198 offset.is_multiple_of(chunk_size),
199 "unaligned archive chunk offset"
200 );
201
202 self.check_rate_limit()?;
203 self.check_bandwidth_limit()?;
204
205 let path = self.client.make_archive_key(archive_id);
206 let client = self.client.client();
207
208 let range = std::ops::Range {
209 start: offset,
210 end: offset + chunk_size,
211 };
212
213 client.get_range(&path, range).await.map_err(Into::into)
214 }
215
216 async fn get_persistent_state_info(
217 &self,
218 block_id: &BlockId,
219 kind: PersistentStateKind,
220 ) -> Result<Option<PersistentStateInfo>> {
221 self.check_rate_limit()?;
222
223 Ok(self
224 .client
225 .get_persistent_state_info(block_id, kind)
226 .await?
227 .map(|info| PersistentStateInfo {
228 size: info.size,
229 chunk_size: self.chunk_size,
230 }))
231 }
232
233 async fn get_persistent_state_chunk(
234 &self,
235 block_id: &BlockId,
236 offset: u64,
237 kind: PersistentStateKind,
238 ) -> Result<Option<Bytes>> {
239 self.check_rate_limit()?;
240 self.check_bandwidth_limit()?;
241
242 let chunk_size = self.chunk_size.get() as u64;
243
244 if !offset.is_multiple_of(chunk_size) {
245 return Ok(None);
246 }
247
248 let path = self.client.make_state_key(block_id, kind);
249 let client = self.client.client();
250
251 let range = std::ops::Range {
252 start: offset,
253 end: offset + chunk_size,
254 };
255
256 match client.get_range(&path, range).await {
257 Ok(data) => Ok(Some(data)),
258 Err(object_store::Error::NotFound { .. }) => Ok(None),
259 Err(e) => Err(e.into()),
260 }
261 }
262 }
263
264 impl S3RpcDataProvider {
265 fn check_rate_limit(&self) -> Result<()> {
266 self.rate_limiter
267 .check()
268 .map_err(|_err| anyhow::anyhow!("S3 rate limit exceeded"))
269 }
270
271 fn check_bandwidth_limit(&self) -> Result<()> {
272 if let Some(limiter) = &self.bandwidth_limiter {
273 limiter
274 .check_n(self.chunk_size)
275 .expect("shouldn't happen since burst = bytes_per_sec.max(chunk_size")
276 .map_err(|err| anyhow::anyhow!("S3 bandwidth limit exceeded {err:?}"))?;
277 }
278 Ok(())
279 }
280 }
281}
282
283pub struct HybridRpcProvider<T1, T2> {
286 primary: T1,
287 fallback: T2,
288}
289
290impl<T1, T2> HybridRpcProvider<T1, T2> {
291 pub fn new(primary: T1, fallback: T2) -> Self {
292 Self { primary, fallback }
293 }
294}
295
296#[async_trait]
297impl<T1, T2> RpcDataProvider for HybridRpcProvider<T1, T2>
298where
299 T1: RpcDataProvider,
300 T2: RpcDataProvider,
301{
302 async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
303 match self.primary.get_archive_info(mc_seqno).await {
305 Ok(info @ ArchiveInfo::Found { .. }) => return Ok(info),
306 Ok(ArchiveInfo::TooNew) => {
307 return Ok(ArchiveInfo::TooNew);
308 }
309 Ok(ArchiveInfo::NotFound) => {}
310 Err(e) => {
311 tracing::warn!(mc_seqno, "primary archive provider error: {e:?}");
312 }
313 }
314
315 self.fallback.get_archive_info(mc_seqno).await
317 }
318
319 async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
320 match self.primary.get_archive_chunk(archive_id, offset).await {
322 Ok(chunk) => return Ok(chunk),
323 Err(e) => {
324 tracing::warn!(archive_id, offset, "primary archive provider error: {e:?}");
325 }
326 }
327
328 self.fallback.get_archive_chunk(archive_id, offset).await
330 }
331
332 async fn get_persistent_state_info(
333 &self,
334 block_id: &BlockId,
335 kind: PersistentStateKind,
336 ) -> Result<Option<PersistentStateInfo>> {
337 match self.primary.get_persistent_state_info(block_id, kind).await {
339 Ok(Some(info)) => return Ok(Some(info)),
340 Ok(None) => {}
341 Err(e) => {
342 tracing::warn!(?block_id, ?kind, "primary state provider error: {e:?}");
343 }
344 }
345
346 self.fallback
348 .get_persistent_state_info(block_id, kind)
349 .await
350 }
351
352 async fn get_persistent_state_chunk(
353 &self,
354 block_id: &BlockId,
355 offset: u64,
356 kind: PersistentStateKind,
357 ) -> Result<Option<Bytes>> {
358 match self
360 .primary
361 .get_persistent_state_chunk(block_id, offset, kind)
362 .await
363 {
364 Ok(Some(chunk)) => return Ok(Some(chunk)),
365 Ok(None) => {}
366 Err(e) => {
367 tracing::warn!(
368 ?block_id,
369 offset,
370 ?kind,
371 "primary state provider error: {e:?}"
372 );
373 }
374 }
375
376 self.fallback
378 .get_persistent_state_chunk(block_id, offset, kind)
379 .await
380 }
381}
382
383pub trait IntoRpcDataProvider {
386 fn into_data_provider(self) -> Arc<dyn RpcDataProvider>;
387}
388
389impl<T: RpcDataProvider> IntoRpcDataProvider for (T,) {
390 #[inline]
391 fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
392 Arc::new(self.0)
393 }
394}
395
396impl<T1: RpcDataProvider, T2: RpcDataProvider> IntoRpcDataProvider for (T1, Option<T2>) {
397 fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
398 let (primary, fallback) = self;
399 match fallback {
400 None => Arc::new(primary),
401 Some(fallback) => Arc::new(HybridRpcProvider::new(primary, fallback)),
402 }
403 }
404}
405
406impl IntoRpcDataProvider for CoreStorage {
407 #[inline]
408 fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
409 Arc::new(StorageRpcDataProvider::new(self))
410 }
411}