tycho_core/blockchain_rpc/
service.rs

1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18    ArchiveId, BlockConnection, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
21const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24#[serde(default)]
25#[non_exhaustive]
26pub struct BlockchainRpcServiceConfig {
27    /// The maximum number of key blocks in the response.
28    ///
29    /// Default: 8.
30    pub max_key_blocks_list_len: usize,
31
32    /// Whether to serve persistent states.
33    ///
34    /// Default: yes.
35    pub serve_persistent_states: bool,
36}
37
38impl Default for BlockchainRpcServiceConfig {
39    fn default() -> Self {
40        Self {
41            max_key_blocks_list_len: 8,
42            serve_persistent_states: true,
43        }
44    }
45}
46
47pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
48    config: BlockchainRpcServiceConfig,
49    mandatory_fields: MandatoryFields,
50}
51
52impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
53where
54    B: BroadcastListener,
55{
56    pub fn build(self) -> BlockchainRpcService<B> {
57        let (broadcast_listener, storage) = self.mandatory_fields;
58
59        BlockchainRpcService {
60            inner: Arc::new(Inner {
61                storage,
62                config: self.config,
63                broadcast_listener,
64            }),
65        }
66    }
67}
68
69impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
70    pub fn with_storage(
71        self,
72        storage: CoreStorage,
73    ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
74        let (broadcast_listener, _) = self.mandatory_fields;
75
76        BlockchainRpcServiceBuilder {
77            config: self.config,
78            mandatory_fields: (broadcast_listener, storage),
79        }
80    }
81}
82
83impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
84    pub fn with_broadcast_listener<T1>(
85        self,
86        broadcast_listener: T1,
87    ) -> BlockchainRpcServiceBuilder<(T1, T2)>
88    where
89        T1: BroadcastListener,
90    {
91        let (_, storage) = self.mandatory_fields;
92
93        BlockchainRpcServiceBuilder {
94            config: self.config,
95            mandatory_fields: (broadcast_listener, storage),
96        }
97    }
98
99    pub fn without_broadcast_listener(
100        self,
101    ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
102        let (_, storage) = self.mandatory_fields;
103
104        BlockchainRpcServiceBuilder {
105            config: self.config,
106            mandatory_fields: (NoopBroadcastListener, storage),
107        }
108    }
109}
110
111impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
112    pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
113        Self { config, ..self }
114    }
115}
116
117#[repr(transparent)]
118pub struct BlockchainRpcService<B = NoopBroadcastListener> {
119    inner: Arc<Inner<B>>,
120}
121
122impl<B> Clone for BlockchainRpcService<B> {
123    #[inline]
124    fn clone(&self) -> Self {
125        Self {
126            inner: self.inner.clone(),
127        }
128    }
129}
130
131impl BlockchainRpcService<()> {
132    pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
133        BlockchainRpcServiceBuilder {
134            config: Default::default(),
135            mandatory_fields: ((), ()),
136        }
137    }
138}
139
140impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
141    type QueryResponse = Response;
142    type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
143    type OnMessageFuture = BoxFutureOrNoop<()>;
144
145    #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
146    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
147        let (constructor, body) = match try_handle_prefix(&req) {
148            Ok(rest) => rest,
149            Err(e) => {
150                tracing::debug!("failed to deserialize query: {e}");
151                return BoxFutureOrNoop::Noop;
152            }
153        };
154
155        tycho_network::match_tl_request!(body, tag = constructor, {
156            overlay::Ping as _ => BoxFutureOrNoop::future(async {
157                Some(Response::from_tl(overlay::Pong))
158            }),
159            rpc::GetNextKeyBlockIds as req => {
160                tracing::debug!(
161                    block_id = %req.block_id,
162                    max_size = req.max_size,
163                    "getNextKeyBlockIds",
164                );
165
166                let inner = self.inner.clone();
167                BoxFutureOrNoop::future(async move {
168                    let res = inner.handle_get_next_key_block_ids(&req);
169                    Some(Response::from_tl(res))
170                })
171            },
172            rpc::GetBlockFull as req => {
173                tracing::debug!(block_id = %req.block_id, "getBlockFull");
174
175                let inner = self.inner.clone();
176                BoxFutureOrNoop::future(async move {
177                    let res = inner.handle_get_block_full(&req).await;
178                    Some(Response::from_tl(res))
179                })
180            },
181            rpc::GetNextBlockFull as req => {
182                tracing::debug!(prev_block_id = %req.prev_block_id, "getNextBlockFull");
183
184                let inner = self.inner.clone();
185                BoxFutureOrNoop::future(async move {
186                    let res = inner.handle_get_next_block_full(&req).await;
187                    Some(Response::from_tl(res))
188                })
189            },
190            rpc::GetBlockDataChunk as req => {
191                tracing::debug!(block_id = %req.block_id, offset = %req.offset, "getBlockDataChunk");
192
193                let inner = self.inner.clone();
194                BoxFutureOrNoop::future(async move {
195                    let res = inner.handle_get_block_data_chunk(&req);
196                    Some(Response::from_tl(res))
197                })
198            },
199            rpc::GetKeyBlockProof as req => {
200                tracing::debug!(block_id = %req.block_id, "getKeyBlockProof");
201
202                let inner = self.inner.clone();
203                BoxFutureOrNoop::future(async move {
204                    let res = inner.handle_get_key_block_proof(&req).await;
205                    Some(Response::from_tl(res))
206                })
207            },
208            rpc::GetPersistentShardStateInfo as req => {
209                tracing::debug!(block_id = %req.block_id, "getPersistentShardStateInfo");
210
211                let inner = self.inner.clone();
212                BoxFutureOrNoop::future(async move {
213                    let res = inner.handle_get_persistent_state_info(&req);
214                    Some(Response::from_tl(res))
215                })
216            },
217            rpc::GetPersistentQueueStateInfo as req => {
218                tracing::debug!(block_id = %req.block_id, "getPersistentQueueStateInfo");
219
220                let inner = self.inner.clone();
221                BoxFutureOrNoop::future(async move {
222                    let res = inner.handle_get_queue_persistent_state_info(&req);
223                    Some(Response::from_tl(res))
224                })
225            },
226            rpc::GetPersistentShardStateChunk as req => {
227                tracing::debug!(
228                    block_id = %req.block_id,
229                    offset = %req.offset,
230                    "getPersistentShardStateChunk"
231                );
232
233                let inner = self.inner.clone();
234                BoxFutureOrNoop::future(async move {
235                    let res = inner.handle_get_persistent_shard_state_chunk(&req).await;
236                    Some(Response::from_tl(res))
237                })
238            },
239            rpc::GetPersistentQueueStateChunk as req => {
240                tracing::debug!(
241                    block_id = %req.block_id,
242                    offset = %req.offset,
243                    "getPersistentQueueStateChunk"
244                );
245
246                let inner = self.inner.clone();
247                BoxFutureOrNoop::future(async move {
248                    let res = inner.handle_get_persistent_queue_state_chunk(&req).await;
249                    Some(Response::from_tl(res))
250                })
251            },
252            rpc::GetArchiveInfo as req => {
253                tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo");
254
255                let inner = self.inner.clone();
256                BoxFutureOrNoop::future(async move {
257                    let res = inner.handle_get_archive_info(&req).await;
258                    Some(Response::from_tl(res))
259                })
260            },
261            rpc::GetArchiveChunk as req => {
262                tracing::debug!(
263                    archive_id = %req.archive_id,
264                    offset = %req.offset,
265                    "getArchiveChunk"
266                );
267
268                let inner = self.inner.clone();
269                BoxFutureOrNoop::future(async move {
270                    let res = inner.handle_get_archive_chunk(&req).await;
271                    Some(Response::from_tl(res))
272                })
273            },
274        }, e => {
275            tracing::debug!("failed to deserialize query: {e}");
276            BoxFutureOrNoop::Noop
277        })
278    }
279
280    #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
281    fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
282        use tl_proto::{BytesMeta, TlRead};
283
284        // TODO: Do nothing if `B` is `NoopBroadcastListener` via `castaway` ?
285
286        // Require message body to contain at least two constructors.
287        if req.body.len() < 8 {
288            return BoxFutureOrNoop::Noop;
289        }
290
291        // Skip broadcast prefix
292        if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
293            return BoxFutureOrNoop::Noop;
294        }
295
296        // Read (CONSUME) the next constructor.
297        match req.body.get_u32_le() {
298            MessageBroadcastRef::TL_ID => {
299                match BytesMeta::read_from(&mut req.body.as_ref()) {
300                    // NOTE: `len` is 24bit integer
301                    Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
302                        req.body.advance(meta.prefix_len);
303                        req.body.truncate(meta.len);
304                    }
305                    Ok(_) => {
306                        tracing::debug!("malformed external message broadcast");
307                        return BoxFutureOrNoop::Noop;
308                    }
309                    Err(e) => {
310                        tracing::debug!("failed to deserialize external message broadcast: {e:?}");
311                        return BoxFutureOrNoop::Noop;
312                    }
313                }
314
315                let inner = self.inner.clone();
316                metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
317                    .increment(req.body.len() as u64);
318                BoxFutureOrNoop::future(async move {
319                    if let Err(e) = validate_external_message(&req.body).await {
320                        tracing::debug!("invalid external message: {e:?}");
321                        return;
322                    }
323
324                    inner
325                        .broadcast_listener
326                        .handle_message(req.metadata, req.body)
327                        .await;
328                })
329            }
330            constructor => {
331                tracing::debug!("unknown broadcast constructor: {constructor:08x}");
332                BoxFutureOrNoop::Noop
333            }
334        }
335    }
336}
337
338struct Inner<B> {
339    storage: CoreStorage,
340    config: BlockchainRpcServiceConfig,
341    broadcast_listener: B,
342}
343
344impl<B> Inner<B> {
345    fn storage(&self) -> &CoreStorage {
346        &self.storage
347    }
348
349    fn handle_get_next_key_block_ids(
350        &self,
351        req: &rpc::GetNextKeyBlockIds,
352    ) -> overlay::Response<KeyBlockIds> {
353        let label = [("method", "getNextKeyBlockIds")];
354        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
355
356        let block_handle_storage = self.storage().block_handle_storage();
357
358        let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
359
360        let get_next_key_block_ids = || {
361            if !req.block_id.shard.is_masterchain() {
362                anyhow::bail!("first block id is not from masterchain");
363            }
364
365            let mut iterator = block_handle_storage
366                .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
367                .take(limit + 1);
368
369            if let Some(id) = iterator.next() {
370                anyhow::ensure!(
371                    id.root_hash == req.block_id.root_hash,
372                    "first block root hash mismatch"
373                );
374                anyhow::ensure!(
375                    id.file_hash == req.block_id.file_hash,
376                    "first block file hash mismatch"
377                );
378            }
379
380            Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
381        };
382
383        match get_next_key_block_ids() {
384            Ok(ids) => {
385                let incomplete = ids.len() < limit;
386                overlay::Response::Ok(KeyBlockIds {
387                    block_ids: ids,
388                    incomplete,
389                })
390            }
391            Err(e) => {
392                tracing::warn!("get_next_key_block_ids failed: {e:?}");
393                overlay::Response::Err(INTERNAL_ERROR_CODE)
394            }
395        }
396    }
397
398    async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
399        let label = [("method", "getBlockFull")];
400        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
401
402        match self.get_block_full(&req.block_id).await {
403            Ok(block_full) => overlay::Response::Ok(block_full),
404            Err(e) => {
405                tracing::warn!("get_block_full failed: {e:?}");
406                overlay::Response::Err(INTERNAL_ERROR_CODE)
407            }
408        }
409    }
410
411    async fn handle_get_next_block_full(
412        &self,
413        req: &rpc::GetNextBlockFull,
414    ) -> overlay::Response<BlockFull> {
415        let label = [("method", "getNextBlockFull")];
416        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
417
418        let block_handle_storage = self.storage().block_handle_storage();
419        let block_connection_storage = self.storage().block_connection_storage();
420
421        let get_next_block_full = async {
422            let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
423                Some(handle) if handle.has_next1() => block_connection_storage
424                    .load_connection(&req.prev_block_id, BlockConnection::Next1)
425                    .context("connection not found")?,
426                _ => return Ok(BlockFull::NotFound),
427            };
428
429            self.get_block_full(&next_block_id).await
430        };
431
432        match get_next_block_full.await {
433            Ok(block_full) => overlay::Response::Ok(block_full),
434            Err(e) => {
435                tracing::warn!("get_next_block_full failed: {e:?}");
436                overlay::Response::Err(INTERNAL_ERROR_CODE)
437            }
438        }
439    }
440
441    fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response<Data> {
442        let label = [("method", "getBlockDataChunk")];
443        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
444
445        let block_storage = self.storage.block_storage();
446        match block_storage.get_block_data_chunk(&req.block_id, req.offset) {
447            Ok(Some(data)) => overlay::Response::Ok(Data {
448                data: Bytes::from_owner(data),
449            }),
450            Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE),
451            Err(e) => {
452                tracing::warn!("get_block_data_chunk failed: {e:?}");
453                overlay::Response::Err(INTERNAL_ERROR_CODE)
454            }
455        }
456    }
457
458    async fn handle_get_key_block_proof(
459        &self,
460        req: &rpc::GetKeyBlockProof,
461    ) -> overlay::Response<KeyBlockProof> {
462        let label = [("method", "getKeyBlockProof")];
463        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
464
465        let block_handle_storage = self.storage().block_handle_storage();
466        let block_storage = self.storage().block_storage();
467
468        let get_key_block_proof = async {
469            match block_handle_storage.load_handle(&req.block_id) {
470                Some(handle) if handle.has_proof() => {
471                    let data = block_storage.load_block_proof_raw(&handle).await?;
472                    Ok::<_, anyhow::Error>(KeyBlockProof::Found {
473                        proof: Bytes::from_owner(data),
474                    })
475                }
476                _ => Ok(KeyBlockProof::NotFound),
477            }
478        };
479
480        match get_key_block_proof.await {
481            Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
482            Err(e) => {
483                tracing::warn!("get_key_block_proof failed: {e:?}");
484                overlay::Response::Err(INTERNAL_ERROR_CODE)
485            }
486        }
487    }
488
489    async fn handle_get_archive_info(
490        &self,
491        req: &rpc::GetArchiveInfo,
492    ) -> overlay::Response<ArchiveInfo> {
493        let mc_seqno = req.mc_seqno;
494        let node_state = self.storage.node_state();
495
496        let label = [("method", "getArchiveInfo")];
497        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
498
499        match node_state.load_last_mc_block_id() {
500            Some(last_applied_mc_block) => {
501                if mc_seqno > last_applied_mc_block.seqno {
502                    return overlay::Response::Ok(ArchiveInfo::TooNew);
503                }
504
505                let block_storage = self.storage().block_storage();
506
507                let id = block_storage.get_archive_id(mc_seqno);
508                let size_res = match id {
509                    ArchiveId::Found(id) => block_storage.get_archive_size(id),
510                    ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
511                };
512
513                overlay::Response::Ok(match (id, size_res) {
514                    (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
515                        id: id as u64,
516                        size: NonZeroU64::new(size as _).unwrap(),
517                        chunk_size: block_storage.archive_chunk_size(),
518                    },
519                    (ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
520                    _ => ArchiveInfo::NotFound,
521                })
522            }
523            None => {
524                tracing::warn!("get_archive_id failed: no blocks applied");
525                overlay::Response::Err(INTERNAL_ERROR_CODE)
526            }
527        }
528    }
529
530    async fn handle_get_archive_chunk(
531        &self,
532        req: &rpc::GetArchiveChunk,
533    ) -> overlay::Response<Data> {
534        let label = [("method", "getArchiveChunk")];
535        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
536
537        let block_storage = self.storage.block_storage();
538
539        let get_archive_chunk = || async {
540            let archive_slice = block_storage
541                .get_archive_chunk(req.archive_id as u32, req.offset)
542                .await?;
543
544            Ok::<_, anyhow::Error>(archive_slice)
545        };
546
547        match get_archive_chunk().await {
548            Ok(data) => overlay::Response::Ok(Data {
549                data: Bytes::from_owner(data),
550            }),
551            Err(e) => {
552                tracing::warn!("get_archive_chunk failed: {e:?}");
553                overlay::Response::Err(INTERNAL_ERROR_CODE)
554            }
555        }
556    }
557
558    fn handle_get_persistent_state_info(
559        &self,
560        req: &rpc::GetPersistentShardStateInfo,
561    ) -> overlay::Response<PersistentStateInfo> {
562        let label = [("method", "getPersistentShardStateInfo")];
563        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
564        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
565        overlay::Response::Ok(res)
566    }
567
568    fn handle_get_queue_persistent_state_info(
569        &self,
570        req: &rpc::GetPersistentQueueStateInfo,
571    ) -> overlay::Response<PersistentStateInfo> {
572        let label = [("method", "getQueuePersistentStateInfo")];
573        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
574        let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
575        overlay::Response::Ok(res)
576    }
577
578    async fn handle_get_persistent_shard_state_chunk(
579        &self,
580        req: &rpc::GetPersistentShardStateChunk,
581    ) -> overlay::Response<Data> {
582        let label = [("method", "getPersistentShardStateChunk")];
583        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
584        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
585            .await
586    }
587
588    async fn handle_get_persistent_queue_state_chunk(
589        &self,
590        req: &rpc::GetPersistentQueueStateChunk,
591    ) -> overlay::Response<Data> {
592        let label = [("method", "getPersistentQueueStateChunk")];
593        let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
594        self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
595            .await
596    }
597}
598
599impl<B> Inner<B> {
600    async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
601        let block_handle_storage = self.storage().block_handle_storage();
602        let block_storage = self.storage().block_storage();
603
604        let handle = match block_handle_storage.load_handle(block_id) {
605            Some(handle) if handle.has_all_block_parts() => handle,
606            _ => return Ok(BlockFull::NotFound),
607        };
608
609        let Some(data) = block_storage.get_block_data_chunk(block_id, 0)? else {
610            return Ok(BlockFull::NotFound);
611        };
612
613        let data_chunk_size = block_storage.block_data_chunk_size();
614        let data_size = if data.len() < data_chunk_size.get() as usize {
615            // NOTE: Skip one RocksDB read for relatively small blocks
616            //       Average block size is 4KB, while the chunk size is 1MB.
617            data.len() as u32
618        } else {
619            match block_storage.get_block_data_size(block_id)? {
620                Some(size) => size,
621                None => return Ok(BlockFull::NotFound),
622            }
623        };
624
625        let block = BlockData {
626            data: Bytes::from_owner(data),
627            size: NonZeroU32::new(data_size).expect("shouldn't happen"),
628            chunk_size: data_chunk_size,
629        };
630
631        let (proof, queue_diff) = tokio::join!(
632            block_storage.load_block_proof_raw(&handle),
633            block_storage.load_queue_diff_raw(&handle)
634        );
635
636        Ok(BlockFull::Found {
637            block_id: *block_id,
638            block,
639            proof: Bytes::from_owner(proof?),
640            queue_diff: Bytes::from_owner(queue_diff?),
641        })
642    }
643
644    fn read_persistent_state_info(
645        &self,
646        block_id: &BlockId,
647        state_kind: PersistentStateKind,
648    ) -> PersistentStateInfo {
649        let persistent_state_storage = self.storage().persistent_state_storage();
650        if self.config.serve_persistent_states {
651            if let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind) {
652                return PersistentStateInfo::Found {
653                    size: info.size,
654                    chunk_size: info.chunk_size,
655                };
656            }
657        }
658        PersistentStateInfo::NotFound
659    }
660
661    async fn read_persistent_state_chunk(
662        &self,
663        block_id: &BlockId,
664        offset: u64,
665        state_kind: PersistentStateKind,
666    ) -> overlay::Response<Data> {
667        let persistent_state_storage = self.storage().persistent_state_storage();
668
669        let persistent_state_request_validation = || {
670            anyhow::ensure!(
671                self.config.serve_persistent_states,
672                "persistent states are disabled"
673            );
674            Ok::<_, anyhow::Error>(())
675        };
676
677        if let Err(e) = persistent_state_request_validation() {
678            tracing::debug!("persistent state request validation failed: {e:?}");
679            return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
680        }
681
682        match persistent_state_storage
683            .read_state_part(block_id, offset, state_kind)
684            .await
685        {
686            Some(data) => overlay::Response::Ok(Data { data: data.into() }),
687            None => {
688                tracing::debug!("failed to read persistent state part");
689                overlay::Response::Err(NOT_FOUND_ERROR_CODE)
690            }
691        }
692    }
693}