1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18 ArchiveId, BlockConnection, BlockStorage, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
21const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";
22
23#[cfg(not(test))]
24const BLOCK_DATA_CHUNK_SIZE: u32 = 1024 * 1024; #[cfg(test)]
26const BLOCK_DATA_CHUNK_SIZE: u32 = 10; #[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(default)]
30#[non_exhaustive]
31pub struct BlockchainRpcServiceConfig {
32 pub max_key_blocks_list_len: usize,
36
37 pub serve_persistent_states: bool,
41}
42
43impl Default for BlockchainRpcServiceConfig {
44 fn default() -> Self {
45 Self {
46 max_key_blocks_list_len: 8,
47 serve_persistent_states: true,
48 }
49 }
50}
51
52pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
53 config: BlockchainRpcServiceConfig,
54 mandatory_fields: MandatoryFields,
55}
56
57impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
58where
59 B: BroadcastListener,
60{
61 pub fn build(self) -> BlockchainRpcService<B> {
62 let (broadcast_listener, storage) = self.mandatory_fields;
63
64 BlockchainRpcService {
65 inner: Arc::new(Inner {
66 storage,
67 config: self.config,
68 broadcast_listener,
69 }),
70 }
71 }
72}
73
74impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
75 pub fn with_storage(
76 self,
77 storage: CoreStorage,
78 ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
79 let (broadcast_listener, _) = self.mandatory_fields;
80
81 BlockchainRpcServiceBuilder {
82 config: self.config,
83 mandatory_fields: (broadcast_listener, storage),
84 }
85 }
86}
87
88impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
89 pub fn with_broadcast_listener<T1>(
90 self,
91 broadcast_listener: T1,
92 ) -> BlockchainRpcServiceBuilder<(T1, T2)>
93 where
94 T1: BroadcastListener,
95 {
96 let (_, storage) = self.mandatory_fields;
97
98 BlockchainRpcServiceBuilder {
99 config: self.config,
100 mandatory_fields: (broadcast_listener, storage),
101 }
102 }
103
104 pub fn without_broadcast_listener(
105 self,
106 ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
107 let (_, storage) = self.mandatory_fields;
108
109 BlockchainRpcServiceBuilder {
110 config: self.config,
111 mandatory_fields: (NoopBroadcastListener, storage),
112 }
113 }
114}
115
116impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
117 pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
118 Self { config, ..self }
119 }
120}
121
122#[repr(transparent)]
123pub struct BlockchainRpcService<B = NoopBroadcastListener> {
124 inner: Arc<Inner<B>>,
125}
126
127impl<B> Clone for BlockchainRpcService<B> {
128 #[inline]
129 fn clone(&self) -> Self {
130 Self {
131 inner: self.inner.clone(),
132 }
133 }
134}
135
136impl BlockchainRpcService<()> {
137 pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
138 BlockchainRpcServiceBuilder {
139 config: Default::default(),
140 mandatory_fields: ((), ()),
141 }
142 }
143}
144
145macro_rules! match_request {
146 ($req_body:expr, $tag:expr, {
147 $(
148 #[meta($name:literal $(, $($args:tt)*)?)]
149 $([$raw:tt])? $ty:path as $pat:pat => $expr:expr
150 ),*$(,)?
151 }, $err:pat => $err_exr:expr) => {
152 '__match_req: {
153 let $err = match $tag {
154 $(<$ty>::TL_ID => match tl_proto::deserialize::<$ty>(&($req_body)) {
155 Ok($pat) => break '__match_req match_request!(
156 @expr
157 { $name $($($args)*)? }
158 { $($raw)? }
159 $expr
160 ),
161 Err(e) => e,
162 })*
163 _ => tl_proto::TlError::UnknownConstructor,
164 };
165 $err_exr
166 }
167 };
168
169 (@expr { $name:literal $($args:tt)* } { $($raw:tt)+ } $expr:expr) => {
171 $expr
172 };
173 (@expr { $name:literal $($args:tt)* } { } $expr:expr) => {{
175 match_request!(@debug $name {} { $($args)* });
176
177 let __started_at = std::time::Instant::now();
178
179 BoxFutureOrNoop::future(async move {
180 scopeguard::defer! {
181 metrics::histogram!(
182 RPC_METHOD_TIMINGS_METRIC,
183 "method" => $name
184 ).record(__started_at.elapsed());
185 }
186
187 Some(Response::from_tl($expr))
188 })
189 }};
190
191 (@debug $name:literal { } { $(,)? }) => {
193 match_request!(@debug_impl $name)
194 };
195 (@debug $name:literal { $($res:tt)+ } { $(,)? }) => {
196 match_request!(@debug_impl $($res)+, $name)
197 };
198 (@debug $name:literal { $($res:tt)* } { $t:tt $($args:tt)* }) => {
199 match_request!(@debug $name { $($res)* $t } { $($args)* })
200 };
201 (@debug_impl $($args:tt)*) => {
202 tracing::debug!($($args)*)
203 };
204}
205
206impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
207 type QueryResponse = Response;
208 type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
209 type OnMessageFuture = BoxFutureOrNoop<()>;
210
211 #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
212 fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
213 let (constructor, body) = match try_handle_prefix(&req) {
214 Ok(rest) => rest,
215 Err(e) => {
216 tracing::debug!("failed to deserialize query: {e}");
217 return BoxFutureOrNoop::Noop;
218 }
219 };
220
221 let inner = self.inner.clone();
222
223 match_request!(body, constructor, {
225 #[meta("ping")]
226 [raw] overlay::Ping as _ => BoxFutureOrNoop::future(async {
227 Some(Response::from_tl(overlay::Pong))
228 }),
229
230 #[meta(
231 "getNextKeyBlockIds",
232 block_id = %req.block_id,
233 max_size = req.max_size,
234 )]
235 rpc::GetNextKeyBlockIds as req => inner.handle_get_next_key_block_ids(&req),
236
237 #[meta("getBlockFull", block_id = %req.block_id)]
238 rpc::GetBlockFull as req => inner.handle_get_block_full(&req).await,
239
240 #[meta("getNextBlockFull", prev_block_id = %req.prev_block_id)]
241 rpc::GetNextBlockFull as req => inner.handle_get_next_block_full(&req).await,
242
243 #[meta(
244 "getBlockDataChunk",
245 block_id = %req.block_id,
246 offset = %req.offset,
247 )]
248 rpc::GetBlockDataChunk as req => inner.handle_get_block_data_chunk(&req).await,
249
250 #[meta("getKeyBlockProof", block_id = %req.block_id)]
251 rpc::GetKeyBlockProof as req => inner.handle_get_key_block_proof(&req).await,
252
253 #[meta("getPersistentShardStateInfo", block_id = %req.block_id)]
254 rpc::GetPersistentShardStateInfo as req => inner.handle_get_persistent_state_info(&req),
255
256 #[meta("getPersistentQueueStateInfo", block_id = %req.block_id)]
257 rpc::GetPersistentQueueStateInfo as req => inner.handle_get_queue_persistent_state_info(&req),
258
259 #[meta(
260 "getPersistentShardStateChunk",
261 block_id = %req.block_id,
262 offset = %req.offset,
263 )]
264 rpc::GetPersistentShardStateChunk as req => {
265 inner.handle_get_persistent_shard_state_chunk(&req).await
266 },
267
268 #[meta(
269 "getPersistentQueueStateChunk",
270 block_id = %req.block_id,
271 offset = %req.offset,
272 )]
273 rpc::GetPersistentQueueStateChunk as req => {
274 inner.handle_get_persistent_queue_state_chunk(&req).await
275 },
276
277 #[meta("getArchiveInfo", mc_seqno = %req.mc_seqno)]
278 rpc::GetArchiveInfo as req => inner.handle_get_archive_info(&req).await,
279
280 #[meta(
281 "getArchiveChunk",
282 archive_id = %req.archive_id,
283 offset = %req.offset,
284 )]
285 rpc::GetArchiveChunk as req => inner.handle_get_archive_chunk(&req).await,
286 }, e => {
287 tracing::debug!("failed to deserialize query: {e}");
288 BoxFutureOrNoop::Noop
289 })
290 }
291
292 #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
293 fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
294 use tl_proto::{BytesMeta, TlRead};
295
296 if req.body.len() < 8 {
300 return BoxFutureOrNoop::Noop;
301 }
302
303 if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
305 return BoxFutureOrNoop::Noop;
306 }
307
308 match req.body.get_u32_le() {
310 MessageBroadcastRef::TL_ID => {
311 match BytesMeta::read_from(&mut req.body.as_ref()) {
312 Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
314 req.body.advance(meta.prefix_len);
315 req.body.truncate(meta.len);
316 }
317 Ok(_) => {
318 tracing::debug!("malformed external message broadcast");
319 return BoxFutureOrNoop::Noop;
320 }
321 Err(e) => {
322 tracing::debug!("failed to deserialize external message broadcast: {e:?}");
323 return BoxFutureOrNoop::Noop;
324 }
325 }
326
327 let inner = self.inner.clone();
328 metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
329 .increment(req.body.len() as u64);
330 BoxFutureOrNoop::future(async move {
331 if let Err(e) = validate_external_message(&req.body).await {
332 tracing::debug!("invalid external message: {e:?}");
333 return;
334 }
335
336 inner
337 .broadcast_listener
338 .handle_message(req.metadata, req.body)
339 .await;
340 })
341 }
342 constructor => {
343 tracing::debug!("unknown broadcast constructor: {constructor:08x}");
344 BoxFutureOrNoop::Noop
345 }
346 }
347 }
348}
349
350struct Inner<B> {
351 storage: CoreStorage,
352 config: BlockchainRpcServiceConfig,
353 broadcast_listener: B,
354}
355
356impl<B> Inner<B> {
357 fn storage(&self) -> &CoreStorage {
358 &self.storage
359 }
360
361 fn handle_get_next_key_block_ids(
362 &self,
363 req: &rpc::GetNextKeyBlockIds,
364 ) -> overlay::Response<KeyBlockIds> {
365 let block_handle_storage = self.storage().block_handle_storage();
366
367 let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
368
369 let get_next_key_block_ids = || {
370 if !req.block_id.shard.is_masterchain() {
371 anyhow::bail!("first block id is not from masterchain");
372 }
373
374 let mut iterator = block_handle_storage
375 .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
376 .take(limit + 1);
377
378 if let Some(id) = iterator.next() {
379 anyhow::ensure!(
380 id.root_hash == req.block_id.root_hash,
381 "first block root hash mismatch"
382 );
383 anyhow::ensure!(
384 id.file_hash == req.block_id.file_hash,
385 "first block file hash mismatch"
386 );
387 }
388
389 Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
390 };
391
392 match get_next_key_block_ids() {
393 Ok(ids) => {
394 let incomplete = ids.len() < limit;
395 overlay::Response::Ok(KeyBlockIds {
396 block_ids: ids,
397 incomplete,
398 })
399 }
400 Err(e) => {
401 tracing::warn!("get_next_key_block_ids failed: {e:?}");
402 overlay::Response::Err(INTERNAL_ERROR_CODE)
403 }
404 }
405 }
406
407 async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
408 match self.get_block_full(&req.block_id).await {
409 Ok(block_full) => overlay::Response::Ok(block_full),
410 Err(e) => {
411 tracing::warn!("get_block_full failed: {e:?}");
412 overlay::Response::Err(INTERNAL_ERROR_CODE)
413 }
414 }
415 }
416
417 async fn handle_get_next_block_full(
418 &self,
419 req: &rpc::GetNextBlockFull,
420 ) -> overlay::Response<BlockFull> {
421 let block_handle_storage = self.storage().block_handle_storage();
422 let block_connection_storage = self.storage().block_connection_storage();
423
424 let get_next_block_full = async {
425 let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
426 Some(handle) if handle.has_next1() => block_connection_storage
427 .load_connection(&req.prev_block_id, BlockConnection::Next1)
428 .context("connection not found")?,
429 _ => return Ok(BlockFull::NotFound),
430 };
431
432 self.get_block_full(&next_block_id).await
433 };
434
435 match get_next_block_full.await {
436 Ok(block_full) => overlay::Response::Ok(block_full),
437 Err(e) => {
438 tracing::warn!("get_next_block_full failed: {e:?}");
439 overlay::Response::Err(INTERNAL_ERROR_CODE)
440 }
441 }
442 }
443
444 async fn handle_get_block_data_chunk(
445 &self,
446 req: &rpc::GetBlockDataChunk,
447 ) -> overlay::Response<Data> {
448 let block_handle_storage = self.storage().block_handle_storage();
449 let block_storage = self.storage().block_storage();
450
451 let handle = match block_handle_storage.load_handle(&req.block_id) {
452 Some(handle) if handle.has_data() => handle,
453 _ => {
454 tracing::debug!("block data not found for chunked read");
455 return overlay::Response::Err(NOT_FOUND_ERROR_CODE);
456 }
457 };
458
459 let offset = req.offset as u64;
460 match block_storage
461 .load_block_data_range(&handle, offset, BLOCK_DATA_CHUNK_SIZE as u64)
462 .await
463 {
464 Ok(Some(data)) => overlay::Response::Ok(Data { data }),
465 Ok(None) => {
466 tracing::debug!("block data chunk not found at offset {}", offset);
467 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
468 }
469 Err(e) => {
470 tracing::warn!("get_block_data_chunk failed: {e:?}");
471 overlay::Response::Err(INTERNAL_ERROR_CODE)
472 }
473 }
474 }
475
476 async fn handle_get_key_block_proof(
477 &self,
478 req: &rpc::GetKeyBlockProof,
479 ) -> overlay::Response<KeyBlockProof> {
480 let block_handle_storage = self.storage().block_handle_storage();
481 let block_storage = self.storage().block_storage();
482
483 let get_key_block_proof = async {
484 match block_handle_storage.load_handle(&req.block_id) {
485 Some(handle) if handle.has_proof() => {
486 let data = block_storage.load_block_proof_raw(&handle).await?;
487 Ok::<_, anyhow::Error>(KeyBlockProof::Found {
488 proof: Bytes::from_owner(data),
489 })
490 }
491 _ => Ok(KeyBlockProof::NotFound),
492 }
493 };
494
495 match get_key_block_proof.await {
496 Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
497 Err(e) => {
498 tracing::warn!("get_key_block_proof failed: {e:?}");
499 overlay::Response::Err(INTERNAL_ERROR_CODE)
500 }
501 }
502 }
503
504 async fn handle_get_archive_info(
505 &self,
506 req: &rpc::GetArchiveInfo,
507 ) -> overlay::Response<ArchiveInfo> {
508 let mc_seqno = req.mc_seqno;
509 let node_state = self.storage.node_state();
510
511 match node_state.load_last_mc_block_id() {
512 Some(last_applied_mc_block) => {
513 if mc_seqno > last_applied_mc_block.seqno {
514 return overlay::Response::Ok(ArchiveInfo::TooNew);
515 }
516
517 let block_storage = self.storage().block_storage();
518
519 let id = block_storage.get_archive_id(mc_seqno);
520 let size_res = match id {
521 ArchiveId::Found(id) => block_storage.get_archive_size(id),
522 ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
523 };
524
525 overlay::Response::Ok(match (id, size_res) {
526 (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
527 id: id as u64,
528 size: NonZeroU64::new(size as _).unwrap(),
529 chunk_size: BlockStorage::DEFAULT_BLOB_CHUNK_SIZE,
530 },
531 (ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
532 _ => ArchiveInfo::NotFound,
533 })
534 }
535 None => {
536 tracing::warn!("get_archive_id failed: no blocks applied");
537 overlay::Response::Err(INTERNAL_ERROR_CODE)
538 }
539 }
540 }
541
542 async fn handle_get_archive_chunk(
543 &self,
544 req: &rpc::GetArchiveChunk,
545 ) -> overlay::Response<Data> {
546 let block_storage = self.storage.block_storage();
547
548 let get_archive_chunk = || async {
549 let archive_slice = block_storage
550 .get_archive_chunk(req.archive_id as u32, req.offset)
551 .await?;
552 Ok::<_, anyhow::Error>(archive_slice)
553 };
554
555 match get_archive_chunk().await {
556 Ok(data) => overlay::Response::Ok(Data { data }),
557 Err(e) => {
558 tracing::warn!("get_archive_chunk failed: {e:?}");
559 overlay::Response::Err(INTERNAL_ERROR_CODE)
560 }
561 }
562 }
563
564 fn handle_get_persistent_state_info(
565 &self,
566 req: &rpc::GetPersistentShardStateInfo,
567 ) -> overlay::Response<PersistentStateInfo> {
568 let label = [("method", "getPersistentShardStateInfo")];
569 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
570 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
571 overlay::Response::Ok(res)
572 }
573
574 fn handle_get_queue_persistent_state_info(
575 &self,
576 req: &rpc::GetPersistentQueueStateInfo,
577 ) -> overlay::Response<PersistentStateInfo> {
578 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
579 overlay::Response::Ok(res)
580 }
581
582 async fn handle_get_persistent_shard_state_chunk(
583 &self,
584 req: &rpc::GetPersistentShardStateChunk,
585 ) -> overlay::Response<Data> {
586 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
587 .await
588 }
589
590 async fn handle_get_persistent_queue_state_chunk(
591 &self,
592 req: &rpc::GetPersistentQueueStateChunk,
593 ) -> overlay::Response<Data> {
594 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
595 .await
596 }
597}
598
599impl<B> Inner<B> {
600 async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
601 let block_handle_storage = self.storage().block_handle_storage();
602 let block_storage = self.storage().block_storage();
603
604 let handle = match block_handle_storage.load_handle(block_id) {
605 Some(handle) if handle.has_all_block_parts() => handle,
606 _ => return Ok(BlockFull::NotFound),
607 };
608
609 let data = match block_storage
611 .load_block_data_range(&handle, 0, BLOCK_DATA_CHUNK_SIZE as u64)
612 .await?
613 {
614 Some(data) => data,
615 None => return Ok(BlockFull::NotFound),
616 };
617
618 let data_size = if data.len() < BLOCK_DATA_CHUNK_SIZE as usize {
619 data.len() as u32
621 } else {
622 match block_storage.get_compressed_block_data_size(&handle)? {
624 Some(size) => size as u32,
625 None => return Ok(BlockFull::NotFound),
626 }
627 };
628
629 let block = BlockData {
630 data,
631 size: NonZeroU32::new(data_size).expect("shouldn't happen"),
632 chunk_size: NonZeroU32::new(BLOCK_DATA_CHUNK_SIZE).expect("shouldn't happen"),
633 };
634
635 let (proof, queue_diff) = tokio::join!(
636 block_storage.load_block_proof_raw(&handle),
637 block_storage.load_queue_diff_raw(&handle)
638 );
639
640 Ok(BlockFull::Found {
641 block_id: *block_id,
642 block,
643 proof: Bytes::from_owner(proof?),
644 queue_diff: Bytes::from_owner(queue_diff?),
645 })
646 }
647
648 fn read_persistent_state_info(
649 &self,
650 block_id: &BlockId,
651 state_kind: PersistentStateKind,
652 ) -> PersistentStateInfo {
653 let persistent_state_storage = self.storage().persistent_state_storage();
654 if self.config.serve_persistent_states
655 && let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind)
656 {
657 return PersistentStateInfo::Found {
658 size: info.size,
659 chunk_size: info.chunk_size,
660 };
661 }
662 PersistentStateInfo::NotFound
663 }
664
665 async fn read_persistent_state_chunk(
666 &self,
667 block_id: &BlockId,
668 offset: u64,
669 state_kind: PersistentStateKind,
670 ) -> overlay::Response<Data> {
671 let persistent_state_storage = self.storage().persistent_state_storage();
672
673 let persistent_state_request_validation = || {
674 anyhow::ensure!(
675 self.config.serve_persistent_states,
676 "persistent states are disabled"
677 );
678 Ok::<_, anyhow::Error>(())
679 };
680
681 if let Err(e) = persistent_state_request_validation() {
682 tracing::debug!("persistent state request validation failed: {e:?}");
683 return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
684 }
685
686 match persistent_state_storage
687 .read_state_part(block_id, offset, state_kind)
688 .await
689 {
690 Some(data) => overlay::Response::Ok(Data { data: data.into() }),
691 None => {
692 tracing::debug!("failed to read persistent state part");
693 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
694 }
695 }
696 }
697}