1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18 ArchiveId, BlockConnection, BlockStorage, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
21const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";
22
23#[cfg(not(test))]
24const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; #[cfg(test)]
26const BLOCK_DATA_CHUNK_SIZE: u32 = 10; #[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainRpcServiceConfig {
32 pub max_key_blocks_list_len: usize,
36
37 pub serve_persistent_states: bool,
41}
42
43impl Default for BlockchainRpcServiceConfig {
44 fn default() -> Self {
45 Self {
46 max_key_blocks_list_len: 8,
47 serve_persistent_states: true,
48 }
49 }
50}
51
52pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
53 config: BlockchainRpcServiceConfig,
54 mandatory_fields: MandatoryFields,
55}
56
57impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
58where
59 B: BroadcastListener,
60{
61 pub fn build(self) -> BlockchainRpcService<B> {
62 let (broadcast_listener, storage) = self.mandatory_fields;
63
64 BlockchainRpcService {
65 inner: Arc::new(Inner {
66 storage,
67 config: self.config,
68 broadcast_listener,
69 }),
70 }
71 }
72}
73
74impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
75 pub fn with_storage(
76 self,
77 storage: CoreStorage,
78 ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
79 let (broadcast_listener, _) = self.mandatory_fields;
80
81 BlockchainRpcServiceBuilder {
82 config: self.config,
83 mandatory_fields: (broadcast_listener, storage),
84 }
85 }
86}
87
88impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
89 pub fn with_broadcast_listener<T1>(
90 self,
91 broadcast_listener: T1,
92 ) -> BlockchainRpcServiceBuilder<(T1, T2)>
93 where
94 T1: BroadcastListener,
95 {
96 let (_, storage) = self.mandatory_fields;
97
98 BlockchainRpcServiceBuilder {
99 config: self.config,
100 mandatory_fields: (broadcast_listener, storage),
101 }
102 }
103
104 pub fn without_broadcast_listener(
105 self,
106 ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
107 let (_, storage) = self.mandatory_fields;
108
109 BlockchainRpcServiceBuilder {
110 config: self.config,
111 mandatory_fields: (NoopBroadcastListener, storage),
112 }
113 }
114}
115
116impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
117 pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
118 Self { config, ..self }
119 }
120}
121
122#[repr(transparent)]
123pub struct BlockchainRpcService<B = NoopBroadcastListener> {
124 inner: Arc<Inner<B>>,
125}
126
127impl<B> Clone for BlockchainRpcService<B> {
128 #[inline]
129 fn clone(&self) -> Self {
130 Self {
131 inner: self.inner.clone(),
132 }
133 }
134}
135
136impl BlockchainRpcService<()> {
137 pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
138 BlockchainRpcServiceBuilder {
139 config: Default::default(),
140 mandatory_fields: ((), ()),
141 }
142 }
143}
144
145macro_rules! match_request {
146 ($req_body:expr, $tag:expr, {
147 $(
148 #[meta($name:literal $(, $($args:tt)*)?)]
149 $([$raw:tt])? $ty:path as $pat:pat => $expr:expr
150 ),*$(,)?
151 }, $err:pat => $err_exr:expr) => {
152 '__match_req: {
153 let $err = match $tag {
154 $(<$ty>::TL_ID => match tl_proto::deserialize::<$ty>(&($req_body)) {
155 Ok($pat) => break '__match_req match_request!(
156 @expr
157 { $name $($($args)*)? }
158 { $($raw)? }
159 $expr
160 ),
161 Err(e) => e,
162 })*
163 _ => tl_proto::TlError::UnknownConstructor,
164 };
165 $err_exr
166 }
167 };
168
169 (@expr { $name:literal $($args:tt)* } { $($raw:tt)+ } $expr:expr) => {
171 $expr
172 };
173 (@expr { $name:literal $($args:tt)* } { } $expr:expr) => {{
175 match_request!(@debug $name {} { $($args)* });
176
177 let __started_at = std::time::Instant::now();
178
179 BoxFutureOrNoop::future(async move {
180 scopeguard::defer! {
181 metrics::histogram!(
182 RPC_METHOD_TIMINGS_METRIC,
183 "method" => $name
184 ).record(__started_at.elapsed());
185 }
186
187 Some(Response::from_tl($expr))
188 })
189 }};
190
191 (@debug $name:literal { } { $(,)? }) => {
193 match_request!(@debug_impl $name)
194 };
195 (@debug $name:literal { $($res:tt)+ } { $(,)? }) => {
196 match_request!(@debug_impl $($res)+, $name)
197 };
198 (@debug $name:literal { $($res:tt)* } { $t:tt $($args:tt)* }) => {
199 match_request!(@debug $name { $($res)* $t } { $($args)* })
200 };
201 (@debug_impl $($args:tt)*) => {
202 tracing::debug!($($args)*)
203 };
204}
205
206impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
207 type QueryResponse = Response;
208 type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
209 type OnMessageFuture = BoxFutureOrNoop<()>;
210
211 #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
212 fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
213 let (constructor, body) = match try_handle_prefix(&req) {
214 Ok(rest) => rest,
215 Err(e) => {
216 tracing::debug!("failed to deserialize query: {e}");
217 return BoxFutureOrNoop::Noop;
218 }
219 };
220
221 let inner = self.inner.clone();
222
223 match_request!(body, constructor, {
225 #[meta("ping")]
226 [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
227 Some(Response::from_tl(overlay::Pong))
228 }),
229
230 #[meta(
231 "getNextKeyBlockIds",
232 block_id = %req.block_id,
233 max_size = req.max_size,
234 )]
235 rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),
236
237 #[meta("getBlockFull", block_id = %req.block_id)]
238 rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,
239
240 #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
241 rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,
242
243 #[meta(
244 "getBlockDataChunk",
245 block_id = %req.block_id,
246 offset = %req.offset,
247 )]
248 rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,
249
250 #[meta("getKeyBlockProof", block_id = %req.block_id)]
251 rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,
252
253 #[meta("getZerostateProof")]
254 rpc::GetZerostateProof as _ => inner.handle_get_zerostate_proof().await,
255
256 #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
257 rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req),
258
259 #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
260 rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req),
261
262 #[meta(
263 "getPersistentShardStateChunk",
264 block_id = %req.block_id,
265 offset = %req.offset,
266 )]
267 rpc::GetPersistentShardStateChunk as req => {
268 inner.handle_get_persistent_shard_state_chunk(&req).await
269 },
270
271 #[meta(
272 "getPersistentQueueStateChunk",
273 block_id = %req.block_id,
274 offset = %req.offset,
275 )]
276 rpc::GetPersistentQueueStateChunk as req => {
277 inner.handle_get_persistent_queue_state_chunk(&req).await
278 },
279
280 #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
281 rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,
282
283 #[meta(
284 "getArchiveChunk",
285 archive_id = %req.archive_id,
286 offset = %req.offset,
287 )]
288 rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
289 }, e => {
290 tracing::debug!("failed to deserialize query: {e}");
291 BoxFutureOrNoop::Noop
292 })
293 }
294
295 #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
296 fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
297 use tl_proto::{BytesMeta, TlRead};
298
299 if req.body.len() < 8 {
303 return BoxFutureOrNoop::Noop;
304 }
305
306 if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
308 return BoxFutureOrNoop::Noop;
309 }
310
311 match req.body.get_u32_le() {
313 MessageBroadcastRef::TL_ID => {
314 match BytesMeta::read_from(&mut req.body.as_ref()) {
315 Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
317 req.body.advance(meta.prefix_len);
318 req.body.truncate(meta.len);
319 }
320 Ok(_) => {
321 tracing::debug!("malformed external message broadcast");
322 return BoxFutureOrNoop::Noop;
323 }
324 Err(e) => {
325 tracing::debug!("failed to deserialize external message broadcast: {e:?}");
326 return BoxFutureOrNoop::Noop;
327 }
328 }
329
330 let inner = self.inner.clone();
331 metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
332 .increment(req.body.len() as u64);
333 BoxFutureOrNoop::future(async move {
334 if let Err(e) = validate_external_message(&req.body).await {
335 tracing::debug!("invalid external message: {e:?}");
336 return;
337 }
338
339 inner
340 .broadcast_listener
341 .handle_message(req.metadata, req.body)
342 .await;
343 })
344 }
345 constructor => {
346 tracing::debug!("unknown broadcast constructor: {constructor:08x}");
347 BoxFutureOrNoop::Noop
348 }
349 }
350 }
351}
352
353struct Inner<B> {
354 storage: CoreStorage,
355 config: BlockchainRpcServiceConfig,
356 broadcast_listener: B,
357}
358
359impl<B> Inner<B> {
360 fn storage(&self) -> &CoreStorage {
361 &self.storage
362 }
363
364 fn handle_get_next_key_block_ids(
365 &self,
366 req: &rpc::GetNextKeyBlockIds,
367 ) -> overlay::Response<KeyBlockIds> {
368 let block_handle_storage = self.storage().block_handle_storage();
369
370 let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
371
372 let get_next_key_block_ids = || {
373 if !req.block_id.shard.is_masterchain() {
374 anyhow::bail!("first block id is not from masterchain");
375 }
376
377 let mut iterator = block_handle_storage
378 .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
379 .take(limit + 1);
380
381 if let Some(id) = iterator.next() {
382 anyhow::ensure!(
383 id.root_hash == req.block_id.root_hash,
384 "first block root hash mismatch"
385 );
386 anyhow::ensure!(
387 id.file_hash == req.block_id.file_hash,
388 "first block file hash mismatch"
389 );
390 }
391
392 Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
393 };
394
395 match get_next_key_block_ids() {
396 Ok(ids) => {
397 let incomplete = ids.len() < limit;
398 overlay::Response::Ok(KeyBlockIds {
399 block_ids: ids,
400 incomplete,
401 })
402 }
403 Err(e) => {
404 tracing::warn!("get_next_key_block_ids failed: {e:?}");
405 overlay::Response::Err(INTERNAL_ERROR_CODE)
406 }
407 }
408 }
409
410 async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
411 match self.get_block_full(&req.block_id).await {
412 Ok(block_full) => overlay::Response::Ok(block_full),
413 Err(e) => {
414 tracing::warn!("get_block_full failed: {e:?}");
415 overlay::Response::Err(INTERNAL_ERROR_CODE)
416 }
417 }
418 }
419
420 async fn handle_get_next_block_full(
421 &self,
422 req: &rpc::GetNextBlockFull,
423 ) -> overlay::Response<BlockFull> {
424 let block_handle_storage = self.storage().block_handle_storage();
425 let block_connection_storage = self.storage().block_connection_storage();
426
427 let get_next_block_full = async {
428 let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
429 Some(handle) if handle.has_next1() => block_connection_storage
430 .load_connection(&req.prev_block_id, BlockConnection::Next1)
431 .context("connection not found")?,
432 _ => return Ok(BlockFull::NotFound),
433 };
434
435 self.get_block_full(&next_block_id).await
436 };
437
438 match get_next_block_full.await {
439 Ok(block_full) => overlay::Response::Ok(block_full),
440 Err(e) => {
441 tracing::warn!("get_next_block_full failed: {e:?}");
442 overlay::Response::Err(INTERNAL_ERROR_CODE)
443 }
444 }
445 }
446
447 async fn handle_get_block_data_chunk(
448 &self,
449 req: &rpc::GetBlockDataChunk,
450 ) -> overlay::Response<Data> {
451 let block_handle_storage = self.storage().block_handle_storage();
452 let block_storage = self.storage().block_storage();
453
454 let handle = match block_handle_storage.load_handle(&req.block_id) {
455 Some(handle) if handle.has_data() => handle,
456 _ => {
457 tracing::debug!("block data not found for chunked read");
458 return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
459 }
460 };
461
462 let offset = req.offset as u64;
463 match block_storage
464 .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
465 .await
466 {
467 Ok(Some(data)) => overlay::Response::Ok(Data { data }),
468 Ok(None) => {
469 tracing::debug!("block data chunk not found at offset {}", offset);
470 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
471 }
472 Err(e) => {
473 tracing::warn!("get_block_data_chunk failed: {e:?}");
474 overlay::Response::Err(INTERNAL_ERROR_CODE)
475 }
476 }
477 }
478
479 async fn handle_get_key_block_proof(
480 &self,
481 req: &rpc::GetKeyBlockProof,
482 ) -> overlay::Response<KeyBlockProof> {
483 let block_handle_storage = self.storage().block_handle_storage();
484 let block_storage = self.storage().block_storage();
485
486 let get_key_block_proof = async {
487 match block_handle_storage.load_handle(&req.block_id) {
488 Some(handle) if handle.has_proof() => {
489 let data = block_storage.load_block_proof_raw(&handle).await?;
490 Ok::<_, anyhow::Error>(KeyBlockProof::Found {
491 proof: Bytes::from_owner(data),
492 })
493 }
494 _ => Ok(KeyBlockProof::NotFound),
495 }
496 };
497
498 match get_key_block_proof.await {
499 Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
500 Err(e) => {
501 tracing::warn!("get_key_block_proof failed: {e:?}");
502 overlay::Response::Err(INTERNAL_ERROR_CODE)
503 }
504 }
505 }
506
507 async fn handle_get_zerostate_proof(&self) -> overlay::Response<ZerostateProof> {
508 let storage = self.storage().node_state();
509 let proof_opt = storage.load_zerostate_proof_bytes();
510 let result = match proof_opt {
511 Some(proof) => ZerostateProof::Found { proof },
512 None => ZerostateProof::NotFound,
513 };
514
515 overlay::Response::Ok(result)
516 }
517
518 async fn handle_get_archive_info(
519 &self,
520 req: &rpc::GetArchiveInfo,
521 ) -> overlay::Response<ArchiveInfo> {
522 let mc_seqno = req.mc_seqno;
523 let node_state = self.storage.node_state();
524
525 match node_state.load_last_mc_block_id() {
526 Some(last_applied_mc_block) => {
527 if mc_seqno > last_applied_mc_block.seqno {
528 return overlay::Response::Ok(ArchiveInfo::TooNew);
529 }
530
531 let block_storage = self.storage().block_storage();
532
533 let id = block_storage.get_archive_id(mc_seqno);
534 let size_res = match id {
535 ArchiveId::Found(id) => block_storage.get_archive_size(id),
536 ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
537 };
538
539 overlay::Response::Ok(match (id, size_res) {
540 (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
541 id: id as u64,
542 size: NonZeroU64::new(size as _).unwrap(),
543 chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
544 },
545 (ArchiveId::Found(_) | ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
546 _ => ArchiveInfo::NotFound,
547 })
548 }
549 None => {
550 tracing::warn!("get_archive_id failed: no blocks applied");
551 overlay::Response::Err(INTERNAL_ERROR_CODE)
552 }
553 }
554 }
555
556 async fn handle_get_archive_chunk(
557 &self,
558 req: &rpc::GetArchiveChunk,
559 ) -> overlay::Response<Data> {
560 let block_storage = self.storage.block_storage();
561
562 let get_archive_chunk = || async {
563 let archive_slice = block_storage
564 .get_archive_chunk(req.archive_id as u32, req.offset)
565 .await?;
566 Ok::<_, anyhow::Error>(archive_slice)
567 };
568
569 match get_archive_chunk().await {
570 Ok(data) => overlay::Response::Ok(Data { data }),
571 Err(e) => {
572 tracing::warn!("get_archive_chunk failed: {e:?}");
573 overlay::Response::Err(INTERNAL_ERROR_CODE)
574 }
575 }
576 }
577
578 fn handle_get_persistent_state_info(
579 &self,
580 req: &rpc::GetPersistentShardStateInfo,
581 ) -> overlay::Response<PersistentStateInfo> {
582 let label = [("method", "getPersistentShardStateInfo")];
583 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
584 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
585 overlay::Response::Ok(res)
586 }
587
588 fn handle_get_queue_persistent_state_info(
589 &self,
590 req: &rpc::GetPersistentQueueStateInfo,
591 ) -> overlay::Response<PersistentStateInfo> {
592 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
593 overlay::Response::Ok(res)
594 }
595
596 async fn handle_get_persistent_shard_state_chunk(
597 &self,
598 req: &rpc::GetPersistentShardStateChunk,
599 ) -> overlay::Response<Data> {
600 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
601 .await
602 }
603
604 async fn handle_get_persistent_queue_state_chunk(
605 &self,
606 req: &rpc::GetPersistentQueueStateChunk,
607 ) -> overlay::Response<Data> {
608 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
609 .await
610 }
611}
612
613impl<B> Inner<B> {
614 async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
615 let block_handle_storage = self.storage().block_handle_storage();
616 let block_storage = self.storage().block_storage();
617
618 let handle = match block_handle_storage.load_handle(block_id) {
619 Some(handle) if handle.has_all_block_parts() => handle,
620 _ => return Ok(BlockFull::NotFound),
621 };
622
623 let data = match block_storage
625 .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
626 .await?
627 {
628 Some(data) => data,
629 None => return Ok(BlockFull::NotFound),
630 };
631
632 let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
633 data.len() as u32
635 } else {
636 match block_storage.get_compressed_block_data_size(&handle)? {
638 Some(size) => size as u32,
639 None => return Ok(BlockFull::NotFound),
640 }
641 };
642
643 let block = BlockData {
644 data,
645 size: NonZeroU32::new(data_size).expect("shouldn't happen"),
646 chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
647 };
648
649 let (proof, queue_diff) = tokio::join!(
650 block_storage.load_block_proof_raw(&handle),
651 block_storage.load_queue_diff_raw(&handle)
652 );
653
654 Ok(BlockFull::Found {
655 block_id: *block_id,
656 block,
657 proof: Bytes::from_owner(proof?),
658 queue_diff: Bytes::from_owner(queue_diff?),
659 })
660 }
661
662 fn read_persistent_state_info(
663 &self,
664 block_id: &BlockId,
665 state_kind: PersistentStateKind,
666 ) -> PersistentStateInfo {
667 let persistent_state_storage = self.storage().persistent_state_storage();
668 if self.config.serve_persistent_states
669 && let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind)
670 {
671 return PersistentStateInfo::Found {
672 size: info.size,
673 chunk_size: info.chunk_size,
674 };
675 }
676 PersistentStateInfo::NotFound
677 }
678
679 async fn read_persistent_state_chunk(
680 &self,
681 block_id: &BlockId,
682 offset: u64,
683 state_kind: PersistentStateKind,
684 ) -> overlay::Response<Data> {
685 let persistent_state_storage = self.storage().persistent_state_storage();
686
687 let persistent_state_request_validation = || {
688 anyhow::ensure!(
689 self.config.serve_persistent_states,
690 "persistent states are disabled"
691 );
692 Ok::<_, anyhow::Error>(())
693 };
694
695 if let Err(e) = persistent_state_request_validation() {
696 tracing::debug!("persistent state request validation failed: {e:?}");
697 return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
698 }
699
700 match persistent_state_storage
701 .read_state_part(block_id, offset, state_kind)
702 .await
703 {
704 Some(data) => overlay::Response::Ok(Data { data: data.into() }),
705 None => {
706 tracing::debug!("failed to read persistent state part");
707 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
708 }
709 }
710 }
711}