1use std::num::{NonZeroU32, NonZeroU64};
2use std::sync::Arc;
3
4use anyhow::Context;
5use bytes::{Buf, Bytes};
6use serde::{Deserialize, Serialize};
7use tycho_block_util::message::validate_external_message;
8use tycho_network::{Response, Service, ServiceRequest, try_handle_prefix};
9use tycho_types::models::BlockId;
10use tycho_util::futures::BoxFutureOrNoop;
11use tycho_util::metrics::HistogramGuard;
12
13use crate::blockchain_rpc::broadcast_listener::{BroadcastListener, NoopBroadcastListener};
14use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
15use crate::proto::blockchain::*;
16use crate::proto::overlay;
17use crate::storage::{
18 ArchiveId, BlockConnection, CoreStorage, KeyBlocksDirection, PersistentStateKind,
19};
20
/// Metric name passed to `HistogramGuard` by every RPC handler in this file,
/// together with a `method` label naming the handled request.
const RPC_METHOD_TIMINGS_METRIC: &str = "tycho_blockchain_rpc_method_time";
22
/// Configuration of [`BlockchainRpcService`].
///
/// Fields missing from a deserialized config are taken from the [`Default`]
/// implementation (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
#[non_exhaustive]
pub struct BlockchainRpcServiceConfig {
    /// Upper bound on the number of key block ids returned by one
    /// `getNextKeyBlockIds` response; the requested `max_size` is clamped to it.
    ///
    /// Default: `8`.
    pub max_key_blocks_list_len: usize,

    /// Whether persistent states are served to peers. When `false`, state info
    /// requests answer `NotFound` and state chunk requests are rejected.
    ///
    /// Default: `true`.
    pub serve_persistent_states: bool,
}
37
38impl Default for BlockchainRpcServiceConfig {
39 fn default() -> Self {
40 Self {
41 max_key_blocks_list_len: 8,
42 serve_persistent_states: true,
43 }
44 }
45}
46
/// Builder for [`BlockchainRpcService`].
///
/// `MandatoryFields` is a `(broadcast_listener, storage)` tuple. Both slots start
/// as `()` and are replaced by the `with_*` / `without_*` methods; `build` is only
/// implemented once the storage slot holds a [`CoreStorage`] and the listener slot
/// holds a [`BroadcastListener`].
pub struct BlockchainRpcServiceBuilder<MandatoryFields> {
    // Service configuration; starts as the default and can be replaced via `with_config`.
    config: BlockchainRpcServiceConfig,
    // Tuple of the mandatory parts collected so far.
    mandatory_fields: MandatoryFields,
}
51
52impl<B> BlockchainRpcServiceBuilder<(B, CoreStorage)>
53where
54 B: BroadcastListener,
55{
56 pub fn build(self) -> BlockchainRpcService<B> {
57 let (broadcast_listener, storage) = self.mandatory_fields;
58
59 BlockchainRpcService {
60 inner: Arc::new(Inner {
61 storage,
62 config: self.config,
63 broadcast_listener,
64 }),
65 }
66 }
67}
68
69impl<T1> BlockchainRpcServiceBuilder<(T1, ())> {
70 pub fn with_storage(
71 self,
72 storage: CoreStorage,
73 ) -> BlockchainRpcServiceBuilder<(T1, CoreStorage)> {
74 let (broadcast_listener, _) = self.mandatory_fields;
75
76 BlockchainRpcServiceBuilder {
77 config: self.config,
78 mandatory_fields: (broadcast_listener, storage),
79 }
80 }
81}
82
83impl<T2> BlockchainRpcServiceBuilder<((), T2)> {
84 pub fn with_broadcast_listener<T1>(
85 self,
86 broadcast_listener: T1,
87 ) -> BlockchainRpcServiceBuilder<(T1, T2)>
88 where
89 T1: BroadcastListener,
90 {
91 let (_, storage) = self.mandatory_fields;
92
93 BlockchainRpcServiceBuilder {
94 config: self.config,
95 mandatory_fields: (broadcast_listener, storage),
96 }
97 }
98
99 pub fn without_broadcast_listener(
100 self,
101 ) -> BlockchainRpcServiceBuilder<(NoopBroadcastListener, T2)> {
102 let (_, storage) = self.mandatory_fields;
103
104 BlockchainRpcServiceBuilder {
105 config: self.config,
106 mandatory_fields: (NoopBroadcastListener, storage),
107 }
108 }
109}
110
111impl<T1, T2> BlockchainRpcServiceBuilder<(T1, T2)> {
112 pub fn with_config(self, config: BlockchainRpcServiceConfig) -> Self {
113 Self { config, ..self }
114 }
115}
116
/// Network service answering blockchain RPC queries and external message broadcasts.
///
/// The service is a thin handle around an `Arc`, so cloning it shares the same
/// storage, config and broadcast listener. `B` is the broadcast listener type.
#[repr(transparent)]
pub struct BlockchainRpcService<B = NoopBroadcastListener> {
    // Shared state used by all request handlers.
    inner: Arc<Inner<B>>,
}
121
122impl<B> Clone for BlockchainRpcService<B> {
123 #[inline]
124 fn clone(&self) -> Self {
125 Self {
126 inner: self.inner.clone(),
127 }
128 }
129}
130
131impl BlockchainRpcService<()> {
132 pub fn builder() -> BlockchainRpcServiceBuilder<((), ())> {
133 BlockchainRpcServiceBuilder {
134 config: Default::default(),
135 mandatory_fields: ((), ()),
136 }
137 }
138}
139
140impl<B: BroadcastListener> Service<ServiceRequest> for BlockchainRpcService<B> {
141 type QueryResponse = Response;
142 type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
143 type OnMessageFuture = BoxFutureOrNoop<()>;
144
    /// Dispatches an incoming query to the matching RPC handler.
    ///
    /// The request prefix is parsed with `try_handle_prefix`; the remaining body is
    /// then decoded by `match_tl_request!` using the extracted constructor tag.
    /// Every request arm logs the request at debug level and returns a future that
    /// resolves to `Some(Response)`. If the prefix cannot be parsed, or the macro's
    /// error arm is taken, the failure is logged at debug level and
    /// `BoxFutureOrNoop::Noop` is returned.
    #[tracing::instrument(level = "debug", name = "on_blockchain_service_query", skip_all)]
    fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture {
        let (constructor, body) = match try_handle_prefix(&req) {
            Ok(rest) => rest,
            Err(e) => {
                tracing::debug!("failed to deserialize query: {e}");
                return BoxFutureOrNoop::Noop;
            }
        };

        // Each arm clones the shared `inner` handle so the returned future owns
        // everything it needs (`async move`).
        tycho_network::match_tl_request!(body, tag = constructor, {
            // Ping is answered directly, without touching storage.
            overlay::Ping as _ => BoxFutureOrNoop::future(async {
                Some(Response::from_tl(overlay::Pong))
            }),
            rpc::GetNextKeyBlockIds as req => {
                tracing::debug!(
                    block_id = %req.block_id,
                    max_size = req.max_size,
                    "getNextKeyBlockIds",
                );

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_next_key_block_ids(&req);
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetBlockFull as req => {
                tracing::debug!(block_id = %req.block_id, "getBlockFull");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_block_full(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetNextBlockFull as req => {
                tracing::debug!(prev_block_id = %req.prev_block_id, "getNextBlockFull");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_next_block_full(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetBlockDataChunk as req => {
                tracing::debug!(block_id = %req.block_id, offset = %req.offset, "getBlockDataChunk");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_block_data_chunk(&req);
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetKeyBlockProof as req => {
                tracing::debug!(block_id = %req.block_id, "getKeyBlockProof");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_key_block_proof(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetPersistentShardStateInfo as req => {
                tracing::debug!(block_id = %req.block_id, "getPersistentShardStateInfo");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_persistent_state_info(&req);
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetPersistentQueueStateInfo as req => {
                tracing::debug!(block_id = %req.block_id, "getPersistentQueueStateInfo");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_queue_persistent_state_info(&req);
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetPersistentShardStateChunk as req => {
                tracing::debug!(
                    block_id = %req.block_id,
                    offset = %req.offset,
                    "getPersistentShardStateChunk"
                );

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_persistent_shard_state_chunk(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetPersistentQueueStateChunk as req => {
                tracing::debug!(
                    block_id = %req.block_id,
                    offset = %req.offset,
                    "getPersistentQueueStateChunk"
                );

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_persistent_queue_state_chunk(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetArchiveInfo as req => {
                tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo");

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_archive_info(&req).await;
                    Some(Response::from_tl(res))
                })
            },
            rpc::GetArchiveChunk as req => {
                tracing::debug!(
                    archive_id = %req.archive_id,
                    offset = %req.offset,
                    "getArchiveChunk"
                );

                let inner = self.inner.clone();
                BoxFutureOrNoop::future(async move {
                    let res = inner.handle_get_archive_chunk(&req).await;
                    Some(Response::from_tl(res))
                })
            },
        }, e => {
            // Error arm of the macro: nothing is sent back to the peer.
            tracing::debug!("failed to deserialize query: {e}");
            BoxFutureOrNoop::Noop
        })
    }
279
280 #[tracing::instrument(level = "debug", name = "on_blockchain_service_message", skip_all)]
281 fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture {
282 use tl_proto::{BytesMeta, TlRead};
283
284 if req.body.len() < 8 {
288 return BoxFutureOrNoop::Noop;
289 }
290
291 if req.body.get_u32_le() != overlay::BroadcastPrefix::TL_ID {
293 return BoxFutureOrNoop::Noop;
294 }
295
296 match req.body.get_u32_le() {
298 MessageBroadcastRef::TL_ID => {
299 match BytesMeta::read_from(&mut req.body.as_ref()) {
300 Ok(meta) if req.body.len() == meta.prefix_len + meta.len + meta.padding => {
302 req.body.advance(meta.prefix_len);
303 req.body.truncate(meta.len);
304 }
305 Ok(_) => {
306 tracing::debug!("malformed external message broadcast");
307 return BoxFutureOrNoop::Noop;
308 }
309 Err(e) => {
310 tracing::debug!("failed to deserialize external message broadcast: {e:?}");
311 return BoxFutureOrNoop::Noop;
312 }
313 }
314
315 let inner = self.inner.clone();
316 metrics::counter!("tycho_rpc_broadcast_external_message_rx_bytes_total")
317 .increment(req.body.len() as u64);
318 BoxFutureOrNoop::future(async move {
319 if let Err(e) = validate_external_message(&req.body).await {
320 tracing::debug!("invalid external message: {e:?}");
321 return;
322 }
323
324 inner
325 .broadcast_listener
326 .handle_message(req.metadata, req.body)
327 .await;
328 })
329 }
330 constructor => {
331 tracing::debug!("unknown broadcast constructor: {constructor:08x}");
332 BoxFutureOrNoop::Noop
333 }
334 }
335 }
336}
337
/// Shared state behind [`BlockchainRpcService`]; all request handlers are
/// implemented on this type.
struct Inner<B> {
    // Storage the handlers read blocks, archives and persistent states from.
    storage: CoreStorage,
    // Limits and feature switches applied by the handlers.
    config: BlockchainRpcServiceConfig,
    // Receives external messages that passed validation in `on_message`.
    broadcast_listener: B,
}
343
344impl<B> Inner<B> {
    /// Returns a reference to the storage this service reads from.
    fn storage(&self) -> &CoreStorage {
        &self.storage
    }
348
349 fn handle_get_next_key_block_ids(
350 &self,
351 req: &rpc::GetNextKeyBlockIds,
352 ) -> overlay::Response<KeyBlockIds> {
353 let label = [("method", "getNextKeyBlockIds")];
354 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
355
356 let block_handle_storage = self.storage().block_handle_storage();
357
358 let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len);
359
360 let get_next_key_block_ids = || {
361 if !req.block_id.shard.is_masterchain() {
362 anyhow::bail!("first block id is not from masterchain");
363 }
364
365 let mut iterator = block_handle_storage
366 .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno))
367 .take(limit + 1);
368
369 if let Some(id) = iterator.next() {
370 anyhow::ensure!(
371 id.root_hash == req.block_id.root_hash,
372 "first block root hash mismatch"
373 );
374 anyhow::ensure!(
375 id.file_hash == req.block_id.file_hash,
376 "first block file hash mismatch"
377 );
378 }
379
380 Ok::<_, anyhow::Error>(iterator.take(limit).collect::<Vec<_>>())
381 };
382
383 match get_next_key_block_ids() {
384 Ok(ids) => {
385 let incomplete = ids.len() < limit;
386 overlay::Response::Ok(KeyBlockIds {
387 block_ids: ids,
388 incomplete,
389 })
390 }
391 Err(e) => {
392 tracing::warn!("get_next_key_block_ids failed: {e:?}");
393 overlay::Response::Err(INTERNAL_ERROR_CODE)
394 }
395 }
396 }
397
398 async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
399 let label = [("method", "getBlockFull")];
400 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
401
402 match self.get_block_full(&req.block_id).await {
403 Ok(block_full) => overlay::Response::Ok(block_full),
404 Err(e) => {
405 tracing::warn!("get_block_full failed: {e:?}");
406 overlay::Response::Err(INTERNAL_ERROR_CODE)
407 }
408 }
409 }
410
411 async fn handle_get_next_block_full(
412 &self,
413 req: &rpc::GetNextBlockFull,
414 ) -> overlay::Response<BlockFull> {
415 let label = [("method", "getNextBlockFull")];
416 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
417
418 let block_handle_storage = self.storage().block_handle_storage();
419 let block_connection_storage = self.storage().block_connection_storage();
420
421 let get_next_block_full = async {
422 let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id) {
423 Some(handle) if handle.has_next1() => block_connection_storage
424 .load_connection(&req.prev_block_id, BlockConnection::Next1)
425 .context("connection not found")?,
426 _ => return Ok(BlockFull::NotFound),
427 };
428
429 self.get_block_full(&next_block_id).await
430 };
431
432 match get_next_block_full.await {
433 Ok(block_full) => overlay::Response::Ok(block_full),
434 Err(e) => {
435 tracing::warn!("get_next_block_full failed: {e:?}");
436 overlay::Response::Err(INTERNAL_ERROR_CODE)
437 }
438 }
439 }
440
441 fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response<Data> {
442 let label = [("method", "getBlockDataChunk")];
443 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
444
445 let block_storage = self.storage.block_storage();
446 match block_storage.get_block_data_chunk(&req.block_id, req.offset) {
447 Ok(Some(data)) => overlay::Response::Ok(Data {
448 data: Bytes::from_owner(data),
449 }),
450 Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE),
451 Err(e) => {
452 tracing::warn!("get_block_data_chunk failed: {e:?}");
453 overlay::Response::Err(INTERNAL_ERROR_CODE)
454 }
455 }
456 }
457
458 async fn handle_get_key_block_proof(
459 &self,
460 req: &rpc::GetKeyBlockProof,
461 ) -> overlay::Response<KeyBlockProof> {
462 let label = [("method", "getKeyBlockProof")];
463 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
464
465 let block_handle_storage = self.storage().block_handle_storage();
466 let block_storage = self.storage().block_storage();
467
468 let get_key_block_proof = async {
469 match block_handle_storage.load_handle(&req.block_id) {
470 Some(handle) if handle.has_proof() => {
471 let data = block_storage.load_block_proof_raw(&handle).await?;
472 Ok::<_, anyhow::Error>(KeyBlockProof::Found {
473 proof: Bytes::from_owner(data),
474 })
475 }
476 _ => Ok(KeyBlockProof::NotFound),
477 }
478 };
479
480 match get_key_block_proof.await {
481 Ok(key_block_proof) => overlay::Response::Ok(key_block_proof),
482 Err(e) => {
483 tracing::warn!("get_key_block_proof failed: {e:?}");
484 overlay::Response::Err(INTERNAL_ERROR_CODE)
485 }
486 }
487 }
488
489 async fn handle_get_archive_info(
490 &self,
491 req: &rpc::GetArchiveInfo,
492 ) -> overlay::Response<ArchiveInfo> {
493 let mc_seqno = req.mc_seqno;
494 let node_state = self.storage.node_state();
495
496 let label = [("method", "getArchiveInfo")];
497 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
498
499 match node_state.load_last_mc_block_id() {
500 Some(last_applied_mc_block) => {
501 if mc_seqno > last_applied_mc_block.seqno {
502 return overlay::Response::Ok(ArchiveInfo::TooNew);
503 }
504
505 let block_storage = self.storage().block_storage();
506
507 let id = block_storage.get_archive_id(mc_seqno);
508 let size_res = match id {
509 ArchiveId::Found(id) => block_storage.get_archive_size(id),
510 ArchiveId::TooNew | ArchiveId::NotFound => Ok(None),
511 };
512
513 overlay::Response::Ok(match (id, size_res) {
514 (ArchiveId::Found(id), Ok(Some(size))) if size > 0 => ArchiveInfo::Found {
515 id: id as u64,
516 size: NonZeroU64::new(size as _).unwrap(),
517 chunk_size: block_storage.archive_chunk_size(),
518 },
519 (ArchiveId::TooNew, Ok(None)) => ArchiveInfo::TooNew,
520 _ => ArchiveInfo::NotFound,
521 })
522 }
523 None => {
524 tracing::warn!("get_archive_id failed: no blocks applied");
525 overlay::Response::Err(INTERNAL_ERROR_CODE)
526 }
527 }
528 }
529
530 async fn handle_get_archive_chunk(
531 &self,
532 req: &rpc::GetArchiveChunk,
533 ) -> overlay::Response<Data> {
534 let label = [("method", "getArchiveChunk")];
535 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
536
537 let block_storage = self.storage.block_storage();
538
539 let get_archive_chunk = || async {
540 let archive_slice = block_storage
541 .get_archive_chunk(req.archive_id as u32, req.offset)
542 .await?;
543
544 Ok::<_, anyhow::Error>(archive_slice)
545 };
546
547 match get_archive_chunk().await {
548 Ok(data) => overlay::Response::Ok(Data {
549 data: Bytes::from_owner(data),
550 }),
551 Err(e) => {
552 tracing::warn!("get_archive_chunk failed: {e:?}");
553 overlay::Response::Err(INTERNAL_ERROR_CODE)
554 }
555 }
556 }
557
558 fn handle_get_persistent_state_info(
559 &self,
560 req: &rpc::GetPersistentShardStateInfo,
561 ) -> overlay::Response<PersistentStateInfo> {
562 let label = [("method", "getPersistentShardStateInfo")];
563 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
564 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Shard);
565 overlay::Response::Ok(res)
566 }
567
568 fn handle_get_queue_persistent_state_info(
569 &self,
570 req: &rpc::GetPersistentQueueStateInfo,
571 ) -> overlay::Response<PersistentStateInfo> {
572 let label = [("method", "getQueuePersistentStateInfo")];
573 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
574 let res = self.read_persistent_state_info(&req.block_id, PersistentStateKind::Queue);
575 overlay::Response::Ok(res)
576 }
577
578 async fn handle_get_persistent_shard_state_chunk(
579 &self,
580 req: &rpc::GetPersistentShardStateChunk,
581 ) -> overlay::Response<Data> {
582 let label = [("method", "getPersistentShardStateChunk")];
583 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
584 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Shard)
585 .await
586 }
587
588 async fn handle_get_persistent_queue_state_chunk(
589 &self,
590 req: &rpc::GetPersistentQueueStateChunk,
591 ) -> overlay::Response<Data> {
592 let label = [("method", "getPersistentQueueStateChunk")];
593 let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);
594 self.read_persistent_state_chunk(&req.block_id, req.offset, PersistentStateKind::Queue)
595 .await
596 }
597}
598
599impl<B> Inner<B> {
600 async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
601 let block_handle_storage = self.storage().block_handle_storage();
602 let block_storage = self.storage().block_storage();
603
604 let handle = match block_handle_storage.load_handle(block_id) {
605 Some(handle) if handle.has_all_block_parts() => handle,
606 _ => return Ok(BlockFull::NotFound),
607 };
608
609 let Some(data) = block_storage.get_block_data_chunk(block_id, 0)? else {
610 return Ok(BlockFull::NotFound);
611 };
612
613 let data_chunk_size = block_storage.block_data_chunk_size();
614 let data_size = if data.len() < data_chunk_size.get() as usize {
615 data.len() as u32
618 } else {
619 match block_storage.get_block_data_size(block_id)? {
620 Some(size) => size,
621 None => return Ok(BlockFull::NotFound),
622 }
623 };
624
625 let block = BlockData {
626 data: Bytes::from_owner(data),
627 size: NonZeroU32::new(data_size).expect("shouldn't happen"),
628 chunk_size: data_chunk_size,
629 };
630
631 let (proof, queue_diff) = tokio::join!(
632 block_storage.load_block_proof_raw(&handle),
633 block_storage.load_queue_diff_raw(&handle)
634 );
635
636 Ok(BlockFull::Found {
637 block_id: *block_id,
638 block,
639 proof: Bytes::from_owner(proof?),
640 queue_diff: Bytes::from_owner(queue_diff?),
641 })
642 }
643
644 fn read_persistent_state_info(
645 &self,
646 block_id: &BlockId,
647 state_kind: PersistentStateKind,
648 ) -> PersistentStateInfo {
649 let persistent_state_storage = self.storage().persistent_state_storage();
650 if self.config.serve_persistent_states {
651 if let Some(info) = persistent_state_storage.get_state_info(block_id, state_kind) {
652 return PersistentStateInfo::Found {
653 size: info.size,
654 chunk_size: info.chunk_size,
655 };
656 }
657 }
658 PersistentStateInfo::NotFound
659 }
660
661 async fn read_persistent_state_chunk(
662 &self,
663 block_id: &BlockId,
664 offset: u64,
665 state_kind: PersistentStateKind,
666 ) -> overlay::Response<Data> {
667 let persistent_state_storage = self.storage().persistent_state_storage();
668
669 let persistent_state_request_validation = || {
670 anyhow::ensure!(
671 self.config.serve_persistent_states,
672 "persistent states are disabled"
673 );
674 Ok::<_, anyhow::Error>(())
675 };
676
677 if let Err(e) = persistent_state_request_validation() {
678 tracing::debug!("persistent state request validation failed: {e:?}");
679 return overlay::Response::Err(BAD_REQUEST_ERROR_CODE);
680 }
681
682 match persistent_state_storage
683 .read_state_part(block_id, offset, state_kind)
684 .await
685 {
686 Some(data) => overlay::Response::Ok(Data { data: data.into() }),
687 None => {
688 tracing::debug!("failed to read persistent state part");
689 overlay::Response::Err(NOT_FOUND_ERROR_CODE)
690 }
691 }
692 }
693}