Skip to main content

tycho_core/blockchain_rpc/
service.rs

1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18    ArchiveId, BlockConnection, BlockStorage, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
/// Name of the histogram metric under which RPC method durations are recorded.
const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";

/// Length of a single block data chunk in bytes (1 MiB) for regular builds.
#[cfg(not(test))]
const BLOCK_DATA_CHUNK_SIZE: u32 = 1 << 20;
/// Deliberately tiny chunk length (10 bytes) so that tests produce a lot of chunks.
#[cfg(test)]
const BLOCK_DATA_CHUNK_SIZE: u32 = 10;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainRpcServiceConfig {
32    /// The maximum number of key blocks in the response.
33    ///
34    /// Default: 8.
35    pub max_key_blocks_list_len: usize,
36
37    /// Whether to serve persistent states.
38    ///
39    /// Default: yes.
40    pub serve_persistent_states: bool,
41}
42
43impl Default for BlockchainRpcServiceConfig {
44    fn default() -> Self {
45        Self {
46            max_key_blocks_list_len: 8,
47            serve_persistent_states: true,
48        }
49    }
50}
51
52pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
53    config: BlockchainRpcServiceConfig,
54    mandatory_fields: MandatoryFields,
55}
56
57impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
58where
59    B: BroadcastListener,
60{
61    pub fn build(self) -> BlockchainRpcService<B> {
62        let (broadcast_listener, storage) = self.mandatory_fields;
63
64        BlockchainRpcService {
65            inner: Arc::new(Inner {
66                storage,
67                config: self.config,
68                broadcast_listener,
69            }),
70        }
71    }
72}
73
74impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
75    pub fn with_storage(
76        self,
77        storage: CoreStorage,
78    ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
79        let (broadcast_listener, _) = self.mandatory_fields;
80
81        BlockchainRpcServiceBuilder {
82            config: self.config,
83            mandatory_fields: (broadcast_listener, storage),
84        }
85    }
86}
87
88impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
89    pub fn with_broadcast_listener<T1>(
90        self,
91        broadcast_listener: T1,
92    ) -> BlockchainRpcServiceBuilder<(T1, T2)>
93    where
94        T1: BroadcastListener,
95    {
96        let (_, storage) = self.mandatory_fields;
97
98        BlockchainRpcServiceBuilder {
99            config: self.config,
100            mandatory_fields: (broadcast_listener, storage),
101        }
102    }
103
104    pub fn without_broadcast_listener(
105        self,
106    ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
107        let (_, storage) = self.mandatory_fields;
108
109        BlockchainRpcServiceBuilder {
110            config: self.config,
111            mandatory_fields: (NoopBroadcastListener, storage),
112        }
113    }
114}
115
116impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
117    pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
118        Self { config, ..self }
119    }
120}
121
122#[repr(transparent)]
123pub struct BlockchainRpcService<B = NoopBroadcastListener> {
124    inner: Arc<Inner<B>>,
125}
126
127impl<B> Clone for BlockchainRpcService<B> {
128    #[inline]
129    fn clone(&self) -> Self {
130        Self {
131            inner: self.inner.clone(),
132        }
133    }
134}
135
136impl BlockchainRpcService<()> {
137    pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
138        BlockchainRpcServiceBuilder {
139            config: Default::default(),
140            mandatory_fields: ((), ()),
141        }
142    }
143}
144
/// Dispatches a raw query to the handler registered for its TL constructor id.
///
/// Invocation shape:
///
/// ```text
/// match_request!(body, constructor, {
///     #[meta("methodName", field = value, ...)]
///     [raw]? RequestType as pattern => handler_expr,
///     ...
/// }, err_pattern => fallback_expr)
/// ```
///
/// For the entry whose `TL_ID` equals `constructor`, the body is deserialized
/// into `RequestType` and bound to `pattern`:
/// - entries marked with `[raw]` evaluate `handler_expr` as is;
/// - all other entries emit a debug log with the meta name and fields, then
///   wrap `handler_expr` into a boxed future which converts the result into
///   a `Response` and records the elapsed time into the method timings
///   histogram when the future finishes or is dropped.
///
/// If the constructor is unknown or deserialization fails, the error is bound
/// to `err_pattern` and `fallback_expr` is evaluated instead.
macro_rules! match_request {
    ($body:expr, $constructor:expr, {
        $(
            #[meta($method:literal $(, $($fields:tt)*)?)]
            $([$raw:tt])? $req_ty:path as $req_pat:pat => $handler:expr
        ),*$(,)?
    }, $err_pat:pat => $err_expr:expr) => {
        '__match_req: {
            let $err_pat = match $constructor {
                $(<$req_ty>::TL_ID => match tl_proto::deserialize::<$req_ty>(&($body)) {
                        // A successfully parsed request leaves the labeled block
                        // with the value produced by its handler.
                        Ok($req_pat) => break '__match_req match_request!(
                            @expr
                            { $method $($($fields)*)? }
                            { $($raw)? }
                            $handler
                        ),
                        Err(e) => e,
                })*
                _ => tl_proto::TlError::UnknownConstructor,
            };
            $err_expr
        }
    };

    // `[raw]` entry: the handler expression is used verbatim.
    (@expr { $method:literal $($fields:tt)* } { $($raw:tt)+ } $handler:expr) => {
        $handler
    };
    // Regular entry: debug log first, then a future with timing metrics.
    (@expr { $method:literal $($fields:tt)* } { } $handler:expr) => {{
        match_request!(@debug $method {} { $($fields)* });

        let __start = std::time::Instant::now();

        BoxFutureOrNoop::future(async move {
            scopeguard::defer! {
                metrics::histogram!(
                    RPC_METHOD_TIMINGS_METRIC,
                    "method" => $method
                ).record(__start.elapsed());
            }

            Some(Response::from_tl($handler))
        })
    }};

    // Token muncher which strips the trailing comma from the meta fields,
    // moving them one token at a time into the accumulator.
    (@debug $method:literal { } { $(,)? }) => {
        match_request!(@debug_impl $method)
    };
    (@debug $method:literal { $($acc:tt)+ } { $(,)? }) => {
        match_request!(@debug_impl $($acc)+, $method)
    };
    (@debug $method:literal { $($acc:tt)* } { $next:tt $($rest:tt)* }) => {
        match_request!(@debug $method { $($acc)* $next } { $($rest)* })
    };
    (@debug_impl $($log_args:tt)*) => {
        tracing::debug!($($log_args)*)
    };
}
205
206impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
207    type QueryResponse = Response;
208    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
209    type OnMessageFuture = BoxFutureOrNoop<()>;
210
211    #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
212    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
213        let (constructor, body) = match try_handle_prefix(&req) {
214            Ok(rest) => rest,
215            Err(e) => {
216                tracing::debug!("failed to deserialize query: {e}");
217                return BoxFutureOrNoop::Noop;
218            }
219        };
220
221        let inner = self.inner.clone();
222
223        // NOTE: update `constructor_to_string` after adding new methods
224        match_request!(body, constructor, {
225            #[meta("ping")]
226            [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
227                Some(Response::from_tl(overlay::Pong))
228            }),
229
230            #[meta(
231                "getNextKeyBlockIds",
232                block_id = %req.block_id,
233                max_size = req.max_size,
234            )]
235            rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),
236
237            #[meta("getBlockFull", block_id = %req.block_id)]
238            rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,
239
240            #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
241            rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,
242
243            #[meta(
244                "getBlockDataChunk",
245                block_id = %req.block_id,
246                offset = %req.offset,
247            )]
248            rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,
249
250            #[meta("getKeyBlockProof", block_id = %req.block_id)]
251            rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,
252
253            #[meta("getZerostateProof")]
254            rpc::GetZerostateProof as _ => inner.handle_get_zerostate_proof().await,
255
256            #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
257            rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req),
258
259            #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
260            rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req),
261
262            #[meta(
263                "getPersistentShardStateChunk",
264                block_id = %req.block_id,
265                offset = %req.offset,
266            )]
267            rpc::GetPersistentShardStateChunk as req => {
268                inner.handle_get_persistent_shard_state_chunk(&req).await
269            },
270
271            #[meta(
272                "getPersistentQueueStateChunk",
273                block_id = %req.block_id,
274                offset = %req.offset,
275            )]
276            rpc::GetPersistentQueueStateChunk as req => {
277                inner.handle_get_persistent_queue_state_chunk(&req).await
278            },
279
280            #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
281            rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,
282
283            #[meta(
284                "getArchiveChunk",
285                archive_id = %req.archive_id,
286                offset = %req.offset,
287            )]
288            rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
289        }, e => {
290            tracing::debug!("failed to deserialize query: {e}");
291            BoxFutureOrNoop::Noop
292        })
293    }
294
295    #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
296    fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
297        use tl_proto::{BytesMeta, TlRead};
298
299        // TODO: Do nothing if `B` is `NoopBroadcastListener` via `castaway` ?
300
301        // Require message body to contain at least two constructors.
302        if req.body.len() < 8 {
303            return BoxFutureOrNoop::Noop;
304        }
305
306        // Skip broadcast prefix
307        if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
308            return BoxFutureOrNoop::Noop;
309        }
310
311        // Read (CONSUME) the next constructor.
312        match req.body.get_u32_le() {
313            MessageBroadcastRef::TL_ID => {
314                match BytesMeta::read_from(&mut req.body.as_ref()) {
315                    // NOTE: `len` is 24bit integer
316                    Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
317                        req.body.advance(meta.prefix_len);
318                        req.body.truncate(meta.len);
319                    }
320                    Ok(_) => {
321                        tracing::debug!("malformed external message broadcast");
322                        return BoxFutureOrNoop::Noop;
323                    }
324                    Err(e) => {
325                        tracing::debug!("failed to deserialize external message broadcast: {e:?}");
326                        return BoxFutureOrNoop::Noop;
327                    }
328                }
329
330                let inner = self.inner.clone();
331                metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
332                    .increment(req.body.len() as u64);
333                BoxFutureOrNoop::future(async move {
334                    if let Err(e) = validate_external_message(&req.body).await {
335                        tracing::debug!("invalid external message: {e:?}");
336                        return;
337                    }
338
339                    inner
340                        .broadcast_listener
341                        .handle_message(req.metadata, req.body)
342                        .await;
343                })
344            }
345            constructor => {
346                tracing::debug!("unknown broadcast constructor: {constructor:08x}");
347                BoxFutureOrNoop::Noop
348            }
349        }
350    }
351}
352
353struct Inner<B> {
354    storage: CoreStorage,
355    config: BlockchainRpcServiceConfig,
356    broadcast_listener: B,
357}
358
359impl<B> Inner<B> {
360    fn storage(&self) -> &CoreStorage {
361        &self.storage
362    }
363
364    fn handle_get_next_key_block_ids(
365        &self,
366        req: &rpc::GetNextKeyBlockIds,
367    ) -> overlay::Response<KeyBlockIds> {
368        let block_handle_storage = self.storage().block_handle_storage();
369
370        let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
371
372        let get_next_key_block_ids = || {
373            if !req.block_id.shard.is_masterchain() {
374                anyhow::bail!("first block id is not from masterchain");
375            }
376
377            let mut iterator = block_handle_storage
378                .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
379                .take(limit + 1);
380
381            if let Some(id) = iterator.next() {
382                anyhow::ensure!(
383                    id.root_hash == req.block_id.root_hash,
384                    "first block root hash mismatch"
385                );
386                anyhow::ensure!(
387                    id.file_hash == req.block_id.file_hash,
388                    "first block file hash mismatch"
389                );
390            }
391
392            Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
393        };
394
395        match get_next_key_block_ids() {
396            Ok(ids) => {
397                let incomplete = ids.len() < limit;
398                overlay::Response::Ok(KeyBlockIds {
399                    block_ids: ids,
400                    incomplete,
401                })
402            }
403            Err(e) => {
404                tracing::warn!("get_next_key_block_ids failed: {e:?}");
405                overlay::Response::Err(INTERNAL_ERROR_CODE)
406            }
407        }
408    }
409
410    async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
411        match self.get_block_full(&req.block_id).await {
412            Ok(block_full) => overlay::Response::Ok(block_full),
413            Err(e) => {
414                tracing::warn!("get_block_full failed: {e:?}");
415                overlay::Response::Err(INTERNAL_ERROR_CODE)
416            }
417        }
418    }
419
420    async fn handle_get_next_block_full(
421        &self,
422        req: &rpc::GetNextBlockFull,
423    ) -> overlay::Response<BlockFull> {
424        let block_handle_storage = self.storage().block_handle_storage();
425        let block_connection_storage = self.storage().block_connection_storage();
426
427        let get_next_block_full = async {
428            let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
429                Some(handle) if handle.has_next1() => block_connection_storage
430                    .load_connection(&req.prev_block_id, BlockConnection::Next1)
431                    .context("connection not found")?,
432                _ => return Ok(BlockFull::NotFound),
433            };
434
435            self.get_block_full(&next_block_id).await
436        };
437
438        match get_next_block_full.await {
439            Ok(block_full) => overlay::Response::Ok(block_full),
440            Err(e) => {
441                tracing::warn!("get_next_block_full failed: {e:?}");
442                overlay::Response::Err(INTERNAL_ERROR_CODE)
443            }
444        }
445    }
446
447    async fn handle_get_block_data_chunk(
448        &self,
449        req: &rpc::GetBlockDataChunk,
450    ) -> overlay::Response<Data> {
451        let block_handle_storage = self.storage().block_handle_storage();
452        let block_storage = self.storage().block_storage();
453
454        let handle = match block_handle_storage.load_handle(&req.block_id) {
455            Some(handle) if handle.has_data() => handle,
456            _ => {
457                tracing::debug!("block data not found for chunked read");
458                return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
459            }
460        };
461
462        let offset = req.offset as u64;
463        match block_storage
464            .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
465            .await
466        {
467            Ok(Some(data)) => overlay::Response::Ok(Data { data }),
468            Ok(None) => {
469                tracing::debug!("block data chunk not found at offset {}", offset);
470                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
471            }
472            Err(e) => {
473                tracing::warn!("get_block_data_chunk failed: {e:?}");
474                overlay::Response::Err(INTERNAL_ERROR_CODE)
475            }
476        }
477    }
478
479    async fn handle_get_key_block_proof(
480        &self,
481        req: &rpc::GetKeyBlockProof,
482    ) -> overlay::Response<KeyBlockProof> {
483        let block_handle_storage = self.storage().block_handle_storage();
484        let block_storage = self.storage().block_storage();
485
486        let get_key_block_proof = async {
487            match block_handle_storage.load_handle(&req.block_id) {
488                Some(handle) if handle.has_proof() => {
489                    let data = block_storage.load_block_proof_raw(&handle).await?;
490                    Ok::<_, anyhow::Error>(KeyBlockProof::Found {
491                        proof: Bytes::from_owner(data),
492                    })
493                }
494                _ => Ok(KeyBlockProof::NotFound),
495            }
496        };
497
498        match get_key_block_proof.await {
499            Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
500            Err(e) => {
501                tracing::warn!("get_key_block_proof failed: {e:?}");
502                overlay::Response::Err(INTERNAL_ERROR_CODE)
503            }
504        }
505    }
506
507    async fn handle_get_zerostate_proof(&self) -> overlay::Response<ZerostateProof> {
508        let storage = self.storage().node_state();
509        let proof_opt = storage.load_zerostate_proof_bytes();
510        let result = match proof_opt {
511            Some(proof) => ZerostateProof::Found { proof },
512            None => ZerostateProof::NotFound,
513        };
514
515        overlay::Response::Ok(result)
516    }
517
518    async fn handle_get_archive_info(
519        &self,
520        req: &rpc::GetArchiveInfo,
521    ) -> overlay::Response<ArchiveInfo> {
522        let mc_seqno = req.mc_seqno;
523        let node_state = self.storage.node_state();
524
525        match node_state.load_last_mc_block_id() {
526            Some(last_applied_mc_block) => {
527                if mc_seqno > last_applied_mc_block.seqno {
528                    return overlay::Response::Ok(ArchiveInfo::TooNew);
529                }
530
531                let block_storage = self.storage().block_storage();
532
533                let id = block_storage.get_archive_id(mc_seqno);
534                let size_res = match id {
535                    ArchiveId::Found(id) => block_storage.get_archive_size(id),
536                    ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
537                };
538
539                overlay::Response::Ok(match (id, size_res) {
540                    (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
541                        id: id as u64,
542                        size: NonZeroU64::new(size as _).unwrap(),
543                        chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
544                    },
545                    (ArchiveId::Found(_) | ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
546                    _ => ArchiveInfo::NotFound,
547                })
548            }
549            None => {
550                tracing::warn!("get_archive_id failed: no blocks applied");
551                overlay::Response::Err(INTERNAL_ERROR_CODE)
552            }
553        }
554    }
555
556    async fn handle_get_archive_chunk(
557        &self,
558        req: &rpc::GetArchiveChunk,
559    ) -> overlay::Response<Data> {
560        let block_storage = self.storage.block_storage();
561
562        let get_archive_chunk = || async {
563            let archive_slice = block_storage
564                .get_archive_chunk(req.archive_id as u32, req.offset)
565                .await?;
566            Ok::<_, anyhow::Error>(archive_slice)
567        };
568
569        match get_archive_chunk().await {
570            Ok(data) => overlay::Response::Ok(Data { data }),
571            Err(e) => {
572                tracing::warn!("get_archive_chunk failed: {e:?}");
573                overlay::Response::Err(INTERNAL_ERROR_CODE)
574            }
575        }
576    }
577
578    fn handle_get_persistent_state_info(
579        &self,
580        req: &rpc::GetPersistentShardStateInfo,
581    ) -> overlay::Response<PersistentStateInfo> {
582        let label = [("method", "getPersistentShardStateInfo")];
583        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
584        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
585        overlay::Response::Ok(res)
586    }
587
588    fn handle_get_queue_persistent_state_info(
589        &self,
590        req: &rpc::GetPersistentQueueStateInfo,
591    ) -> overlay::Response<PersistentStateInfo> {
592        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
593        overlay::Response::Ok(res)
594    }
595
596    async fn handle_get_persistent_shard_state_chunk(
597        &self,
598        req: &rpc::GetPersistentShardStateChunk,
599    ) -> overlay::Response<Data> {
600        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
601            .await
602    }
603
604    async fn handle_get_persistent_queue_state_chunk(
605        &self,
606        req: &rpc::GetPersistentQueueStateChunk,
607    ) -> overlay::Response<Data> {
608        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
609            .await
610    }
611}
612
613impl<B> Inner<B> {
614    async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
615        let block_handle_storage = self.storage().block_handle_storage();
616        let block_storage = self.storage().block_storage();
617
618        let handle = match block_handle_storage.load_handle(block_id) {
619            Some(handle) if handle.has_all_block_parts() => handle,
620            _ => return Ok(BlockFull::NotFound),
621        };
622
623        // Get first chunk of compressed data
624        let data = match block_storage
625            .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
626            .await?
627        {
628            Some(data) => data,
629            None => return Ok(BlockFull::NotFound),
630        };
631
632        let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
633            // Small block - entire data fits in one chunk
634            data.len() as u32
635        } else {
636            // Large block - need to get total size from storage
637            match block_storage.get_compressed_block_data_size(&handle)? {
638                Some(size) => size as u32,
639                None => return Ok(BlockFull::NotFound),
640            }
641        };
642
643        let block = BlockData {
644            data,
645            size: NonZeroU32::new(data_size).expect("shouldn't happen"),
646            chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
647        };
648
649        let (proof, queue_diff) = tokio::join!(
650            block_storage.load_block_proof_raw(&handle),
651            block_storage.load_queue_diff_raw(&handle)
652        );
653
654        Ok(BlockFull::Found {
655            block_id: *block_id,
656            block,
657            proof: Bytes::from_owner(proof?),
658            queue_diff: Bytes::from_owner(queue_diff?),
659        })
660    }
661
662    fn read_persistent_state_info(
663        &self,
664        block_id: &BlockId,
665        state_kind: PersistentStateKind,
666    ) -> PersistentStateInfo {
667        let persistent_state_storage = self.storage().persistent_state_storage();
668        if self.config.serve_persistent_states
669            && let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind)
670        {
671            return PersistentStateInfo::Found {
672                size: info.size,
673                chunk_size: info.chunk_size,
674            };
675        }
676        PersistentStateInfo::NotFound
677    }
678
679    async fn read_persistent_state_chunk(
680        &self,
681        block_id: &BlockId,
682        offset: u64,
683        state_kind: PersistentStateKind,
684    ) -> overlay::Response<Data> {
685        let persistent_state_storage = self.storage().persistent_state_storage();
686
687        let persistent_state_request_validation = || {
688            anyhow::ensure!(
689                self.config.serve_persistent_states,
690                "persistent states are disabled"
691            );
692            Ok::<_, anyhow::Error>(())
693        };
694
695        if let Err(e) = persistent_state_request_validation() {
696            tracing::debug!("persistent state request validation failed: {e:?}");
697            return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
698        }
699
700        match persistent_state_storage
701            .read_state_part(block_id, offset, state_kind)
702            .await
703        {
704            Some(data) => overlay::Response::Ok(Data { data: data.into() }),
705            None => {
706                tracing::debug!("failed to read persistent state part");
707                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
708            }
709        }
710    }
711}