Skip to main content

tycho_core/blockchain_rpc/
service.rs

1use std::num::NonZeroU32;
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6#[cfg(feature = "s3")]
7use bytesize::ByteSize;
8use serde::{Deserialize, Serialize};
9use tycho_block_util::message::validate_external_message;
10use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
11use tycho_types::models::BlockId;
12use tycho_util::futures::BoxFutureOrNoop;
13use tycho_util::metrics::HistogramGuard;
14
15use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
16use crate::blockchain_rpc::providers::{IntoRpcDataProvider, RpcDataProvider};
17use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
18use crate::proto::blockchain::*;
19use crate::proto::overlay;
20use crate::storage::{BlockConnection, CoreStorage, KeyBlocksDirection, PersistentStateKind};
21
/// Name of the histogram metric that receives per-method RPC handling times.
const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";

/// Maximum number of bytes requested from block storage per block data chunk.
#[cfg(not(test))]
const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; // 1 MiB
/// Deliberately tiny chunk size used in tests to exercise multi-chunk reads.
#[cfg(test)]
const BLOCK_DATA_CHUNK_SIZE: u32 = 10; // 10 bytes so we have zillions of chunks in tests
28
/// Limits for the S3 proxy; only compiled when the `s3` feature is enabled.
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3ProxyConfig {
    /// Rate limit (requests per second)
    ///
    /// Default: 100.
    pub rate_limit: NonZeroU32,

    /// Bandwidth limit (bytes per second)
    ///
    /// Default: 100 MiB (0 - unlimited)
    pub bandwidth_limit: ByteSize,
}
42
#[cfg(feature = "s3")]
impl Default for S3ProxyConfig {
    /// Builds the default limits: 100 requests per second and 100 MiB per second.
    fn default() -> Self {
        let rate_limit = NonZeroU32::new(100).unwrap();
        let bandwidth_limit = ByteSize::mib(100);

        Self {
            rate_limit,
            bandwidth_limit,
        }
    }
}
52
/// Configuration of [`BlockchainRpcService`].
///
/// Missing fields are filled from [`Default`] during deserialization
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct BlockchainRpcServiceConfig {
    /// The maximum number of key blocks in the response.
    ///
    /// Default: 8.
    pub max_key_blocks_list_len: usize,

    /// Whether to serve persistent states.
    ///
    /// Default: yes.
    pub serve_persistent_states: bool,

    /// S3 proxy configuration.
    ///
    /// Default: enabled.
    #[cfg(feature = "s3")]
    pub s3_proxy: Option<S3ProxyConfig>,
}
73
74impl Default for BlockchainRpcServiceConfig {
75    fn default() -> Self {
76        Self {
77            max_key_blocks_list_len: 8,
78            serve_persistent_states: true,
79            #[cfg(feature = "s3")]
80            s3_proxy: Some(S3ProxyConfig::default()),
81        }
82    }
83}
84
/// Builder for [`BlockchainRpcService`].
///
/// `MandatoryFields` is a tuple `(broadcast listener, storage, rpc data provider)`.
/// Each slot starts as `()` and is replaced by the corresponding `with_*` method;
/// `build` is only available once the listener and storage slots are filled.
pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
    // Service configuration; defaults unless overridden by `with_config`.
    config: BlockchainRpcServiceConfig,
    // Tuple of the mandatory parts collected so far.
    mandatory_fields: MandatoryFields,
}
89
90impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage, Arc<dyn RpcDataProvider>)>
91where
92    B: BroadcastListener,
93{
94    pub fn build(self) -> BlockchainRpcService<B> {
95        let (broadcast_listener, storage, rpc_data_provider) = self.mandatory_fields;
96
97        BlockchainRpcService {
98            inner: Arc::new(Inner {
99                storage,
100                rpc_data_provider,
101                config: self.config,
102                broadcast_listener,
103            }),
104        }
105    }
106}
107
108impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage, ())>
109where
110    B: BroadcastListener,
111{
112    pub fn build(self) -> BlockchainRpcService<B> {
113        let (broadcast_listener, storage, _) = self.mandatory_fields;
114        let rpc_data_provider = storage.clone().into_data_provider();
115
116        BlockchainRpcService {
117            inner: Arc::new(Inner {
118                storage,
119                rpc_data_provider,
120                config: self.config,
121                broadcast_listener,
122            }),
123        }
124    }
125}
126
127impl<T1, T3> BlockchainRpcServiceBuilder<(T1, (), T3)> {
128    pub fn with_storage(
129        self,
130        storage: CoreStorage,
131    ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage, T3)> {
132        let (broadcast_listener, _, archive_provider) = self.mandatory_fields;
133
134        BlockchainRpcServiceBuilder {
135            config: self.config,
136            mandatory_fields: (broadcast_listener, storage, archive_provider),
137        }
138    }
139}
140
141impl<T2, T3> BlockchainRpcServiceBuilder<((), T2, T3)> {
142    pub fn with_broadcast_listener<T1>(
143        self,
144        broadcast_listener: T1,
145    ) -> BlockchainRpcServiceBuilder<(T1, T2, T3)>
146    where
147        T1: BroadcastListener,
148    {
149        let (_, storage, archive_provider) = self.mandatory_fields;
150
151        BlockchainRpcServiceBuilder {
152            config: self.config,
153            mandatory_fields: (broadcast_listener, storage, archive_provider),
154        }
155    }
156
157    pub fn without_broadcast_listener(
158        self,
159    ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2, T3)> {
160        let (_, storage, archive_provider) = self.mandatory_fields;
161
162        BlockchainRpcServiceBuilder {
163            config: self.config,
164            mandatory_fields: (NoopBroadcastListener, storage, archive_provider),
165        }
166    }
167}
168
169impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2, ())> {
170    pub fn with_data_provider<RP: IntoRpcDataProvider>(
171        self,
172        provider: RP,
173    ) -> BlockchainRpcServiceBuilder<(T1, T2, Arc<dyn RpcDataProvider>)> {
174        let (broadcast_listener, storage, _) = self.mandatory_fields;
175        let rpc_data_provider = provider.into_data_provider();
176
177        BlockchainRpcServiceBuilder {
178            config: self.config,
179            mandatory_fields: (broadcast_listener, storage, rpc_data_provider),
180        }
181    }
182}
183
184impl<T1, T2, T3> BlockchainRpcServiceBuilder<(T1, T2, T3)> {
185    pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
186        Self { config, ..self }
187    }
188}
189
/// Blockchain RPC service handle.
///
/// A thin wrapper around a shared [`Inner`] state, so cloning the service
/// only clones the `Arc`.
#[repr(transparent)]
pub struct BlockchainRpcService<B = NoopBroadcastListener> {
    // Shared state: storage, config, broadcast listener and data provider.
    inner: Arc<Inner<B>>,
}
194
195impl<B> Clone for BlockchainRpcService<B> {
196    #[inline]
197    fn clone(&self) -> Self {
198        Self {
199            inner: self.inner.clone(),
200        }
201    }
202}
203
204impl BlockchainRpcService<()> {
205    pub fn builder() -> BlockchainRpcServiceBuilder<((), (), ())> {
206        BlockchainRpcServiceBuilder {
207            config: Default::default(),
208            mandatory_fields: ((), (), ()),
209        }
210    }
211}
212
// Dispatches a request body by its TL constructor id.
//
// For every listed type, when `$tag` equals `<$ty>::TL_ID` the body is deserialized
// into `$ty` and bound to `$pat`; on success the whole macro evaluates to the arm
// expression (processed by the `@expr` rules below). If deserialization fails, or no
// constructor matches, the error is bound to `$err` and `$err_exr` is evaluated instead.
//
// Each arm carries `#[meta("name", fields...)]`: the name is used as the metric label
// and log message, the optional fields are forwarded to `tracing::debug!`.
// An arm prefixed with a bracketed token (e.g. `[raw]`) is used verbatim, without the
// logging/metrics/response wrapping.
macro_rules! match_request {
     ($req_body:expr, $tag:expr, {
        $(
            #[meta($name:literal $(, $($args:tt)*)?)]
            $([$raw:tt])? $ty:path as $pat:pat => $expr:expr
        ),*$(,)?
    }, $err:pat => $err_exr:expr) => {
        // Labeled block so that a successful arm can `break` out with its value,
        // while every failure path falls through to `$err_exr`.
        '__match_req: {
            let $err = match $tag {
                $(<$ty>::TL_ID => match tl_proto::deserialize::<$ty>(&($req_body)) {
                        Ok($pat) => break '__match_req match_request!(
                            @expr
                            { $name $($($args)*)? }
                            { $($raw)? }
                            $expr
                        ),
                        Err(e) => e,
                })*
                _ => tl_proto::TlError::UnknownConstructor,
            };
            $err_exr
        }
    };

    // Raw expression (just use it as is).
    (@expr { $name:literal $($args:tt)* } { $($raw:tt)+ } $expr:expr) => {
        $expr
    };
    // Wrapped expression (adds debug log and metrics).
    (@expr { $name:literal $($args:tt)* } { } $expr:expr) => {{
        match_request!(@debug $name {} { $($args)* });

        // Captured before the future is created and moved into it below.
        let __started_at = std::time::Instant::now();

        BoxFutureOrNoop::future(async move {
            // Records the elapsed time when the future's scope is left.
            scopeguard::defer! {
                metrics::histogram!(
                    RPC_METHOD_TIMINGS_METRIC,
                    "method" => $name
                ).record(__started_at.elapsed());
            }

            Some(Response::from_tl($expr))
        })
    }};

    // Stuff to remove trailing comma from args.
    // No fields at all: log just the method name.
    (@debug $name:literal { } { $(,)? }) => {
        match_request!(@debug_impl $name)
    };
    // All fields consumed: emit them followed by the method name as the message.
    (@debug $name:literal { $($res:tt)+ } { $(,)? }) => {
        match_request!(@debug_impl $($res)+, $name)
    };
    // Move one token from the pending list into the accumulated list and recurse.
    (@debug $name:literal { $($res:tt)* } { $t:tt $($args:tt)* }) => {
        match_request!(@debug $name { $($res)* $t } { $($args)* })
    };
    (@debug_impl $($args:tt)*) => {
        tracing::debug!($($args)*)
    };
}
273
274impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
275    type QueryResponse = Response;
276    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
277    type OnMessageFuture = BoxFutureOrNoop<()>;
278
    /// Handles a query by dispatching on its TL constructor.
    ///
    /// Returns `BoxFutureOrNoop::Noop` when the prefix cannot be handled, the
    /// constructor is unknown or the body fails to deserialize; otherwise returns
    /// the future produced by the matching handler.
    #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
        let (constructor, body) = match try_handle_prefix(&req) {
            Ok(rest) => rest,
            Err(e) => {
                tracing::debug!("failed to deserialize query: {e}");
                return BoxFutureOrNoop::Noop;
            }
        };

        // Cloned so the shared state can be moved into the `async move` handler future.
        let inner = self.inner.clone();

        // NOTE: update `constructor_to_string` after adding new methods
        match_request!(body, constructor, {
            // `[raw]` arm: answered directly, without the debug log and timing metric.
            #[meta("ping")]
            [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
                Some(Response::from_tl(overlay::Pong))
            }),

            #[meta(
                "getNextKeyBlockIds",
                block_id = %req.block_id,
                max_size = req.max_size,
            )]
            rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),

            #[meta("getBlockFull", block_id = %req.block_id)]
            rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,

            #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
            rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,

            #[meta(
                "getBlockDataChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,

            #[meta("getKeyBlockProof", block_id = %req.block_id)]
            rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,

            #[meta("getZerostateProof")]
            rpc::GetZerostateProof as _ => inner.handle_get_zerostate_proof().await,

            #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
            rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req).await,

            #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
            rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req).await,

            #[meta(
                "getPersistentShardStateChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetPersistentShardStateChunk as req => {
                inner.handle_get_persistent_shard_state_chunk(&req).await
            },

            #[meta(
                "getPersistentQueueStateChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetPersistentQueueStateChunk as req => {
                inner.handle_get_persistent_queue_state_chunk(&req).await
            },

            #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
            rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,

            #[meta(
                "getArchiveChunk",
                archive_id = %req.archive_id,
                offset = %req.offset,
            )]
            rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
        }, e => {
            // Reached for unknown constructors and for bodies that fail to deserialize.
            tracing::debug!("failed to deserialize query: {e}");
            BoxFutureOrNoop::Noop
        })
    }
362
363    #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
364    fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
365        use tl_proto::{BytesMeta, TlRead};
366
367        // TODO: Do nothing if `B` is `NoopBroadcastListener` via `castaway` ?
368
369        // Require message body to contain at least two constructors.
370        if req.body.len() < 8 {
371            return BoxFutureOrNoop::Noop;
372        }
373
374        // Skip broadcast prefix
375        if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
376            return BoxFutureOrNoop::Noop;
377        }
378
379        // Read (CONSUME) the next constructor.
380        match req.body.get_u32_le() {
381            MessageBroadcastRef::TL_ID => {
382                match BytesMeta::read_from(&mut req.body.as_ref()) {
383                    // NOTE: `len` is 24bit integer
384                    Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
385                        req.body.advance(meta.prefix_len);
386                        req.body.truncate(meta.len);
387                    }
388                    Ok(_) => {
389                        tracing::debug!("malformed external message broadcast");
390                        return BoxFutureOrNoop::Noop;
391                    }
392                    Err(e) => {
393                        tracing::debug!("failed to deserialize external message broadcast: {e:?}");
394                        return BoxFutureOrNoop::Noop;
395                    }
396                }
397
398                let inner = self.inner.clone();
399                metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
400                    .increment(req.body.len() as u64);
401                BoxFutureOrNoop::future(async move {
402                    if let Err(e) = validate_external_message(&req.body).await {
403                        tracing::debug!("invalid external message: {e:?}");
404                        return;
405                    }
406
407                    inner
408                        .broadcast_listener
409                        .handle_message(req.metadata, req.body)
410                        .await;
411                })
412            }
413            constructor => {
414                tracing::debug!("unknown broadcast constructor: {constructor:08x}");
415                BoxFutureOrNoop::Noop
416            }
417        }
418    }
419}
420
/// Shared state behind [`BlockchainRpcService`].
struct Inner<B> {
    // Node storage used by the block, key block and proof handlers.
    storage: CoreStorage,
    // Service limits and feature switches.
    config: BlockchainRpcServiceConfig,
    // Receiver of validated external message broadcasts.
    broadcast_listener: B,
    // Source of archive and persistent state data.
    rpc_data_provider: Arc<dyn RpcDataProvider>,
}
427
428impl<B> Inner<B> {
    /// Returns a reference to the underlying storage.
    fn storage(&self) -> &CoreStorage {
        &self.storage
    }
432
433    fn handle_get_next_key_block_ids(
434        &self,
435        req: &rpc::GetNextKeyBlockIds,
436    ) -> overlay::Response<KeyBlockIds> {
437        let block_handle_storage = self.storage().block_handle_storage();
438
439        let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
440
441        let get_next_key_block_ids = || {
442            if !req.block_id.shard.is_masterchain() {
443                anyhow::bail!("first block id is not from masterchain");
444            }
445
446            let mut iterator = block_handle_storage
447                .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
448                .take(limit + 1);
449
450            if let Some(id) = iterator.next() {
451                anyhow::ensure!(
452                    id.root_hash == req.block_id.root_hash,
453                    "first block root hash mismatch"
454                );
455                anyhow::ensure!(
456                    id.file_hash == req.block_id.file_hash,
457                    "first block file hash mismatch"
458                );
459            }
460
461            Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
462        };
463
464        match get_next_key_block_ids() {
465            Ok(ids) => {
466                let incomplete = ids.len() < limit;
467                overlay::Response::Ok(KeyBlockIds {
468                    block_ids: ids,
469                    incomplete,
470                })
471            }
472            Err(e) => {
473                tracing::warn!("get_next_key_block_ids failed: {e:?}");
474                overlay::Response::Err(INTERNAL_ERROR_CODE)
475            }
476        }
477    }
478
479    async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
480        match self.get_block_full(&req.block_id).await {
481            Ok(block_full) => overlay::Response::Ok(block_full),
482            Err(e) => {
483                tracing::warn!("get_block_full failed: {e:?}");
484                overlay::Response::Err(INTERNAL_ERROR_CODE)
485            }
486        }
487    }
488
489    async fn handle_get_next_block_full(
490        &self,
491        req: &rpc::GetNextBlockFull,
492    ) -> overlay::Response<BlockFull> {
493        let block_handle_storage = self.storage().block_handle_storage();
494        let block_connection_storage = self.storage().block_connection_storage();
495
496        let get_next_block_full = async {
497            let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
498                Some(handle) if handle.has_next1() => block_connection_storage
499                    .load_connection(&req.prev_block_id, BlockConnection::Next1)
500                    .context("connection not found")?,
501                _ => return Ok(BlockFull::NotFound),
502            };
503
504            self.get_block_full(&next_block_id).await
505        };
506
507        match get_next_block_full.await {
508            Ok(block_full) => overlay::Response::Ok(block_full),
509            Err(e) => {
510                tracing::warn!("get_next_block_full failed: {e:?}");
511                overlay::Response::Err(INTERNAL_ERROR_CODE)
512            }
513        }
514    }
515
516    async fn handle_get_block_data_chunk(
517        &self,
518        req: &rpc::GetBlockDataChunk,
519    ) -> overlay::Response<Data> {
520        let block_handle_storage = self.storage().block_handle_storage();
521        let block_storage = self.storage().block_storage();
522
523        let handle = match block_handle_storage.load_handle(&req.block_id) {
524            Some(handle) if handle.has_data() => handle,
525            _ => {
526                tracing::debug!("block data not found for chunked read");
527                return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
528            }
529        };
530
531        let offset = req.offset as u64;
532        match block_storage
533            .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
534            .await
535        {
536            Ok(Some(data)) => overlay::Response::Ok(Data { data }),
537            Ok(None) => {
538                tracing::debug!("block data chunk not found at offset {}", offset);
539                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
540            }
541            Err(e) => {
542                tracing::warn!("get_block_data_chunk failed: {e:?}");
543                overlay::Response::Err(INTERNAL_ERROR_CODE)
544            }
545        }
546    }
547
548    async fn handle_get_key_block_proof(
549        &self,
550        req: &rpc::GetKeyBlockProof,
551    ) -> overlay::Response<KeyBlockProof> {
552        let block_handle_storage = self.storage().block_handle_storage();
553        let block_storage = self.storage().block_storage();
554
555        let get_key_block_proof = async {
556            match block_handle_storage.load_handle(&req.block_id) {
557                Some(handle) if handle.has_proof() => {
558                    let data = block_storage.load_block_proof_raw(&handle).await?;
559                    Ok::<_, anyhow::Error>(KeyBlockProof::Found {
560                        proof: Bytes::from_owner(data),
561                    })
562                }
563                _ => Ok(KeyBlockProof::NotFound),
564            }
565        };
566
567        match get_key_block_proof.await {
568            Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
569            Err(e) => {
570                tracing::warn!("get_key_block_proof failed: {e:?}");
571                overlay::Response::Err(INTERNAL_ERROR_CODE)
572            }
573        }
574    }
575
576    async fn handle_get_zerostate_proof(&self) -> overlay::Response<ZerostateProof> {
577        let storage = self.storage().node_state();
578        let proof_opt = storage.load_zerostate_proof_bytes();
579        let result = match proof_opt {
580            Some(proof) => ZerostateProof::Found { proof },
581            None => ZerostateProof::NotFound,
582        };
583
584        overlay::Response::Ok(result)
585    }
586
587    async fn handle_get_archive_info(
588        &self,
589        req: &rpc::GetArchiveInfo,
590    ) -> overlay::Response<ArchiveInfo> {
591        match self.rpc_data_provider.get_archive_info(req.mc_seqno).await {
592            Ok(info) => overlay::Response::Ok(info),
593            Err(e) => {
594                tracing::warn!(mc_seqno = req.mc_seqno, "get_archive_info failed: {e:?}");
595                overlay::Response::Err(INTERNAL_ERROR_CODE)
596            }
597        }
598    }
599
600    async fn handle_get_archive_chunk(
601        &self,
602        req: &rpc::GetArchiveChunk,
603    ) -> overlay::Response<Data> {
604        match self
605            .rpc_data_provider
606            .get_archive_chunk(req.archive_id as u32, req.offset)
607            .await
608        {
609            Ok(data) => overlay::Response::Ok(Data { data }),
610            Err(e) => {
611                tracing::warn!(
612                    archive_id = req.archive_id,
613                    offset = req.offset,
614                    "get_archive_chunk failed: {e:?}"
615                );
616                overlay::Response::Err(INTERNAL_ERROR_CODE)
617            }
618        }
619    }
620
621    async fn handle_get_persistent_state_info(
622        &self,
623        req: &rpc::GetPersistentShardStateInfo,
624    ) -> overlay::Response<PersistentStateInfo> {
625        let label = [("method", "getPersistentShardStateInfo")];
626        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
627        match self
628            .read_persistent_state_info(&req.block_id, PersistentStateKind::Shard)
629            .await
630        {
631            Ok(info) => overlay::Response::Ok(info),
632            Err(e) => {
633                tracing::warn!(
634                    block_id = ?req.block_id,
635                    "get_persistent_state_info failed: {e:?}"
636                );
637                overlay::Response::Err(INTERNAL_ERROR_CODE)
638            }
639        }
640    }
641
642    async fn handle_get_queue_persistent_state_info(
643        &self,
644        req: &rpc::GetPersistentQueueStateInfo,
645    ) -> overlay::Response<PersistentStateInfo> {
646        match self
647            .read_persistent_state_info(&req.block_id, PersistentStateKind::Queue)
648            .await
649        {
650            Ok(info) => overlay::Response::Ok(info),
651            Err(e) => {
652                tracing::warn!(
653                    block_id = ?req.block_id,
654                    "get_persistent_state_info failed: {e:?}"
655                );
656                overlay::Response::Err(INTERNAL_ERROR_CODE)
657            }
658        }
659    }
660
661    async fn handle_get_persistent_shard_state_chunk(
662        &self,
663        req: &rpc::GetPersistentShardStateChunk,
664    ) -> overlay::Response<Data> {
665        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
666            .await
667    }
668
669    async fn handle_get_persistent_queue_state_chunk(
670        &self,
671        req: &rpc::GetPersistentQueueStateChunk,
672    ) -> overlay::Response<Data> {
673        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
674            .await
675    }
676}
677
678impl<B> Inner<B> {
679    async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
680        let block_handle_storage = self.storage().block_handle_storage();
681        let block_storage = self.storage().block_storage();
682
683        let handle = match block_handle_storage.load_handle(block_id) {
684            Some(handle) if handle.has_all_block_parts() => handle,
685            _ => return Ok(BlockFull::NotFound),
686        };
687
688        // Get first chunk of compressed data
689        let data = match block_storage
690            .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
691            .await?
692        {
693            Some(data) => data,
694            None => return Ok(BlockFull::NotFound),
695        };
696
697        let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
698            // Small block - entire data fits in one chunk
699            data.len() as u32
700        } else {
701            // Large block - need to get total size from storage
702            match block_storage.get_compressed_block_data_size(&handle)? {
703                Some(size) => size as u32,
704                None => return Ok(BlockFull::NotFound),
705            }
706        };
707
708        let block = BlockData {
709            data,
710            size: NonZeroU32::new(data_size).expect("shouldn't happen"),
711            chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
712        };
713
714        let (proof, queue_diff) = tokio::join!(
715            block_storage.load_block_proof_raw(&handle),
716            block_storage.load_queue_diff_raw(&handle)
717        );
718
719        Ok(BlockFull::Found {
720            block_id: *block_id,
721            block,
722            proof: Bytes::from_owner(proof?),
723            queue_diff: Bytes::from_owner(queue_diff?),
724        })
725    }
726
727    async fn read_persistent_state_info(
728        &self,
729        block_id: &BlockId,
730        state_kind: PersistentStateKind,
731    ) -> anyhow::Result<PersistentStateInfo> {
732        if self.config.serve_persistent_states
733            && let Some(info) = self
734                .rpc_data_provider
735                .get_persistent_state_info(block_id, state_kind)
736                .await?
737        {
738            return Ok(PersistentStateInfo::Found {
739                size: info.size,
740                chunk_size: info.chunk_size,
741            });
742        }
743
744        Ok(PersistentStateInfo::NotFound)
745    }
746
747    async fn read_persistent_state_chunk(
748        &self,
749        block_id: &BlockId,
750        offset: u64,
751        state_kind: PersistentStateKind,
752    ) -> overlay::Response<Data> {
753        let persistent_state_request_validation = || {
754            anyhow::ensure!(
755                self.config.serve_persistent_states,
756                "persistent states are disabled"
757            );
758            Ok::<_, anyhow::Error>(())
759        };
760
761        if let Err(e) = persistent_state_request_validation() {
762            tracing::debug!("persistent state request validation failed: {e:?}");
763            return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
764        }
765
766        match self
767            .rpc_data_provider
768            .get_persistent_state_chunk(block_id, offset, state_kind)
769            .await
770        {
771            Ok(Some(data)) => overlay::Response::Ok(Data { data }),
772            Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE),
773            Err(e) => {
774                tracing::warn!(
775                    ?block_id,
776                    offset,
777                    ?state_kind,
778                    "get_persistent_state_chunk failed: {e:?}"
779                );
780                overlay::Response::Err(INTERNAL_ERROR_CODE)
781            }
782        }
783    }
784}