tycho_core/blockchain_rpc/
service.rs

1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18    ArchiveId, BlockConnection, BlockStorage, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
/// Name of the histogram metric used in this file to record how long RPC
/// methods take (labelled by `method`).
const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";

/// Length, in bytes, requested from storage for a single block data chunk.
/// Also reported to clients as the `chunk_size` of a block.
#[cfg(not(test))]
const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; // 1 MB
/// Test build override of the chunk length.
#[cfg(test)]
const BLOCK_DATA_CHUNK_SIZE: u32 = 10; // 10 bytes so we have zillions of chunks in tests
27
/// Tunables of [`BlockchainRpcService`].
///
/// Because of `#[serde(default)]`, fields missing from the serialized form
/// take the values from the [`Default`] implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct BlockchainRpcServiceConfig {
    /// The maximum number of key blocks in the response.
    ///
    /// The limit requested by a peer is clamped to this value.
    ///
    /// Default: 8.
    pub max_key_blocks_list_len: usize,

    /// Whether to serve persistent states.
    ///
    /// When disabled, state info requests answer "not found" and state chunk
    /// requests are rejected.
    ///
    /// Default: yes.
    pub serve_persistent_states: bool,
}
42
43impl Default for BlockchainRpcServiceConfig {
44    fn default() -> Self {
45        Self {
46            max_key_blocks_list_len: 8,
47            serve_persistent_states: true,
48        }
49    }
50}
51
/// Type-state builder for [`BlockchainRpcService`].
///
/// `MandatoryFields` is a `(broadcast listener, storage)` tuple. Both slots
/// start as `()` and are replaced by the `with_*` / `without_*` setters;
/// `build` is only available once the first slot implements
/// [`BroadcastListener`] and the second one is a [`CoreStorage`].
pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
    // Optional settings; replaced wholesale by `with_config`.
    config: BlockchainRpcServiceConfig,
    // Tuple of the required parts collected so far.
    mandatory_fields: MandatoryFields,
}
56
57impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
58where
59    B: BroadcastListener,
60{
61    pub fn build(self) -> BlockchainRpcService<B> {
62        let (broadcast_listener, storage) = self.mandatory_fields;
63
64        BlockchainRpcService {
65            inner: Arc::new(Inner {
66                storage,
67                config: self.config,
68                broadcast_listener,
69            }),
70        }
71    }
72}
73
74impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
75    pub fn with_storage(
76        self,
77        storage: CoreStorage,
78    ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
79        let (broadcast_listener, _) = self.mandatory_fields;
80
81        BlockchainRpcServiceBuilder {
82            config: self.config,
83            mandatory_fields: (broadcast_listener, storage),
84        }
85    }
86}
87
88impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
89    pub fn with_broadcast_listener<T1>(
90        self,
91        broadcast_listener: T1,
92    ) -> BlockchainRpcServiceBuilder<(T1, T2)>
93    where
94        T1: BroadcastListener,
95    {
96        let (_, storage) = self.mandatory_fields;
97
98        BlockchainRpcServiceBuilder {
99            config: self.config,
100            mandatory_fields: (broadcast_listener, storage),
101        }
102    }
103
104    pub fn without_broadcast_listener(
105        self,
106    ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
107        let (_, storage) = self.mandatory_fields;
108
109        BlockchainRpcServiceBuilder {
110            config: self.config,
111            mandatory_fields: (NoopBroadcastListener, storage),
112        }
113    }
114}
115
116impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
117    pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
118        Self { config, ..self }
119    }
120}
121
/// Network service answering blockchain RPC queries from storage and
/// forwarding external message broadcasts to the listener `B`.
///
/// The service is a thin handle around a shared, reference-counted state.
#[repr(transparent)]
pub struct BlockchainRpcService<B = NoopBroadcastListener> {
    // Shared state: storage, config and the broadcast listener.
    inner: Arc<Inner<B>>,
}
126
127impl<B> Clone for BlockchainRpcService<B> {
128    #[inline]
129    fn clone(&self) -> Self {
130        Self {
131            inner: self.inner.clone(),
132        }
133    }
134}
135
136impl BlockchainRpcService<()> {
137    pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
138        BlockchainRpcServiceBuilder {
139            config: Default::default(),
140            mandatory_fields: ((), ()),
141        }
142    }
143}
144
// Dispatches a query by its TL constructor id.
//
// Usage shape:
//
//   match_request!(body, tag, {
//       #[meta("name", field = value, ...)]
//       [raw] Type as pattern => expr,
//       ...
//   }, err_pattern => err_expr)
//
// `tag` is compared with `<Type>::TL_ID` of every listed type. For the
// matching type the body is deserialized with `tl_proto::deserialize` and, on
// success, the arm expression becomes the value of the whole invocation.
// A deserialization error, or `TlError::UnknownConstructor` when no type
// matches, is bound to `err_pattern` and `err_expr` is evaluated instead.
//
// The optional `[raw]` marker selects how the arm expression is used:
// as is (with marker) or wrapped into a future with a debug log and a timing
// histogram (without marker).
macro_rules! match_request {
     ($req_body:expr, $tag:expr, {
        $(
            #[meta($name:literal $(, $($args:tt)*)?)]
            $([$raw:tt])? $ty:path as $pat:pat => $expr:expr
        ),*$(,)?
    }, $err:pat => $err_exr:expr) => {
        // The labeled block lets a successful arm leave early with its value,
        // so that only failures fall through to the error expression.
        '__match_req: {
            let $err = match $tag {
                $(<$ty>::TL_ID => match tl_proto::deserialize::<$ty>(&($req_body)) {
                        Ok($pat) => break '__match_req match_request!(
                            @expr
                            { $name $($($args)*)? }
                            { $($raw)? }
                            $expr
                        ),
                        Err(e) => e,
                })*
                _ => tl_proto::TlError::UnknownConstructor,
            };
            $err_exr
        }
    };

    // Raw expression (just use it as is).
    // Selected when the second group (the `[raw]` marker tokens) is non-empty.
    (@expr { $name:literal $($args:tt)* } { $($raw:tt)+ } $expr:expr) => {
        $expr
    };
    // Wrapped expression (adds debug log and metrics).
    // The debug event is emitted right away; the timer starts before the
    // future is created and the histogram sample (labelled with the method
    // name) is recorded by the deferred guard inside the future.
    (@expr { $name:literal $($args:tt)* } { } $expr:expr) => {{
        match_request!(@debug $name {} { $($args)* });

        let __started_at = std::time::Instant::now();

        BoxFutureOrNoop::future(async move {
            scopeguard::defer! {
                metrics::histogram!(
                    RPC_METHOD_TIMINGS_METRIC,
                    "method" => $name
                ).record(__started_at.elapsed());
            }

            Some(Response::from_tl($expr))
        })
    }};

    // Stuff to remove trailing comma from args.
    // Tokens are moved one by one from the second group into the first
    // (accumulator) group until only an optional trailing comma is left.
    // No fields collected: log just the method name.
    (@debug $name:literal { } { $(,)? }) => {
        match_request!(@debug_impl $name)
    };
    // Fields collected: log them followed by the method name as the message.
    (@debug $name:literal { $($res:tt)+ } { $(,)? }) => {
        match_request!(@debug_impl $($res)+, $name)
    };
    // Move the next token into the accumulator and recurse.
    (@debug $name:literal { $($res:tt)* } { $t:tt $($args:tt)* }) => {
        match_request!(@debug $name { $($res)* $t } { $($args)* })
    };
    // Final step: forward the cleaned-up arguments to `tracing::debug!`.
    (@debug_impl $($args:tt)*) => {
        tracing::debug!($($args)*)
    };
}
205
/// Network-facing entry points: queries are dispatched to the handlers of
/// `Inner`, one-way messages are treated as external message broadcasts.
impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
    type QueryResponse = Response;
    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
    type OnMessageFuture = BoxFutureOrNoop<()>;

    /// Handles a query.
    ///
    /// The constructor id and the remaining body are split off with
    /// `try_handle_prefix`, then `match_request!` picks the handler by
    /// constructor. If the prefix or the request itself cannot be
    /// deserialized (including an unknown constructor), the failure is logged
    /// at debug level and `BoxFutureOrNoop::Noop` is returned.
    #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
        let (constructor, body) = match try_handle_prefix(&req) {
            Ok(rest) => rest,
            Err(e) => {
                tracing::debug!("failed to deserialize query: {e}");
                return BoxFutureOrNoop::Noop;
            }
        };

        // Handlers run inside `async move` futures, so they get their own
        // handle to the shared state.
        let inner = self.inner.clone();

        // NOTE: update `constructor_to_string` after adding new methods
        match_request!(body, constructor, {
            #[meta("ping")]
            [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
                Some(Response::from_tl(overlay::Pong))
            }),

            #[meta(
                "getNextKeyBlockIds",
                block_id = %req.block_id,
                max_size = req.max_size,
            )]
            rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),

            #[meta("getBlockFull", block_id = %req.block_id)]
            rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,

            #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
            rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,

            #[meta(
                "getBlockDataChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,

            #[meta("getKeyBlockProof", block_id = %req.block_id)]
            rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,

            #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
            rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req),

            #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
            rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req),

            #[meta(
                "getPersistentShardStateChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetPersistentShardStateChunk as req => {
                inner.handle_get_persistent_shard_state_chunk(&req).await
            },

            #[meta(
                "getPersistentQueueStateChunk",
                block_id = %req.block_id,
                offset = %req.offset,
            )]
            rpc::GetPersistentQueueStateChunk as req => {
                inner.handle_get_persistent_queue_state_chunk(&req).await
            },

            #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
            rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,

            #[meta(
                "getArchiveChunk",
                archive_id = %req.archive_id,
                offset = %req.offset,
            )]
            rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
        }, e => {
            tracing::debug!("failed to deserialize query: {e}");
            BoxFutureOrNoop::Noop
        })
    }

    /// Handles a one-way message.
    ///
    /// Only external message broadcasts are accepted: the body must start
    /// with the broadcast prefix constructor followed by the
    /// `MessageBroadcastRef` constructor and exactly one TL bytes field.
    /// Anything else is dropped (`BoxFutureOrNoop::Noop`). An accepted
    /// payload is validated with `validate_external_message` and, if valid,
    /// passed to the broadcast listener together with the request metadata.
    #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
    fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
        use tl_proto::{BytesMeta, TlRead};

        // TODO: Do nothing if `B` is `NoopBroadcastListener` via `castaway` ?

        // Require message body to contain at least two constructors.
        // (The two `get_u32_le` calls below consume 4 bytes each.)
        if req.body.len() < 8 {
            return BoxFutureOrNoop::Noop;
        }

        // Skip broadcast prefix
        if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
            return BoxFutureOrNoop::Noop;
        }

        // Read (CONSUME) the next constructor.
        match req.body.get_u32_le() {
            MessageBroadcastRef::TL_ID => {
                // The bytes header is parsed from a temporary slice view, so
                // `req.body` itself is not advanced by the read.
                match BytesMeta::read_from(&mut req.body.as_ref()) {
                    // NOTE: `len` is 24bit integer
                    // The field must span the whole remaining body; then the
                    // body is narrowed to just the payload (length prefix and
                    // padding are cut off).
                    Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
                        req.body.advance(meta.prefix_len);
                        req.body.truncate(meta.len);
                    }
                    Ok(_) => {
                        tracing::debug!("malformed external message broadcast");
                        return BoxFutureOrNoop::Noop;
                    }
                    Err(e) => {
                        tracing::debug!("failed to deserialize external message broadcast: {e:?}");
                        return BoxFutureOrNoop::Noop;
                    }
                }

                let inner = self.inner.clone();
                // Counted before validation, so rejected messages are included.
                metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
                    .increment(req.body.len() as u64);
                BoxFutureOrNoop::future(async move {
                    if let Err(e) = validate_external_message(&req.body).await {
                        tracing::debug!("invalid external message: {e:?}");
                        return;
                    }

                    inner
                        .broadcast_listener
                        .handle_message(req.metadata, req.body)
                        .await;
                })
            }
            constructor => {
                tracing::debug!("unknown broadcast constructor: {constructor:08x}");
                BoxFutureOrNoop::Noop
            }
        }
    }
}
349
/// State shared by all clones of [`BlockchainRpcService`].
struct Inner<B> {
    // Source of all data served by the query handlers.
    storage: CoreStorage,
    // Limits and feature switches applied by the handlers.
    config: BlockchainRpcServiceConfig,
    // Receiver of validated external message broadcasts.
    broadcast_listener: B,
}
355
356impl<B> Inner<B> {
    /// Returns the storage the handlers read from.
    fn storage(&self) -> &CoreStorage {
        &self.storage
    }
360
361    fn handle_get_next_key_block_ids(
362        &self,
363        req: &rpc::GetNextKeyBlockIds,
364    ) -> overlay::Response<KeyBlockIds> {
365        let block_handle_storage = self.storage().block_handle_storage();
366
367        let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
368
369        let get_next_key_block_ids = || {
370            if !req.block_id.shard.is_masterchain() {
371                anyhow::bail!("first block id is not from masterchain");
372            }
373
374            let mut iterator = block_handle_storage
375                .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
376                .take(limit + 1);
377
378            if let Some(id) = iterator.next() {
379                anyhow::ensure!(
380                    id.root_hash == req.block_id.root_hash,
381                    "first block root hash mismatch"
382                );
383                anyhow::ensure!(
384                    id.file_hash == req.block_id.file_hash,
385                    "first block file hash mismatch"
386                );
387            }
388
389            Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
390        };
391
392        match get_next_key_block_ids() {
393            Ok(ids) => {
394                let incomplete = ids.len() < limit;
395                overlay::Response::Ok(KeyBlockIds {
396                    block_ids: ids,
397                    incomplete,
398                })
399            }
400            Err(e) => {
401                tracing::warn!("get_next_key_block_ids failed: {e:?}");
402                overlay::Response::Err(INTERNAL_ERROR_CODE)
403            }
404        }
405    }
406
407    async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
408        match self.get_block_full(&req.block_id).await {
409            Ok(block_full) => overlay::Response::Ok(block_full),
410            Err(e) => {
411                tracing::warn!("get_block_full failed: {e:?}");
412                overlay::Response::Err(INTERNAL_ERROR_CODE)
413            }
414        }
415    }
416
417    async fn handle_get_next_block_full(
418        &self,
419        req: &rpc::GetNextBlockFull,
420    ) -> overlay::Response<BlockFull> {
421        let block_handle_storage = self.storage().block_handle_storage();
422        let block_connection_storage = self.storage().block_connection_storage();
423
424        let get_next_block_full = async {
425            let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
426                Some(handle) if handle.has_next1() => block_connection_storage
427                    .load_connection(&req.prev_block_id, BlockConnection::Next1)
428                    .context("connection not found")?,
429                _ => return Ok(BlockFull::NotFound),
430            };
431
432            self.get_block_full(&next_block_id).await
433        };
434
435        match get_next_block_full.await {
436            Ok(block_full) => overlay::Response::Ok(block_full),
437            Err(e) => {
438                tracing::warn!("get_next_block_full failed: {e:?}");
439                overlay::Response::Err(INTERNAL_ERROR_CODE)
440            }
441        }
442    }
443
444    async fn handle_get_block_data_chunk(
445        &self,
446        req: &rpc::GetBlockDataChunk,
447    ) -> overlay::Response<Data> {
448        let block_handle_storage = self.storage().block_handle_storage();
449        let block_storage = self.storage().block_storage();
450
451        let handle = match block_handle_storage.load_handle(&req.block_id) {
452            Some(handle) if handle.has_data() => handle,
453            _ => {
454                tracing::debug!("block data not found for chunked read");
455                return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
456            }
457        };
458
459        let offset = req.offset as u64;
460        match block_storage
461            .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
462            .await
463        {
464            Ok(Some(data)) => overlay::Response::Ok(Data { data }),
465            Ok(None) => {
466                tracing::debug!("block data chunk not found at offset {}", offset);
467                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
468            }
469            Err(e) => {
470                tracing::warn!("get_block_data_chunk failed: {e:?}");
471                overlay::Response::Err(INTERNAL_ERROR_CODE)
472            }
473        }
474    }
475
476    async fn handle_get_key_block_proof(
477        &self,
478        req: &rpc::GetKeyBlockProof,
479    ) -> overlay::Response<KeyBlockProof> {
480        let block_handle_storage = self.storage().block_handle_storage();
481        let block_storage = self.storage().block_storage();
482
483        let get_key_block_proof = async {
484            match block_handle_storage.load_handle(&req.block_id) {
485                Some(handle) if handle.has_proof() => {
486                    let data = block_storage.load_block_proof_raw(&handle).await?;
487                    Ok::<_, anyhow::Error>(KeyBlockProof::Found {
488                        proof: Bytes::from_owner(data),
489                    })
490                }
491                _ => Ok(KeyBlockProof::NotFound),
492            }
493        };
494
495        match get_key_block_proof.await {
496            Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
497            Err(e) => {
498                tracing::warn!("get_key_block_proof failed: {e:?}");
499                overlay::Response::Err(INTERNAL_ERROR_CODE)
500            }
501        }
502    }
503
504    async fn handle_get_archive_info(
505        &self,
506        req: &rpc::GetArchiveInfo,
507    ) -> overlay::Response<ArchiveInfo> {
508        let mc_seqno = req.mc_seqno;
509        let node_state = self.storage.node_state();
510
511        match node_state.load_last_mc_block_id() {
512            Some(last_applied_mc_block) => {
513                if mc_seqno > last_applied_mc_block.seqno {
514                    return overlay::Response::Ok(ArchiveInfo::TooNew);
515                }
516
517                let block_storage = self.storage().block_storage();
518
519                let id = block_storage.get_archive_id(mc_seqno);
520                let size_res = match id {
521                    ArchiveId::Found(id) => block_storage.get_archive_size(id),
522                    ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
523                };
524
525                overlay::Response::Ok(match (id, size_res) {
526                    (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
527                        id: id as u64,
528                        size: NonZeroU64::new(size as _).unwrap(),
529                        chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
530                    },
531                    (ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
532                    _ => ArchiveInfo::NotFound,
533                })
534            }
535            None => {
536                tracing::warn!("get_archive_id failed: no blocks applied");
537                overlay::Response::Err(INTERNAL_ERROR_CODE)
538            }
539        }
540    }
541
542    async fn handle_get_archive_chunk(
543        &self,
544        req: &rpc::GetArchiveChunk,
545    ) -> overlay::Response<Data> {
546        let block_storage = self.storage.block_storage();
547
548        let get_archive_chunk = || async {
549            let archive_slice = block_storage
550                .get_archive_chunk(req.archive_id as u32, req.offset)
551                .await?;
552            Ok::<_, anyhow::Error>(archive_slice)
553        };
554
555        match get_archive_chunk().await {
556            Ok(data) => overlay::Response::Ok(Data { data }),
557            Err(e) => {
558                tracing::warn!("get_archive_chunk failed: {e:?}");
559                overlay::Response::Err(INTERNAL_ERROR_CODE)
560            }
561        }
562    }
563
564    fn handle_get_persistent_state_info(
565        &self,
566        req: &rpc::GetPersistentShardStateInfo,
567    ) -> overlay::Response<PersistentStateInfo> {
568        let label = [("method", "getPersistentShardStateInfo")];
569        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
570        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
571        overlay::Response::Ok(res)
572    }
573
574    fn handle_get_queue_persistent_state_info(
575        &self,
576        req: &rpc::GetPersistentQueueStateInfo,
577    ) -> overlay::Response<PersistentStateInfo> {
578        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
579        overlay::Response::Ok(res)
580    }
581
582    async fn handle_get_persistent_shard_state_chunk(
583        &self,
584        req: &rpc::GetPersistentShardStateChunk,
585    ) -> overlay::Response<Data> {
586        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
587            .await
588    }
589
590    async fn handle_get_persistent_queue_state_chunk(
591        &self,
592        req: &rpc::GetPersistentQueueStateChunk,
593    ) -> overlay::Response<Data> {
594        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
595            .await
596    }
597}
598
599impl<B> Inner<B> {
600    async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
601        let block_handle_storage = self.storage().block_handle_storage();
602        let block_storage = self.storage().block_storage();
603
604        let handle = match block_handle_storage.load_handle(block_id) {
605            Some(handle) if handle.has_all_block_parts() => handle,
606            _ => return Ok(BlockFull::NotFound),
607        };
608
609        // Get first chunk of compressed data
610        let data = match block_storage
611            .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
612            .await?
613        {
614            Some(data) => data,
615            None => return Ok(BlockFull::NotFound),
616        };
617
618        let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
619            // Small block - entire data fits in one chunk
620            data.len() as u32
621        } else {
622            // Large block - need to get total size from storage
623            match block_storage.get_compressed_block_data_size(&handle)? {
624                Some(size) => size as u32,
625                None => return Ok(BlockFull::NotFound),
626            }
627        };
628
629        let block = BlockData {
630            data,
631            size: NonZeroU32::new(data_size).expect("shouldn't happen"),
632            chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
633        };
634
635        let (proof, queue_diff) = tokio::join!(
636            block_storage.load_block_proof_raw(&handle),
637            block_storage.load_queue_diff_raw(&handle)
638        );
639
640        Ok(BlockFull::Found {
641            block_id: *block_id,
642            block,
643            proof: Bytes::from_owner(proof?),
644            queue_diff: Bytes::from_owner(queue_diff?),
645        })
646    }
647
648    fn read_persistent_state_info(
649        &self,
650        block_id: &BlockId,
651        state_kind: PersistentStateKind,
652    ) -> PersistentStateInfo {
653        let persistent_state_storage = self.storage().persistent_state_storage();
654        if self.config.serve_persistent_states
655            && let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind)
656        {
657            return PersistentStateInfo::Found {
658                size: info.size,
659                chunk_size: info.chunk_size,
660            };
661        }
662        PersistentStateInfo::NotFound
663    }
664
665    async fn read_persistent_state_chunk(
666        &self,
667        block_id: &BlockId,
668        offset: u64,
669        state_kind: PersistentStateKind,
670    ) -> overlay::Response<Data> {
671        let persistent_state_storage = self.storage().persistent_state_storage();
672
673        let persistent_state_request_validation = || {
674            anyhow::ensure!(
675                self.config.serve_persistent_states,
676                "persistent states are disabled"
677            );
678            Ok::<_, anyhow::Error>(())
679        };
680
681        if let Err(e) = persistent_state_request_validation() {
682            tracing::debug!("persistent state request validation failed: {e:?}");
683            return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
684        }
685
686        match persistent_state_storage
687            .read_state_part(block_id, offset, state_kind)
688            .await
689        {
690            Some(data) => overlay::Response::Ok(Data { data: data.into() }),
691            None => {
692                tracing::debug!("failed to read persistent state part");
693                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
694            }
695        }
696    }
697}