Skip to main content

tycho_core/blockchain_rpc/
providers.rs

1use std::num::NonZeroU64;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use tycho_types::models::BlockId;
8
9use crate::proto::blockchain::ArchiveInfo;
10use crate::storage::{
11    ArchiveId, BlockStorage, CoreStorage, PersistentStateInfo, PersistentStateKind,
12};
13
14/// Abstraction for rpc data providers (storage, S3, etc.)
15#[async_trait]
16pub trait RpcDataProvider: Send + Sync + 'static {
17    /// Get archive info for the given masterchain seqno.
18    ///
19    /// Returns:
20    /// - `ArchiveInfo::Found` if archive exists and is ready
21    /// - `ArchiveInfo::TooNew` if the archive is not yet available
22    /// - `ArchiveInfo::NotFound` if the archive doesn't exist
23    async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo>;
24
25    /// Get a chunk of archive data at the given offset.
26    async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes>;
27
28    /// Get persistent state info for the given block.
29    async fn get_persistent_state_info(
30        &self,
31        block_id: &BlockId,
32        kind: PersistentStateKind,
33    ) -> Result<Option<PersistentStateInfo>>;
34
35    /// Get a chunk of persistent state data at the given offset.
36    async fn get_persistent_state_chunk(
37        &self,
38        block_id: &BlockId,
39        offset: u64,
40        kind: PersistentStateKind,
41    ) -> Result<Option<Bytes>>;
42}
43
44// === Storage Implementation ===
45
46pub struct StorageRpcDataProvider {
47    storage: CoreStorage,
48}
49
50impl StorageRpcDataProvider {
51    pub fn new(storage: CoreStorage) -> Self {
52        Self { storage }
53    }
54}
55
56#[async_trait]
57impl RpcDataProvider for StorageRpcDataProvider {
58    async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
59        let node_state = self.storage.node_state();
60
61        match node_state.load_last_mc_block_id() {
62            Some(last_applied_mc_block) => {
63                if mc_seqno > last_applied_mc_block.seqno {
64                    return Ok(ArchiveInfo::TooNew);
65                }
66
67                let block_storage = self.storage.block_storage();
68
69                let id = block_storage.get_archive_id(mc_seqno);
70                let size_res = match id {
71                    ArchiveId::Found(id) => block_storage.get_archive_size(id),
72                    ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
73                };
74
75                Ok(match (id, size_res) {
76                    (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
77                        id: id as u64,
78                        size: NonZeroU64::new(size as _).unwrap(),
79                        chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
80                    },
81                    (ArchiveId::Found(_) | ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
82                    _ => ArchiveInfo::NotFound,
83                })
84            }
85            None => {
86                anyhow::bail!("get_archive_id failed: no blocks applied")
87            }
88        }
89    }
90
91    async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
92        self.storage
93            .block_storage()
94            .get_archive_chunk(archive_id, offset)
95            .await
96    }
97
98    async fn get_persistent_state_info(
99        &self,
100        block_id: &BlockId,
101        kind: PersistentStateKind,
102    ) -> Result<Option<PersistentStateInfo>> {
103        let persistent_state_storage = self.storage.persistent_state_storage();
104        Ok(persistent_state_storage.get_state_info(block_id, kind))
105    }
106
107    async fn get_persistent_state_chunk(
108        &self,
109        block_id: &BlockId,
110        offset: u64,
111        kind: PersistentStateKind,
112    ) -> Result<Option<Bytes>> {
113        let persistent_state_storage = self.storage.persistent_state_storage();
114        Ok(persistent_state_storage
115            .read_state_part(block_id, offset, kind)
116            .await
117            .map(Bytes::from))
118    }
119}
120
121// === S3 Implementation ===
122
123#[cfg(feature = "s3")]
124pub use s3_impl::S3RpcDataProvider;
125
#[cfg(feature = "s3")]
mod s3_impl {
    use std::num::NonZeroU32;

    use governor::clock::DefaultClock;
    use governor::state::{InMemoryState, NotKeyed};
    use governor::{Quota, RateLimiter};
    use object_store::ObjectStoreExt;

    use super::*;
    use crate::blockchain_rpc::S3ProxyConfig;
    use crate::s3::S3Client;

    type S3RateLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock>;

    /// [`RpcDataProvider`] that serves archives and persistent states from S3.
    ///
    /// Every request is checked against a request-rate limiter, and chunk reads
    /// are additionally checked against an optional bandwidth limiter. Both checks
    /// are non-blocking: an exhausted limiter turns the request into an error.
    pub struct S3RpcDataProvider {
        client: S3Client,
        // Only used to estimate archive ids from masterchain seqnos.
        storage: CoreStorage,
        // Fixed chunk size; chunk offsets must be multiples of it.
        chunk_size: NonZeroU32,
        rate_limiter: S3RateLimiter,
        // `None` when the configured bandwidth limit is zero (no bandwidth check).
        bandwidth_limiter: Option<S3RateLimiter>,
    }

    impl S3RpcDataProvider {
        /// Creates a provider that reads through `client`, uses `storage` to map
        /// masterchain seqnos to archive ids, and takes its rate and bandwidth
        /// limits from `config`.
        pub fn new(client: S3Client, storage: CoreStorage, config: &S3ProxyConfig) -> Self {
            let chunk_size = client.chunk_size();
            let rate_limiter = RateLimiter::direct(Quota::per_second(config.rate_limit));

            // `governor` quotas are `u32`-based. Saturate instead of truncating with
            // `as u32`: truncation made e.g. a 4 GiB/s limit wrap to 0 (silently
            // disabling the limiter) and a 5 GiB/s limit become 1 GiB/s.
            let bytes_per_sec =
                u32::try_from(config.bandwidth_limit.as_u64()).unwrap_or(u32::MAX);

            let bandwidth_limiter = NonZeroU32::new(bytes_per_sec).map(|bytes_per_sec| {
                // The burst must hold at least one full chunk, otherwise
                // `check_n(chunk_size)` in `check_bandwidth_limit` could never pass.
                let burst = bytes_per_sec.max(chunk_size);
                RateLimiter::direct(Quota::per_second(bytes_per_sec).allow_burst(burst))
            });

            Self {
                client,
                storage,
                chunk_size,
                rate_limiter,
                bandwidth_limiter,
            }
        }
    }

    #[async_trait]
    impl RpcDataProvider for S3RpcDataProvider {
        /// Looks up the archive estimated to contain `mc_seqno` in S3.
        ///
        /// S3 lookup errors are logged and reported as `ArchiveInfo::NotFound`;
        /// this provider never reports `ArchiveInfo::TooNew`.
        async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
            self.check_rate_limit()?;

            let archive_id = self.storage.block_storage().estimate_archive_id(mc_seqno);

            match self.client.get_archive_info(archive_id).await {
                Ok(Some(info)) => Ok(ArchiveInfo::Found {
                    id: info.archive_id as u64,
                    size: info.size,
                    chunk_size: self.chunk_size,
                }),
                Ok(None) => Ok(ArchiveInfo::NotFound),
                Err(e) => {
                    tracing::warn!(archive_id, "failed to get archive info from S3: {e:?}");
                    Ok(ArchiveInfo::NotFound)
                }
            }
        }

        /// Reads one chunk of an archive object from S3.
        ///
        /// # Errors
        ///
        /// Fails on unaligned or out-of-range offsets, when a limiter is
        /// exhausted, or when the S3 read fails.
        async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
            let chunk_size = u64::from(self.chunk_size.get());

            anyhow::ensure!(
                offset.is_multiple_of(chunk_size),
                "unaligned archive chunk offset"
            );
            // `offset` comes from a remote peer: an aligned offset close to
            // `u64::MAX` must not overflow (panic in debug, wrap in release).
            let end = offset
                .checked_add(chunk_size)
                .ok_or_else(|| anyhow::anyhow!("archive chunk offset is out of range"))?;

            self.check_rate_limit()?;
            self.check_bandwidth_limit()?;

            let path = self.client.make_archive_key(archive_id);
            let client = self.client.client();

            client.get_range(&path, offset..end).await.map_err(Into::into)
        }

        /// Fetches persistent state info from S3, reporting this provider's
        /// chunk size.
        async fn get_persistent_state_info(
            &self,
            block_id: &BlockId,
            kind: PersistentStateKind,
        ) -> Result<Option<PersistentStateInfo>> {
            self.check_rate_limit()?;

            Ok(self
                .client
                .get_persistent_state_info(block_id, kind)
                .await?
                .map(|info| PersistentStateInfo {
                    size: info.size,
                    chunk_size: self.chunk_size,
                }))
        }

        /// Reads one chunk of a persistent state object from S3.
        ///
        /// Returns `Ok(None)` for unaligned or out-of-range offsets and for
        /// missing objects.
        async fn get_persistent_state_chunk(
            &self,
            block_id: &BlockId,
            offset: u64,
            kind: PersistentStateKind,
        ) -> Result<Option<Bytes>> {
            let chunk_size = u64::from(self.chunk_size.get());

            // Validate the request before charging the limiters (as
            // `get_archive_chunk` does): malformed offsets must not consume tokens.
            if !offset.is_multiple_of(chunk_size) {
                return Ok(None);
            }
            let Some(end) = offset.checked_add(chunk_size) else {
                return Ok(None);
            };

            self.check_rate_limit()?;
            self.check_bandwidth_limit()?;

            let path = self.client.make_state_key(block_id, kind);
            let client = self.client.client();

            match client.get_range(&path, offset..end).await {
                Ok(data) => Ok(Some(data)),
                Err(object_store::Error::NotFound { .. }) => Ok(None),
                Err(e) => Err(e.into()),
            }
        }
    }

    impl S3RpcDataProvider {
        /// Takes one token from the request-rate limiter, failing if none is
        /// available right now.
        fn check_rate_limit(&self) -> Result<()> {
            self.rate_limiter
                .check()
                .map_err(|_err| anyhow::anyhow!("S3 rate limit exceeded"))
        }

        /// Charges one full chunk against the bandwidth limiter, if configured.
        ///
        /// The `expect` relies on the burst set in `new` being at least one chunk.
        fn check_bandwidth_limit(&self) -> Result<()> {
            if let Some(limiter) = &self.bandwidth_limiter {
                limiter
                    .check_n(self.chunk_size)
                    .expect("shouldn't happen since burst = bytes_per_sec.max(chunk_size")
                    .map_err(|err| anyhow::anyhow!("S3 bandwidth limit exceeded {err:?}"))?;
            }
            Ok(())
        }
    }
}
282
283// === Hybrid Implementation ===
284
/// [`RpcDataProvider`] that asks `primary` first and consults `fallback` when the
/// primary has no answer (`NotFound` / `None`) or returns an error.
///
/// A primary `ArchiveInfo::TooNew` is returned without consulting the fallback.
pub struct HybridRpcProvider<T1, T2> {
    primary: T1,
    fallback: T2,
}

impl<T1, T2> HybridRpcProvider<T1, T2> {
    /// Pairs a `primary` provider with a `fallback` provider.
    pub fn new(primary: T1, fallback: T2) -> Self {
        HybridRpcProvider { primary, fallback }
    }
}
295
296#[async_trait]
297impl<T1, T2> RpcDataProvider for HybridRpcProvider<T1, T2>
298where
299    T1: RpcDataProvider,
300    T2: RpcDataProvider,
301{
302    async fn get_archive_info(&self, mc_seqno: u32) -> Result<ArchiveInfo> {
303        // Try primary first
304        match self.primary.get_archive_info(mc_seqno).await {
305            Ok(info @ ArchiveInfo::Found { .. }) => return Ok(info),
306            Ok(ArchiveInfo::TooNew) => {
307                return Ok(ArchiveInfo::TooNew);
308            }
309            Ok(ArchiveInfo::NotFound) => {}
310            Err(e) => {
311                tracing::warn!(mc_seqno, "primary archive provider error: {e:?}");
312            }
313        }
314
315        // Fallback
316        self.fallback.get_archive_info(mc_seqno).await
317    }
318
319    async fn get_archive_chunk(&self, archive_id: u32, offset: u64) -> Result<Bytes> {
320        // Try primary first
321        match self.primary.get_archive_chunk(archive_id, offset).await {
322            Ok(chunk) => return Ok(chunk),
323            Err(e) => {
324                tracing::warn!(archive_id, offset, "primary archive provider error: {e:?}");
325            }
326        }
327
328        // Fallback
329        self.fallback.get_archive_chunk(archive_id, offset).await
330    }
331
332    async fn get_persistent_state_info(
333        &self,
334        block_id: &BlockId,
335        kind: PersistentStateKind,
336    ) -> Result<Option<PersistentStateInfo>> {
337        // Try primary first
338        match self.primary.get_persistent_state_info(block_id, kind).await {
339            Ok(Some(info)) => return Ok(Some(info)),
340            Ok(None) => {}
341            Err(e) => {
342                tracing::warn!(?block_id, ?kind, "primary state provider error: {e:?}");
343            }
344        }
345
346        // Fallback
347        self.fallback
348            .get_persistent_state_info(block_id, kind)
349            .await
350    }
351
352    async fn get_persistent_state_chunk(
353        &self,
354        block_id: &BlockId,
355        offset: u64,
356        kind: PersistentStateKind,
357    ) -> Result<Option<Bytes>> {
358        // Try primary first
359        match self
360            .primary
361            .get_persistent_state_chunk(block_id, offset, kind)
362            .await
363        {
364            Ok(Some(chunk)) => return Ok(Some(chunk)),
365            Ok(None) => {}
366            Err(e) => {
367                tracing::warn!(
368                    ?block_id,
369                    offset,
370                    ?kind,
371                    "primary state provider error: {e:?}"
372                );
373            }
374        }
375
376        // Fallback
377        self.fallback
378            .get_persistent_state_chunk(block_id, offset, kind)
379            .await
380    }
381}
382
// === IntoRpcDataProvider trait ===
384
385pub trait IntoRpcDataProvider {
386    fn into_data_provider(self) -> Arc<dyn RpcDataProvider>;
387}
388
389impl<T: RpcDataProvider> IntoRpcDataProvider for (T,) {
390    #[inline]
391    fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
392        Arc::new(self.0)
393    }
394}
395
396impl<T1: RpcDataProvider, T2: RpcDataProvider> IntoRpcDataProvider for (T1, Option<T2>) {
397    fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
398        let (primary, fallback) = self;
399        match fallback {
400            None => Arc::new(primary),
401            Some(fallback) => Arc::new(HybridRpcProvider::new(primary, fallback)),
402        }
403    }
404}
405
406impl IntoRpcDataProvider for CoreStorage {
407    #[inline]
408    fn into_data_provider(self) -> Arc<dyn RpcDataProvider> {
409        Arc::new(StorageRpcDataProvider::new(self))
410    }
411}