1use std::num::NonZeroU32;
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6#[cfg(feature = "s3")]
7use bytesize::ByteSize;
8use serde::{Deserialize, Serialize};
9use tycho_block_util::message::validate_external_message;
10use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
11use tycho_types::models::BlockId;
12use tycho_util::futures::BoxFutureOrNoop;
13use tycho_util::metrics::HistogramGuard;
14
15use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
16use crate::blockchain_rpc::providers::{IntoRpcDataProvider, RpcDataProvider};
17use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
18use crate::proto::blockchain::*;
19use crate::proto::overlay;
20use crate::storage::{BlockConnection, CoreStorage, KeyBlocksDirection, PersistentStateKind};
21
22const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";
23
24#[cfg(not(test))]
25const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; #[cfg(test)]
27const BLOCK_DATA_CHUNK_SIZE: u32 = 10; #[cfg(feature = "s3")]
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct S3ProxyConfig {
32 pub rate_limit: NonZeroU32,
36
37 pub bandwidth_limit: ByteSize,
41}
42
43#[cfg(feature = "s3")]
44impl Default for S3ProxyConfig {
45 fn default() -> Self {
46 Self {
47 rate_limit: NonZeroU32::new(100).unwrap(),
48 bandwidth_limit: ByteSize::mib(100),
49 }
50 }
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54#[serde(default)]
55#[non_exhaustive]
56pub struct BlockchainRpcServiceConfig {
57 pub max_key_blocks_list_len: usize,
61
62 pub serve_persistent_states: bool,
66
67 #[cfg(feature = "s3")]
71 pub s3_proxy: Option<S3ProxyConfig>,
72}
73
74impl Default for BlockchainRpcServiceConfig {
75 fn default() -> Self {
76 Self {
77 max_key_blocks_list_len: 8,
78 serve_persistent_states: true,
79 #[cfg(feature = "s3")]
80 s3_proxy: Some(S3ProxyConfig::default()),
81 }
82 }
83}
84
85pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
86 config: BlockchainRpcServiceConfig,
87 mandatory_fields: MandatoryFields,
88}
89
90impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage, Arc<dyn RpcDataProvider>)>
91where
92 B: BroadcastListener,
93{
94 pub fn build(self) -> BlockchainRpcService<B> {
95 let (broadcast_listener, storage, rpc_data_provider) = self.mandatory_fields;
96
97 BlockchainRpcService {
98 inner: Arc::new(Inner {
99 storage,
100 rpc_data_provider,
101 config: self.config,
102 broadcast_listener,
103 }),
104 }
105 }
106}
107
108impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage, ())>
109where
110 B: BroadcastListener,
111{
112 pub fn build(self) -> BlockchainRpcService<B> {
113 let (broadcast_listener, storage, _) = self.mandatory_fields;
114 let rpc_data_provider = storage.clone().into_data_provider();
115
116 BlockchainRpcService {
117 inner: Arc::new(Inner {
118 storage,
119 rpc_data_provider,
120 config: self.config,
121 broadcast_listener,
122 }),
123 }
124 }
125}
126
127impl<T1, T3> BlockchainRpcServiceBuilder<(T1, (), T3)> {
128 pub fn with_storage(
129 self,
130 storage: CoreStorage,
131 ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage, T3)> {
132 let (broadcast_listener, _, archive_provider) = self.mandatory_fields;
133
134 BlockchainRpcServiceBuilder {
135 config: self.config,
136 mandatory_fields: (broadcast_listener, storage, archive_provider),
137 }
138 }
139}
140
141impl<T2, T3> BlockchainRpcServiceBuilder<((), T2, T3)> {
142 pub fn with_broadcast_listener<T1>(
143 self,
144 broadcast_listener: T1,
145 ) -> BlockchainRpcServiceBuilder<(T1, T2, T3)>
146 where
147 T1: BroadcastListener,
148 {
149 let (_, storage, archive_provider) = self.mandatory_fields;
150
151 BlockchainRpcServiceBuilder {
152 config: self.config,
153 mandatory_fields: (broadcast_listener, storage, archive_provider),
154 }
155 }
156
157 pub fn without_broadcast_listener(
158 self,
159 ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2, T3)> {
160 let (_, storage, archive_provider) = self.mandatory_fields;
161
162 BlockchainRpcServiceBuilder {
163 config: self.config,
164 mandatory_fields: (NoopBroadcastListener, storage, archive_provider),
165 }
166 }
167}
168
169impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2, ())> {
170 pub fn with_data_provider<RP: IntoRpcDataProvider>(
171 self,
172 provider: RP,
173 ) -> BlockchainRpcServiceBuilder<(T1, T2, Arc<dyn RpcDataProvider>)> {
174 let (broadcast_listener, storage, _) = self.mandatory_fields;
175 let rpc_data_provider = provider.into_data_provider();
176
177 BlockchainRpcServiceBuilder {
178 config: self.config,
179 mandatory_fields: (broadcast_listener, storage, rpc_data_provider),
180 }
181 }
182}
183
184impl<T1, T2, T3> BlockchainRpcServiceBuilder<(T1, T2, T3)> {
185 pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
186 Self { config, ..self }
187 }
188}
189
190#[repr(transparent)]
191pub struct BlockchainRpcService<B = NoopBroadcastListener> {
192 inner: Arc<Inner<B>>,
193}
194
195impl<B> Clone for BlockchainRpcService<B> {
196 #[inline]
197 fn clone(&self) -> Self {
198 Self {
199 inner: self.inner.clone(),
200 }
201 }
202}
203
204impl BlockchainRpcService<()> {
205 pub fn builder() -> BlockchainRpcServiceBuilder<((), (), ())> {
206 BlockchainRpcServiceBuilder {
207 config: Default::default(),
208 mandatory_fields: ((), (), ()),
209 }
210 }
211}
212
213macro_rules! match_request {
214 ($req_body:expr, $tag:expr, {
215 $(
216 #[meta($name:literal $(, $($args:tt)*)?)]
217 $([$raw:tt])? $ty:path as $pat:pat => $expr:expr
218 ),*$(,)?
219 }, $err:pat => $err_exr:expr) => {
220 '__match_req: {
221 let $err = match $tag {
222 $(<$ty>::TL_ID => match tl_proto::deserialize::<$ty>(&($req_body)) {
223 Ok($pat) => break '__match_req match_request!(
224 @expr
225 { $name $($($args)*)? }
226 { $($raw)? }
227 $expr
228 ),
229 Err(e) => e,
230 })*
231 _ => tl_proto::TlError::UnknownConstructor,
232 };
233 $err_exr
234 }
235 };
236
237 (@expr { $name:literal $($args:tt)* } { $($raw:tt)+ } $expr:expr) => {
239 $expr
240 };
241 (@expr { $name:literal $($args:tt)* } { } $expr:expr) => {{
243 match_request!(@debug $name {} { $($args)* });
244
245 let __started_at = std::time::Instant::now();
246
247 BoxFutureOrNoop::future(async move {
248 scopeguard::defer! {
249 metrics::histogram!(
250 RPC_METHOD_TIMINGS_METRIC,
251 "method" => $name
252 ).record(__started_at.elapsed());
253 }
254
255 Some(Response::from_tl($expr))
256 })
257 }};
258
259 (@debug $name:literal { } { $(,)? }) => {
261 match_request!(@debug_impl $name)
262 };
263 (@debug $name:literal { $($res:tt)+ } { $(,)? }) => {
264 match_request!(@debug_impl $($res)+, $name)
265 };
266 (@debug $name:literal { $($res:tt)* } { $t:tt $($args:tt)* }) => {
267 match_request!(@debug $name { $($res)* $t } { $($args)* })
268 };
269 (@debug_impl $($args:tt)*) => {
270 tracing::debug!($($args)*)
271 };
272}
273
274impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
275 type QueryResponse = Response;
276 type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
277 type OnMessageFuture = BoxFutureOrNoop<()>;
278
279 #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
280 fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
281 let (constructor, body) = match try_handle_prefix(&req) {
282 Ok(rest) => rest,
283 Err(e) => {
284 tracing::debug!("failed to deserialize query: {e}");
285 return BoxFutureOrNoop::Noop;
286 }
287 };
288
289 let inner = self.inner.clone();
290
291 match_request!(body, constructor, {
293 #[meta("ping")]
294 [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
295 Some(Response::from_tl(overlay::Pong))
296 }),
297
298 #[meta(
299 "getNextKeyBlockIds",
300 block_id = %req.block_id,
301 max_size = req.max_size,
302 )]
303 rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),
304
305 #[meta("getBlockFull", block_id = %req.block_id)]
306 rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,
307
308 #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
309 rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,
310
311 #[meta(
312 "getBlockDataChunk",
313 block_id = %req.block_id,
314 offset = %req.offset,
315 )]
316 rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,
317
318 #[meta("getKeyBlockProof", block_id = %req.block_id)]
319 rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,
320
321 #[meta("getZerostateProof")]
322 rpc::GetZerostateProof as _ => inner.handle_get_zerostate_proof().await,
323
324 #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
325 rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req).await,
326
327 #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
328 rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req).await,
329
330 #[meta(
331 "getPersistentShardStateChunk",
332 block_id = %req.block_id,
333 offset = %req.offset,
334 )]
335 rpc::GetPersistentShardStateChunk as req => {
336 inner.handle_get_persistent_shard_state_chunk(&req).await
337 },
338
339 #[meta(
340 "getPersistentQueueStateChunk",
341 block_id = %req.block_id,
342 offset = %req.offset,
343 )]
344 rpc::GetPersistentQueueStateChunk as req => {
345 inner.handle_get_persistent_queue_state_chunk(&req).await
346 },
347
348 #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
349 rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,
350
351 #[meta(
352 "getArchiveChunk",
353 archive_id = %req.archive_id,
354 offset = %req.offset,
355 )]
356 rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
357 }, e => {
358 tracing::debug!("failed to deserialize query: {e}");
359 BoxFutureOrNoop::Noop
360 })
361 }
362
363 #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
364 fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
365 use tl_proto::{BytesMeta, TlRead};
366
367 if req.body.len() < 8 {
371 return BoxFutureOrNoop::Noop;
372 }
373
374 if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
376 return BoxFutureOrNoop::Noop;
377 }
378
379 match req.body.get_u32_le() {
381 MessageBroadcastRef::TL_ID => {
382 match BytesMeta::read_from(&mut req.body.as_ref()) {
383 Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
385 req.body.advance(meta.prefix_len);
386 req.body.truncate(meta.len);
387 }
388 Ok(_) => {
389 tracing::debug!("malformed external message broadcast");
390 return BoxFutureOrNoop::Noop;
391 }
392 Err(e) => {
393 tracing::debug!("failed to deserialize external message broadcast: {e:?}");
394 return BoxFutureOrNoop::Noop;
395 }
396 }
397
398 let inner = self.inner.clone();
399 metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
400 .increment(req.body.len() as u64);
401 BoxFutureOrNoop::future(async move {
402 if let Err(e) = validate_external_message(&req.body).await {
403 tracing::debug!("invalid external message: {e:?}");
404 return;
405 }
406
407 inner
408 .broadcast_listener
409 .handle_message(req.metadata, req.body)
410 .await;
411 })
412 }
413 constructor => {
414 tracing::debug!("unknown broadcast constructor: {constructor:08x}");
415 BoxFutureOrNoop::Noop
416 }
417 }
418 }
419}
420
421struct Inner<B> {
422 storage: CoreStorage,
423 config: BlockchainRpcServiceConfig,
424 broadcast_listener: B,
425 rpc_data_provider: Arc<dyn RpcDataProvider>,
426}
427
428impl<B> Inner<B> {
429 fn storage(&self) -> &CoreStorage {
430 &self.storage
431 }
432
433 fn handle_get_next_key_block_ids(
434 &self,
435 req: &rpc::GetNextKeyBlockIds,
436 ) -> overlay::Response<KeyBlockIds> {
437 let block_handle_storage = self.storage().block_handle_storage();
438
439 let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
440
441 let get_next_key_block_ids = || {
442 if !req.block_id.shard.is_masterchain() {
443 anyhow::bail!("first block id is not from masterchain");
444 }
445
446 let mut iterator = block_handle_storage
447 .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
448 .take(limit + 1);
449
450 if let Some(id) = iterator.next() {
451 anyhow::ensure!(
452 id.root_hash == req.block_id.root_hash,
453 "first block root hash mismatch"
454 );
455 anyhow::ensure!(
456 id.file_hash == req.block_id.file_hash,
457 "first block file hash mismatch"
458 );
459 }
460
461 Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
462 };
463
464 match get_next_key_block_ids() {
465 Ok(ids) => {
466 let incomplete = ids.len() < limit;
467 overlay::Response::Ok(KeyBlockIds {
468 block_ids: ids,
469 incomplete,
470 })
471 }
472 Err(e) => {
473 tracing::warn!("get_next_key_block_ids failed: {e:?}");
474 overlay::Response::Err(INTERNAL_ERROR_CODE)
475 }
476 }
477 }
478
479 async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
480 match self.get_block_full(&req.block_id).await {
481 Ok(block_full) => overlay::Response::Ok(block_full),
482 Err(e) => {
483 tracing::warn!("get_block_full failed: {e:?}");
484 overlay::Response::Err(INTERNAL_ERROR_CODE)
485 }
486 }
487 }
488
489 async fn handle_get_next_block_full(
490 &self,
491 req: &rpc::GetNextBlockFull,
492 ) -> overlay::Response<BlockFull> {
493 let block_handle_storage = self.storage().block_handle_storage();
494 let block_connection_storage = self.storage().block_connection_storage();
495
496 let get_next_block_full = async {
497 let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
498 Some(handle) if handle.has_next1() => block_connection_storage
499 .load_connection(&req.prev_block_id, BlockConnection::Next1)
500 .context("connection not found")?,
501 _ => return Ok(BlockFull::NotFound),
502 };
503
504 self.get_block_full(&next_block_id).await
505 };
506
507 match get_next_block_full.await {
508 Ok(block_full) => overlay::Response::Ok(block_full),
509 Err(e) => {
510 tracing::warn!("get_next_block_full failed: {e:?}");
511 overlay::Response::Err(INTERNAL_ERROR_CODE)
512 }
513 }
514 }
515
516 async fn handle_get_block_data_chunk(
517 &self,
518 req: &rpc::GetBlockDataChunk,
519 ) -> overlay::Response<Data> {
520 let block_handle_storage = self.storage().block_handle_storage();
521 let block_storage = self.storage().block_storage();
522
523 let handle = match block_handle_storage.load_handle(&req.block_id) {
524 Some(handle) if handle.has_data() => handle,
525 _ => {
526 tracing::debug!("block data not found for chunked read");
527 return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
528 }
529 };
530
531 let offset = req.offset as u64;
532 match block_storage
533 .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
534 .await
535 {
536 Ok(Some(data)) => overlay::Response::Ok(Data { data }),
537 Ok(None) => {
538 tracing::debug!("block data chunk not found at offset {}", offset);
539 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
540 }
541 Err(e) => {
542 tracing::warn!("get_block_data_chunk failed: {e:?}");
543 overlay::Response::Err(INTERNAL_ERROR_CODE)
544 }
545 }
546 }
547
548 async fn handle_get_key_block_proof(
549 &self,
550 req: &rpc::GetKeyBlockProof,
551 ) -> overlay::Response<KeyBlockProof> {
552 let block_handle_storage = self.storage().block_handle_storage();
553 let block_storage = self.storage().block_storage();
554
555 let get_key_block_proof = async {
556 match block_handle_storage.load_handle(&req.block_id) {
557 Some(handle) if handle.has_proof() => {
558 let data = block_storage.load_block_proof_raw(&handle).await?;
559 Ok::<_, anyhow::Error>(KeyBlockProof::Found {
560 proof: Bytes::from_owner(data),
561 })
562 }
563 _ => Ok(KeyBlockProof::NotFound),
564 }
565 };
566
567 match get_key_block_proof.await {
568 Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
569 Err(e) => {
570 tracing::warn!("get_key_block_proof failed: {e:?}");
571 overlay::Response::Err(INTERNAL_ERROR_CODE)
572 }
573 }
574 }
575
576 async fn handle_get_zerostate_proof(&self) -> overlay::Response<ZerostateProof> {
577 let storage = self.storage().node_state();
578 let proof_opt = storage.load_zerostate_proof_bytes();
579 let result = match proof_opt {
580 Some(proof) => ZerostateProof::Found { proof },
581 None => ZerostateProof::NotFound,
582 };
583
584 overlay::Response::Ok(result)
585 }
586
587 async fn handle_get_archive_info(
588 &self,
589 req: &rpc::GetArchiveInfo,
590 ) -> overlay::Response<ArchiveInfo> {
591 match self.rpc_data_provider.get_archive_info(req.mc_seqno).await {
592 Ok(info) => overlay::Response::Ok(info),
593 Err(e) => {
594 tracing::warn!(mc_seqno = req.mc_seqno, "get_archive_info failed: {e:?}");
595 overlay::Response::Err(INTERNAL_ERROR_CODE)
596 }
597 }
598 }
599
600 async fn handle_get_archive_chunk(
601 &self,
602 req: &rpc::GetArchiveChunk,
603 ) -> overlay::Response<Data> {
604 match self
605 .rpc_data_provider
606 .get_archive_chunk(req.archive_id as u32, req.offset)
607 .await
608 {
609 Ok(data) => overlay::Response::Ok(Data { data }),
610 Err(e) => {
611 tracing::warn!(
612 archive_id = req.archive_id,
613 offset = req.offset,
614 "get_archive_chunk failed: {e:?}"
615 );
616 overlay::Response::Err(INTERNAL_ERROR_CODE)
617 }
618 }
619 }
620
621 async fn handle_get_persistent_state_info(
622 &self,
623 req: &rpc::GetPersistentShardStateInfo,
624 ) -> overlay::Response<PersistentStateInfo> {
625 let label = [("method", "getPersistentShardStateInfo")];
626 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
627 match self
628 .read_persistent_state_info(&req.block_id, PersistentStateKind::Shard)
629 .await
630 {
631 Ok(info) => overlay::Response::Ok(info),
632 Err(e) => {
633 tracing::warn!(
634 block_id = ?req.block_id,
635 "get_persistent_state_info failed: {e:?}"
636 );
637 overlay::Response::Err(INTERNAL_ERROR_CODE)
638 }
639 }
640 }
641
642 async fn handle_get_queue_persistent_state_info(
643 &self,
644 req: &rpc::GetPersistentQueueStateInfo,
645 ) -> overlay::Response<PersistentStateInfo> {
646 match self
647 .read_persistent_state_info(&req.block_id, PersistentStateKind::Queue)
648 .await
649 {
650 Ok(info) => overlay::Response::Ok(info),
651 Err(e) => {
652 tracing::warn!(
653 block_id = ?req.block_id,
654 "get_persistent_state_info failed: {e:?}"
655 );
656 overlay::Response::Err(INTERNAL_ERROR_CODE)
657 }
658 }
659 }
660
661 async fn handle_get_persistent_shard_state_chunk(
662 &self,
663 req: &rpc::GetPersistentShardStateChunk,
664 ) -> overlay::Response<Data> {
665 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
666 .await
667 }
668
669 async fn handle_get_persistent_queue_state_chunk(
670 &self,
671 req: &rpc::GetPersistentQueueStateChunk,
672 ) -> overlay::Response<Data> {
673 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
674 .await
675 }
676}
677
678impl<B> Inner<B> {
679 async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
680 let block_handle_storage = self.storage().block_handle_storage();
681 let block_storage = self.storage().block_storage();
682
683 let handle = match block_handle_storage.load_handle(block_id) {
684 Some(handle) if handle.has_all_block_parts() => handle,
685 _ => return Ok(BlockFull::NotFound),
686 };
687
688 let data = match block_storage
690 .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
691 .await?
692 {
693 Some(data) => data,
694 None => return Ok(BlockFull::NotFound),
695 };
696
697 let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
698 data.len() as u32
700 } else {
701 match block_storage.get_compressed_block_data_size(&handle)? {
703 Some(size) => size as u32,
704 None => return Ok(BlockFull::NotFound),
705 }
706 };
707
708 let block = BlockData {
709 data,
710 size: NonZeroU32::new(data_size).expect("shouldn't happen"),
711 chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
712 };
713
714 let (proof, queue_diff) = tokio::join!(
715 block_storage.load_block_proof_raw(&handle),
716 block_storage.load_queue_diff_raw(&handle)
717 );
718
719 Ok(BlockFull::Found {
720 block_id: *block_id,
721 block,
722 proof: Bytes::from_owner(proof?),
723 queue_diff: Bytes::from_owner(queue_diff?),
724 })
725 }
726
727 async fn read_persistent_state_info(
728 &self,
729 block_id: &BlockId,
730 state_kind: PersistentStateKind,
731 ) -> anyhow::Result<PersistentStateInfo> {
732 if self.config.serve_persistent_states
733 && let Some(info) = self
734 .rpc_data_provider
735 .get_persistent_state_info(block_id, state_kind)
736 .await?
737 {
738 return Ok(PersistentStateInfo::Found {
739 size: info.size,
740 chunk_size: info.chunk_size,
741 });
742 }
743
744 Ok(PersistentStateInfo::NotFound)
745 }
746
747 async fn read_persistent_state_chunk(
748 &self,
749 block_id: &BlockId,
750 offset: u64,
751 state_kind: PersistentStateKind,
752 ) -> overlay::Response<Data> {
753 let persistent_state_request_validation = || {
754 anyhow::ensure!(
755 self.config.serve_persistent_states,
756 "persistent states are disabled"
757 );
758 Ok::<_, anyhow::Error>(())
759 };
760
761 if let Err(e) = persistent_state_request_validation() {
762 tracing::debug!("persistent state request validation failed: {e:?}");
763 return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
764 }
765
766 match self
767 .rpc_data_provider
768 .get_persistent_state_chunk(block_id, offset, state_kind)
769 .await
770 {
771 Ok(Some(data)) => overlay::Response::Ok(Data { data }),
772 Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE),
773 Err(e) => {
774 tracing::warn!(
775 ?block_id,
776 offset,
777 ?state_kind,
778 "get_persistent_state_chunk failed: {e:?}"
779 );
780 overlay::Response::Err(INTERNAL_ERROR_CODE)
781 }
782 }
783 }
784}