1use std::path::PathBuf;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use arc_swap::ArcSwap;
8use axum::extract::FromRef;
9use futures_util::future::BoxFuture;
10use parking_lot::RwLock;
11use tokio::net::TcpListener;
12use tokio::sync::{Notify, Semaphore, watch};
13use tokio::task::JoinHandle;
14use tycho_block_util::block::BlockStuff;
15use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
16use tycho_core::block_strider::{
17 BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
18};
19use tycho_core::blockchain_rpc::BlockchainRpcClient;
20use tycho_core::global_config::ZerostateId;
21use tycho_core::storage::{CoreStorage, KeyBlocksDirection};
22use tycho_rpc_subscriptions::SubscriberManagerConfig;
23use tycho_types::models::*;
24use tycho_types::prelude::*;
25use tycho_util::FastHashMap;
26use tycho_util::metrics::HistogramGuard;
27use tycho_util::time::now_sec;
28
29pub use self::storage::*;
30pub use self::subscriptions::{AccountUpdate, RegisterError, RpcSubscriptions};
31use crate::config::{BlackListConfig, RpcConfig, RpcStorageConfig, TransactionsGcConfig};
32use crate::endpoint::{JrpcEndpointCache, ProtoEndpointCache, RpcEndpoint, jrpc};
33use crate::models::{GenTimings, StateTimings};
34
35impl FromRef<RpcState> for Arc<RpcSubscriptions> {
36 fn from_ref(state: &RpcState) -> Self {
37 state.inner.subscriptions.clone()
38 }
39}
40
41impl FromRef<RpcState> for jrpc::SubscriptionsState {
42 fn from_ref(state: &RpcState) -> Self {
43 Arc::new(jrpc::StreamContext {
44 subs: state.inner.subscriptions.clone(),
45 })
46 }
47}
48
49mod db;
50mod storage;
51mod subscriptions;
52pub mod tables;
53
54const RPC_DB_SUBDIR: &str = "rpc";
55
56pub struct RpcStateBuilder<MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId)> {
57 config: RpcConfig,
58 mandatory_fields: MandatoryFields,
59}
60
61impl RpcStateBuilder {
62 pub fn build(self) -> Result<RpcState> {
63 let (core_storage, blockchain_rpc_client, zerostate_id) = self.mandatory_fields;
64 let config = self.config;
65
66 let gc_notify = Arc::new(Notify::new());
67
68 let mut gc_handle = None;
69 let mut blacklisted_accounts = None::<BlacklistedAccounts>;
70 let mut blacklist_watcher_handle = None;
71
72 let rpc_storage = match &config.storage {
73 RpcStorageConfig::Full {
74 gc, blacklist_path, ..
75 } => {
76 let db = core_storage.context().open_preconfigured(RPC_DB_SUBDIR)?;
77 let rpc_storage = Arc::new(RpcStorage::new(db));
78
79 if let Some(config) = gc {
80 gc_handle = Some(tokio::spawn(transactions_gc(
81 config.clone(),
82 core_storage.clone(),
83 rpc_storage.clone(),
84 gc_notify.clone(),
85 )));
86 }
87
88 if let Some(path) = blacklist_path {
89 let accounts = BlacklistedAccounts::default();
90 blacklisted_accounts = Some(accounts.clone());
91 blacklist_watcher_handle = Some(tokio::spawn(watch_blacklisted_accounts(
92 path.clone(),
93 accounts,
94 )));
95 }
96
97 Some(rpc_storage)
98 }
99 RpcStorageConfig::StateOnly => None,
100 };
101
102 let download_block_semaphore = Semaphore::new(config.max_parallel_block_downloads);
103
104 let run_get_method_semaphore = Arc::new(Semaphore::new(config.run_get_method.max_vms));
105
106 let subscriptions = Arc::new(RpcSubscriptions::new(
107 SubscriberManagerConfig::new(
108 config.subscriptions.max_addrs,
109 config.subscriptions.max_clients,
110 ),
111 config.subscriptions.queue_depth,
112 ));
113
114 let parsed_config = Arc::new(LatestBlockchainConfig::default());
116
117 let timings = GenTimings {
118 gen_lt: 0,
119 gen_utime: 0,
120 };
121 let tick = McTick {
122 seqno: 0,
123 lt: timings.gen_lt,
124 utime: timings.gen_utime,
125 };
126 let (mc_tick_tx, _) = watch::channel(tick);
127 let mc_info = LatestMcInfo {
128 block_id: Arc::new(BlockId {
129 shard: ShardIdent::MASTERCHAIN,
130 seqno: 0,
131 ..Default::default()
132 }),
133 timings,
134 state_hash: HashBytes::ZERO,
135 };
136
137 Ok(RpcState {
138 inner: Arc::new(Inner {
139 config,
140 core_storage,
141 rpc_storage,
142 blockchain_rpc_client,
143 mc_info: RwLock::new(mc_info),
144 mc_tick_tx,
145 mc_accounts: Default::default(),
146 sc_accounts: Default::default(),
147 run_get_method_semaphore,
148 download_block_semaphore,
149 is_ready: AtomicBool::new(false),
150 timings: ArcSwap::new(Default::default()),
151 blockchain_config: ArcSwap::new(parsed_config),
152 jrpc_cache: Default::default(),
153 proto_cache: Default::default(),
154 subscriptions,
155 zerostate_id,
156 gc_notify,
157 gc_handle,
158 blacklisted_accounts,
159 blacklist_watcher_handle,
160 }),
161 })
162 }
163}
164
165impl<T2, T3> RpcStateBuilder<((), T2, T3)> {
166 pub fn with_storage(self, storage: CoreStorage) -> RpcStateBuilder<(CoreStorage, T2, T3)> {
167 let (_, bc_rpc_client, zerostate_id) = self.mandatory_fields;
168
169 RpcStateBuilder {
170 config: self.config,
171 mandatory_fields: (storage, bc_rpc_client, zerostate_id),
172 }
173 }
174}
175
176impl<T1, T3> RpcStateBuilder<(T1, (), T3)> {
177 pub fn with_blockchain_rpc_client(
178 self,
179 client: BlockchainRpcClient,
180 ) -> RpcStateBuilder<(T1, BlockchainRpcClient, T3)> {
181 let (storage, _, zerostate_id) = self.mandatory_fields;
182
183 RpcStateBuilder {
184 config: self.config,
185 mandatory_fields: (storage, client, zerostate_id),
186 }
187 }
188}
189
190impl<T1, T2> RpcStateBuilder<(T1, T2, ())> {
191 pub fn with_zerostate_id(
192 self,
193 zerostate_id: ZerostateId,
194 ) -> RpcStateBuilder<(T1, T2, ZerostateId)> {
195 let (storage, client, _) = self.mandatory_fields;
196
197 RpcStateBuilder {
198 config: self.config,
199 mandatory_fields: (storage, client, zerostate_id),
200 }
201 }
202}
203
204impl<T1, T2, T3> RpcStateBuilder<(T1, T2, T3)> {
205 pub fn with_config(self, config: RpcConfig) -> RpcStateBuilder<(T1, T2, T3)> {
206 RpcStateBuilder { config, ..self }
207 }
208}
209
210#[derive(Clone)]
211#[repr(transparent)]
212pub struct RpcState {
213 inner: Arc<Inner>,
214}
215
216impl RpcState {
217 pub fn builder() -> RpcStateBuilder<((), (), ())> {
218 RpcStateBuilder {
219 config: RpcConfig::default(),
220 mandatory_fields: ((), (), ()),
221 }
222 }
223
224 pub fn split(self) -> (RpcBlockSubscriber, RpcStateSubscriber) {
225 let block_subscriber = RpcBlockSubscriber {
226 inner: self.inner.clone(),
227 };
228
229 let state_subscriber = RpcStateSubscriber { inner: self.inner };
230
231 (block_subscriber, state_subscriber)
232 }
233
234 pub async fn init(&self, mc_block_id: &BlockId) -> Result<()> {
235 self.inner.init(mc_block_id).await
236 }
237
238 pub async fn acquire_download_block_permit(&self) -> tokio::sync::SemaphorePermit<'_> {
239 self.inner.download_block_semaphore.acquire().await.unwrap()
241 }
242
243 pub async fn acquire_run_get_method_permit(&self) -> RunGetMethodPermit {
244 let config = &self.config().run_get_method;
245 if config.max_vms == 0 {
246 return RunGetMethodPermit::Disabled;
247 }
248
249 let fut = self.inner.run_get_method_semaphore.clone().acquire_owned();
250 match tokio::time::timeout(config.max_wait_for_vm, fut).await {
251 Ok(Ok(permit)) => RunGetMethodPermit::Acquired(permit),
252 Ok(Err(_)) => RunGetMethodPermit::Disabled,
253 Err(_) => RunGetMethodPermit::Timeout,
254 }
255 }
256
257 pub async fn bind_socket(&self) -> std::io::Result<TcpListener> {
258 TcpListener::bind(self.config().listen_addr).await
259 }
260
261 pub async fn bind_endpoint(&self) -> Result<RpcEndpoint> {
262 RpcEndpoint::builder().bind(self.clone()).await
263 }
264
265 pub fn config(&self) -> &RpcConfig {
266 &self.inner.config
267 }
268
269 pub fn is_ready(&self) -> bool {
270 self.inner.is_ready.load(Ordering::Acquire)
271 }
272
273 pub fn is_full(&self) -> bool {
274 self.inner.rpc_storage.is_some()
275 }
276
277 pub fn load_timings(&self) -> arc_swap::Guard<Arc<StateTimings>> {
278 self.inner.timings.load()
279 }
280
281 pub fn jrpc_cache(&self) -> &JrpcEndpointCache {
282 &self.inner.jrpc_cache
283 }
284
285 pub fn proto_cache(&self) -> &ProtoEndpointCache {
286 &self.inner.proto_cache
287 }
288
289 pub fn subscriptions(&self) -> &RpcSubscriptions {
290 &self.inner.subscriptions
291 }
292
293 pub fn zerostate_id(&self) -> &ZerostateId {
294 &self.inner.zerostate_id
295 }
296
297 pub fn get_latest_mc_info(&self) -> LatestMcInfo {
298 self.inner.mc_info.read().clone()
299 }
300
301 pub(crate) fn subscribe_mc_tick(&self) -> watch::Receiver<McTick> {
302 self.inner.mc_tick_tx.subscribe()
303 }
304
305 pub fn rpc_storage_snapshot(&self) -> Option<RpcSnapshot> {
306 let rpc_storage = self.inner.rpc_storage.as_ref()?;
307 rpc_storage.load_snapshot()
308 }
309
310 pub async fn broadcast_external_message(&self, message: &[u8]) {
311 metrics::counter!("tycho_rpc_broadcast_external_message_tx_bytes_total")
312 .increment(message.len() as u64);
313 self.inner
314 .blockchain_rpc_client
315 .broadcast_external_message(message)
316 .await;
317 }
318
319 pub fn get_unpacked_blockchain_config(&self) -> Arc<LatestBlockchainConfig> {
320 self.inner.blockchain_config.load_full()
321 }
322
323 pub fn get_brief_block_info(
324 &self,
325 block_id: &BlockIdShort,
326 snapshot: Option<&RpcSnapshot>,
327 ) -> Result<Option<(BlockId, u32, BriefBlockInfo)>, RpcStateError> {
328 let Some(storage) = &self.inner.rpc_storage else {
329 return Err(RpcStateError::NotSupported);
330 };
331 storage
332 .get_brief_block_info(block_id, snapshot)
333 .map_err(RpcStateError::Internal)
334 }
335
336 pub fn get_brief_shards_descr(
337 &self,
338 mc_seqno: u32,
339 snapshot: Option<&RpcSnapshot>,
340 ) -> Result<Option<Vec<BriefShardDescr>>, RpcStateError> {
341 let Some(storage) = &self.inner.rpc_storage else {
342 return Err(RpcStateError::NotSupported);
343 };
344 storage
345 .get_brief_shards_descr(mc_seqno, snapshot)
346 .map_err(RpcStateError::Internal)
347 }
348
349 pub fn get_libraries(&self) -> Dict<HashBytes, LibDescr> {
350 match self.inner.mc_accounts.read().as_ref() {
351 Some(cache) => cache.libraries.clone(),
352 None => Dict::new(),
353 }
354 }
355
356 pub fn get_raw_library(&self, hash: &HashBytes) -> Result<Option<Cell>> {
357 let guard = self.inner.mc_accounts.read();
358 match guard.as_ref() {
359 Some(cache) => Ok(cache.libraries.get(hash)?.map(|x| x.lib)),
360 None => Ok(None),
361 }
362 }
363
364 pub fn get_account_state(
365 &self,
366 address: &StdAddr,
367 ) -> Result<LoadedAccountState, RpcStateError> {
368 self.inner.get_account_state(address)
369 }
370
371 pub fn get_accounts_by_code_hash(
372 &self,
373 code_hash: &HashBytes,
374 continuation: Option<&StdAddr>,
375 snapshot: Option<RpcSnapshot>,
376 ) -> Result<CodeHashesIter<'_>, RpcStateError> {
377 let Some(storage) = &self.inner.rpc_storage else {
378 return Err(RpcStateError::NotSupported);
379 };
380 storage
381 .get_accounts_by_code_hash(code_hash, continuation, snapshot)
382 .map_err(RpcStateError::Internal)
383 }
384
385 pub fn get_known_mc_blocks_range(
386 &self,
387 snapshot: Option<&RpcSnapshot>,
388 ) -> Result<Option<(u32, u32)>, RpcStateError> {
389 let Some(storage) = &self.inner.rpc_storage else {
390 return Err(RpcStateError::NotSupported);
391 };
392 storage
393 .get_known_mc_blocks_range(snapshot)
394 .map_err(RpcStateError::Internal)
395 }
396
397 pub fn get_blocks_by_mc_seqno(
398 &self,
399 mc_seqno: u32,
400 snapshot: Option<RpcSnapshot>,
401 ) -> Result<Option<BlocksByMcSeqnoIter>, RpcStateError> {
402 let Some(storage) = &self.inner.rpc_storage else {
403 return Err(RpcStateError::NotSupported);
404 };
405 storage
406 .get_blocks_by_mc_seqno(mc_seqno, snapshot)
407 .map_err(RpcStateError::Internal)
408 }
409
410 pub fn get_block_transactions(
411 &self,
412 block_id: &BlockIdShort,
413 reverse: bool,
414 cursor: Option<&BlockTransactionsCursor>,
415 snapshot: Option<RpcSnapshot>,
416 ) -> Result<Option<BlockTransactionsIterBuilder>, RpcStateError> {
417 let Some(storage) = &self.inner.rpc_storage else {
418 return Err(RpcStateError::NotSupported);
419 };
420 storage
421 .get_block_transactions(block_id, reverse, cursor, snapshot)
422 .map_err(RpcStateError::Internal)
423 }
424
425 pub fn get_block_transaction_ids(
426 &self,
427 block_id: &BlockIdShort,
428 reverse: bool,
429 cursor: Option<&BlockTransactionsCursor>,
430 snapshot: Option<RpcSnapshot>,
431 ) -> Result<Option<BlockTransactionIdsIter>, RpcStateError> {
432 let Some(storage) = &self.inner.rpc_storage else {
433 return Err(RpcStateError::NotSupported);
434 };
435 storage
436 .get_block_transaction_ids(block_id, reverse, cursor, snapshot)
437 .map_err(RpcStateError::Internal)
438 }
439
440 pub fn get_transactions(
441 &self,
442 account: &StdAddr,
443 start_lt: Option<u64>,
444 end_lt: Option<u64>,
445 reverse: bool,
446 snapshot: Option<RpcSnapshot>,
447 ) -> Result<TransactionsIterBuilder, RpcStateError> {
448 let Some(storage) = &self.inner.rpc_storage else {
449 return Err(RpcStateError::NotSupported);
450 };
451 storage
452 .get_transactions(account, start_lt, end_lt, reverse, snapshot)
453 .map_err(RpcStateError::Internal)
454 }
455
456 pub fn get_transaction(
457 &self,
458 hash: &HashBytes,
459 snapshot: Option<&RpcSnapshot>,
460 ) -> Result<Option<TransactionData<'_>>, RpcStateError> {
461 let Some(storage) = &self.inner.rpc_storage else {
462 return Err(RpcStateError::NotSupported);
463 };
464 storage
465 .get_transaction(hash, snapshot)
466 .map_err(RpcStateError::Internal)
467 }
468
469 pub fn get_transaction_ext<'a>(
470 &'a self,
471 hash: &HashBytes,
472 snapshot: Option<&RpcSnapshot>,
473 ) -> Result<Option<TransactionDataExt<'a>>, RpcStateError> {
474 let Some(storage) = &self.inner.rpc_storage else {
475 return Err(RpcStateError::NotSupported);
476 };
477 storage
478 .get_transaction_ext(hash, snapshot)
479 .map_err(RpcStateError::Internal)
480 }
481
482 pub fn get_transaction_info(
483 &self,
484 hash: &HashBytes,
485 snapshot: Option<&RpcSnapshot>,
486 ) -> Result<Option<TransactionInfo>, RpcStateError> {
487 let Some(storage) = &self.inner.rpc_storage else {
488 return Err(RpcStateError::NotSupported);
489 };
490 storage
491 .get_transaction_info(hash, snapshot)
492 .map_err(RpcStateError::Internal)
493 }
494
495 pub fn get_src_transaction<'a>(
496 &'a self,
497 account: &StdAddr,
498 message_lt: u64,
499 snapshot: Option<&RpcSnapshot>,
500 ) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
501 let Some(storage) = &self.inner.rpc_storage else {
502 return Err(RpcStateError::NotSupported);
503 };
504 storage
505 .get_src_transaction(account, message_lt, snapshot)
506 .map_err(RpcStateError::Internal)
507 }
508
509 pub fn get_dst_transaction<'a>(
510 &'a self,
511 in_msg_hash: &HashBytes,
512 snapshot: Option<&RpcSnapshot>,
513 ) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
514 let Some(storage) = &self.inner.rpc_storage else {
515 return Err(RpcStateError::NotSupported);
516 };
517 storage
518 .get_dst_transaction(in_msg_hash, snapshot)
519 .map_err(RpcStateError::Internal)
520 }
521
522 pub async fn get_key_block_proof(
524 &self,
525 key_block_seqno: u32,
526 ) -> Option<(BlockId, impl AsRef<[u8]> + Send + Sync + 'static)> {
527 let blocks = self.inner.core_storage.block_storage();
528 let handles = self.inner.core_storage.block_handle_storage();
529
530 let handle = handles.load_key_block_handle(key_block_seqno)?;
531 let data = blocks.load_block_proof_raw(&handle).await.ok()?;
532 Some((*handle.id(), data))
533 }
534
535 pub async fn get_block_proof(
537 &self,
538 block_id: &BlockId,
539 ) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
540 let blocks = self.inner.core_storage.block_storage();
541 let handles = self.inner.core_storage.block_handle_storage();
542
543 let handle = handles.load_handle(block_id)?;
544 blocks.load_block_proof_raw(&handle).await.ok()
545 }
546
547 pub async fn get_block_data(
549 &self,
550 block_id: &BlockId,
551 ) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
552 let blocks = self.inner.core_storage.block_storage();
553 let handles = self.inner.core_storage.block_handle_storage();
554
555 let handle = handles.load_handle(block_id)?;
556 blocks.load_block_data_decompressed(&handle).await.ok()
557 }
558}
559
560pub struct RpcStateSubscriber {
561 inner: Arc<Inner>,
562}
563
564impl StateSubscriber for RpcStateSubscriber {
565 type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
566
567 fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
568 futures_util::future::ready(self.inner.update_accounts_cache(&cx.block, &cx.state))
569 }
570}
571
572pub struct RpcBlockSubscriber {
573 inner: Arc<Inner>,
574}
575
576impl BlockSubscriber for RpcBlockSubscriber {
577 type Prepared = JoinHandle<Result<()>>;
578
579 type PrepareBlockFut<'a> = futures_util::future::Ready<Result<Self::Prepared>>;
580 type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
581
582 fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
583 let handle = tokio::task::spawn({
584 let inner = self.inner.clone();
585 let mc_block_id = cx.mc_block_id;
586 let block = cx.block.clone();
587 async move { inner.update(&mc_block_id, &block).await }
588 });
589
590 futures_util::future::ready(Ok(handle))
591 }
592
593 fn handle_block<'a>(
594 &'a self,
595 ctx: &'a BlockSubscriberContext,
596 prepared: Self::Prepared,
597 ) -> Self::HandleBlockFut<'a> {
598 Box::pin(async move {
599 prepared.await??;
600 if ctx.block.id().is_masterchain() {
601 self.inner.update_mc_info(&ctx.block)?;
602
603 if let Some(rpc_storage) = &self.inner.rpc_storage {
606 rpc_storage.update_snapshot();
607 }
608 }
609 Ok(())
610 })
611 }
612}
613
614pub enum RunGetMethodPermit {
615 Acquired(tokio::sync::OwnedSemaphorePermit),
616 Timeout,
617 Disabled,
618}
619
620struct Inner {
621 config: RpcConfig,
622 core_storage: CoreStorage,
623 rpc_storage: Option<Arc<RpcStorage>>,
624 blockchain_rpc_client: BlockchainRpcClient,
625 mc_info: RwLock<LatestMcInfo>,
626 mc_tick_tx: watch::Sender<McTick>,
627 mc_accounts: RwLock<Option<CachedAccounts>>,
628 sc_accounts: RwLock<FastHashMap<ShardIdent, CachedAccounts>>,
629 download_block_semaphore: Semaphore,
630 run_get_method_semaphore: Arc<Semaphore>,
631 is_ready: AtomicBool,
632 timings: ArcSwap<StateTimings>,
633 blockchain_config: ArcSwap<LatestBlockchainConfig>,
634 jrpc_cache: JrpcEndpointCache,
635 proto_cache: ProtoEndpointCache,
636 subscriptions: Arc<RpcSubscriptions>,
637 zerostate_id: ZerostateId,
638 gc_notify: Arc<Notify>,
640 gc_handle: Option<JoinHandle<()>>,
641 blacklisted_accounts: Option<BlacklistedAccounts>,
643 blacklist_watcher_handle: Option<JoinHandle<()>>,
644}
645
646#[derive(Clone)]
647pub struct LatestMcInfo {
648 pub block_id: Arc<BlockId>,
649 pub timings: GenTimings,
650 pub state_hash: HashBytes,
651}
652
653#[derive(Clone, Copy, Debug)]
654pub(crate) struct McTick {
655 pub seqno: u32,
656 pub lt: u64,
657 pub utime: u32,
658}
659
660pub struct LatestBlockchainConfig {
661 pub raw: BlockchainConfigParams,
662 pub unpacked: tycho_vm::UnpackedConfig,
663 pub modifiers: tycho_vm::BehaviourModifiers,
664}
665
666impl Default for LatestBlockchainConfig {
667 fn default() -> Self {
668 Self {
669 raw: BlockchainConfigParams::from_raw(Cell::empty_cell()),
670 unpacked: tycho_vm::UnpackedConfig {
671 latest_storage_prices: None,
672 global_id: None,
673 mc_gas_prices: None,
674 gas_prices: None,
675 mc_fwd_prices: None,
676 fwd_prices: None,
677 size_limits_config: None,
678 },
679 modifiers: Default::default(),
680 }
681 }
682}
683
684impl Inner {
685 async fn init(self: &Arc<Self>, mc_block_id: &BlockId) -> Result<()> {
686 anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state");
687
688 let blocks = self.core_storage.block_storage();
689 let block_handles = self.core_storage.block_handle_storage();
690 let shard_states = self.core_storage.shard_state_storage();
691
692 'key_block: {
694 let Some(handle) = block_handles.find_prev_key_block(mc_block_id.seqno + 1) else {
696 break 'key_block;
697 };
698
699 if handle.has_data() {
700 let key_block = blocks.load_block_data(&handle).await?;
701 self.update_mc_block_cache(&key_block)?;
702 } else if handle.is_zerostate()
703 && let Some(proof) = self.core_storage.node_state().load_zerostate_proof()
704 {
705 let state = proof
706 .virtualize()
707 .parse::<ShardStateUnsplit>()
708 .context("failed to deserialize zerostate proof")?;
709
710 let Some(extra) = state.load_custom()? else {
711 anyhow::bail!("masterchain state without extra");
712 };
713
714 self.update_config(state.global_id, state.seqno, &extra.config);
715 tracing::warn!("no key block found during initialization");
716 } else {
717 let state = shard_states
718 .load_state(handle.id().seqno, handle.id())
719 .await
720 .context("failed to load key block state on rpc init")?;
721 let state = state.as_ref();
722
723 let Some(extra) = state.load_custom()? else {
724 anyhow::bail!("masterchain state without extra");
725 };
726
727 self.update_config(state.global_id, state.seqno, &extra.config);
728 tracing::warn!("no key block found during initialization");
729 }
730 }
731
732 let mut mc_state = shard_states
733 .load_state(mc_block_id.seqno, mc_block_id)
734 .await
735 .context("failed to load state on rpc init")?;
736 self.update_timings(mc_state.as_ref().gen_utime, mc_state.as_ref().seqno);
737
738 if let Some(rpc_storage) = &self.rpc_storage {
739 let node_instance_id = self.core_storage.node_state().load_instance_id();
740 let rpc_instance_id = rpc_storage.load_instance_id();
741
742 let make_cached_accounts = |state: &ShardStateStuff| -> Result<CachedAccounts> {
743 let state_info = state.as_ref();
744 Ok(CachedAccounts {
745 libraries: Default::default(),
746 accounts: state_info.load_accounts()?.dict().clone(),
747 mc_ref_hanlde: state.ref_mc_state_handle().clone(),
748 timings: GenTimings {
749 gen_lt: state_info.gen_lt,
750 gen_utime: state_info.gen_utime,
751 },
752 })
753 };
754
755 let shards = mc_state.shards()?.clone();
756
757 if node_instance_id != rpc_instance_id || self.config.storage.is_force_reindex() {
758 rpc_storage
761 .reset_accounts(mc_state, self.config.shard_split_depth)
762 .await?;
763
764 for item in shards.latest_blocks() {
765 let block_id = item?;
766
767 let state = shard_states
768 .load_state(mc_block_id.seqno, &block_id)
769 .await
770 .context("failed to load shard state on init")?;
771
772 rpc_storage
775 .reset_accounts(state, self.config.shard_split_depth)
776 .await?;
777 }
778
779 rpc_storage.store_instance_id(node_instance_id);
781
782 mc_state = shard_states
784 .load_state(mc_block_id.seqno, mc_block_id)
785 .await
786 .context("failed to reload mc state for rpc")?;
787 }
788
789 if let Some(config) = load_blockchain_config(&mc_state) {
791 self.blockchain_config.store(config);
792 }
793
794 *self.mc_accounts.write() = Some(make_cached_accounts(&mc_state)?);
796
797 for item in shards.latest_blocks() {
798 let block_id = item?;
799 let state = shard_states
800 .load_state(mc_block_id.seqno, &block_id)
801 .await
802 .context("failed to load shard state to fill cache")?;
803
804 self.sc_accounts
806 .write()
807 .insert(block_id.shard, make_cached_accounts(&state)?);
808 }
809 }
810
811 self.is_ready.store(true, Ordering::Release);
812 Ok(())
813 }
814
815 fn get_account_state(&self, address: &StdAddr) -> Result<LoadedAccountState, RpcStateError> {
816 let is_masterchain = address.is_masterchain();
817
818 if is_masterchain {
819 match &*self.mc_accounts.read() {
821 None => Err(RpcStateError::NotReady),
822 Some(cache) => {
823 let mc_info = self.mc_info.read().clone();
824 cache.get(mc_info, &address.address)
825 }
826 }
827 } else {
828 let cache = self.sc_accounts.read();
829 let mc_info = self.mc_info.read().clone();
830
831 let mut state = Err(RpcStateError::NotReady);
832
833 let mut gen_utime = 0;
835 let mut found = false;
836 for (shard, cache) in &*cache {
837 if !shard.contains_account(&address.address) || cache.timings.gen_utime < gen_utime
838 {
839 continue;
840 }
841
842 gen_utime = cache.timings.gen_utime;
843 state = cache.get(mc_info.clone(), &address.address);
844 found = true;
845 }
846
847 if !found && gen_utime > 0 {
849 state = Ok(LoadedAccountState::NotFound {
850 mc_block_id: mc_info.block_id,
851 timings: mc_info.timings,
852 });
853 }
854
855 state
857 }
858 }
859
860 async fn update(&self, mc_block_id: &BlockId, block: &BlockStuff) -> Result<()> {
861 let _histogram = HistogramGuard::begin("tycho_rpc_state_update_time");
862
863 let is_masterchain = block.id().is_masterchain();
864 if is_masterchain {
865 self.update_mc_block_cache(block)?;
866 }
867
868 if let Some(rpc_storage) = &self.rpc_storage {
869 rpc_storage
870 .update(
871 mc_block_id,
872 block.clone(),
873 self.blacklisted_accounts.as_ref(),
874 &self.subscriptions,
875 )
876 .await?;
877 } else {
878 let updates = Self::collect_updates_without_storage(
880 block.clone(),
881 self.blacklisted_accounts.as_ref(),
882 )
883 .await?;
884 if !updates.is_empty() {
885 self.subscriptions.fanout_updates(updates).await;
886 }
887 }
888 Ok(())
889 }
890
891 async fn collect_updates_without_storage(
892 block: BlockStuff,
893 rpc_blacklist: Option<&BlacklistedAccounts>,
894 ) -> Result<Vec<AccountUpdate>> {
895 let rpc_blacklist = rpc_blacklist.cloned();
896 tokio::task::spawn_blocking(move || -> Result<_> {
897 let Ok(workchain) = i8::try_from(block.id().shard.workchain()) else {
898 return Ok(Vec::new());
899 };
900
901 let info = block.load_info()?;
902 let extra = block.load_extra()?;
903 let account_blocks = extra.account_blocks.load()?;
904 if account_blocks.is_empty() {
905 return Ok(Vec::new());
906 }
907
908 let mut updates = FastHashMap::<HashBytes, u64>::default();
909 let rpc_blacklist = rpc_blacklist.map(|x| x.load());
910
911 for item in account_blocks.iter() {
912 let (account, _, account_block) = item?;
913
914 let is_blacklisted = rpc_blacklist.as_ref().is_some_and(|set| {
915 let mut key = [0u8; 33];
916 key[0] = workchain as u8;
917 key[1..].copy_from_slice(account.as_slice());
918 set.contains(&key)
919 });
920
921 if is_blacklisted {
922 continue;
923 }
924
925 for tx_item in account_block.transactions.values() {
926 let (_, tx_cell) = tx_item?;
927 let tx = tx_cell.load()?;
928 updates
929 .entry(account)
930 .and_modify(|lt| {
931 if tx.lt > *lt {
932 *lt = tx.lt;
933 }
934 })
935 .or_insert(tx.lt);
936 }
937 }
938
939 let updates = updates
940 .into_iter()
941 .map(|(address, max_lt)| AccountUpdate {
942 address: StdAddr::new(workchain, address),
943 max_lt,
944 gen_utime: info.gen_utime,
945 })
946 .collect();
947
948 Ok(updates)
949 })
950 .await?
951 }
952
953 fn update_mc_block_cache(&self, block: &BlockStuff) -> Result<()> {
954 {
956 let info = block.load_info()?;
958 self.update_timings(info.gen_utime, info.seqno);
959
960 if !info.key_block {
961 return Ok(());
962 }
963 }
964
965 if self.config.storage.gc_is_enabled() {
967 self.gc_notify.notify_waiters();
968 }
969
970 let custom = block.load_custom()?;
971
972 if let Some(ref config) = custom.config {
974 self.update_config(block.as_ref().global_id, block.id().seqno, config);
975 } else {
976 tracing::error!("key block without config");
977 }
978
979 self.jrpc_cache.handle_key_block(block.as_ref());
980 self.proto_cache.handle_key_block(block.as_ref());
981 Ok(())
982 }
983
984 fn update_mc_info(&self, block: &BlockStuff) -> Result<()> {
985 let info = block.load_info()?;
986 let state_update = block.block().state_update.load()?;
987
988 let seqno = block.id().seqno;
989 let timings = GenTimings {
990 gen_lt: info.end_lt,
991 gen_utime: info.gen_utime,
992 };
993 let block_id = Arc::new(*block.id());
994 let tick = McTick {
995 seqno,
996 lt: timings.gen_lt,
997 utime: timings.gen_utime,
998 };
999 *self.mc_info.write() = LatestMcInfo {
1000 block_id,
1001 timings,
1002 state_hash: state_update.new_hash,
1003 };
1004 self.mc_tick_tx.send_replace(tick);
1005 Ok(())
1006 }
1007
1008 fn update_timings(&self, mc_gen_utime: u32, seqno: u32) {
1009 let time_diff = now_sec() as i64 - mc_gen_utime as i64;
1010 self.timings.store(Arc::new(StateTimings {
1011 last_mc_block_seqno: seqno,
1012 last_mc_utime: mc_gen_utime,
1013 mc_time_diff: time_diff,
1014 smallest_known_lt: self.rpc_storage.as_ref().map(|s| s.min_tx_lt()),
1015 }));
1016 }
1017
1018 fn update_config(&self, global_id: i32, seqno: u32, config: &BlockchainConfig) {
1019 self.jrpc_cache.handle_config(global_id, seqno, config);
1020 self.proto_cache.handle_config(global_id, seqno, config);
1021 }
1022
1023 fn update_accounts_cache(&self, block: &BlockStuff, state: &ShardStateStuff) -> Result<()> {
1024 let _histogram = HistogramGuard::begin("tycho_rpc_state_update_accounts_cache_time");
1025
1026 let shard = block.id().shard;
1027
1028 let block_info = block.load_info()?;
1030
1031 let accounts = state.state().load_accounts()?.dict().clone();
1032 let libraries = state.state().libraries.clone();
1033
1034 let cached = CachedAccounts {
1035 libraries,
1036 accounts,
1037 mc_ref_hanlde: state.ref_mc_state_handle().clone(),
1038 timings: GenTimings {
1039 gen_lt: block_info.end_lt,
1040 gen_utime: block_info.gen_utime,
1041 },
1042 };
1043
1044 if shard.is_masterchain() {
1045 if let Some(config) = load_blockchain_config(state) {
1047 self.blockchain_config.store(config);
1048 }
1049
1050 *self.mc_accounts.write() = Some(cached);
1052 } else {
1053 let mut cache = self.sc_accounts.write();
1054
1055 cache.insert(shard, cached);
1056 if block_info.after_merge || block_info.after_split {
1057 tracing::debug!("clearing shard states cache after shards merge/split");
1058
1059 match block_info.load_prev_ref()? {
1060 PrevBlockRef::Single(..) => {
1066 let parent = shard
1068 .merge()
1069 .ok_or(tycho_types::error::Error::InvalidData)?;
1070
1071 let opposite = shard.opposite().expect("after split");
1072
1073 if cache.contains_key(&shard) && cache.contains_key(&opposite) {
1075 cache.remove(&parent);
1076 }
1077 }
1078
1079 PrevBlockRef::AfterMerge { .. } => {
1085 let (left, right) = shard
1087 .split()
1088 .ok_or(tycho_types::error::Error::InvalidData)?;
1089
1090 cache.remove(&left);
1092 cache.remove(&right);
1093 }
1094 }
1095 }
1096 }
1097
1098 Ok(())
1099 }
1100}
1101
1102impl Drop for Inner {
1103 fn drop(&mut self) {
1104 if let Some(handle) = self.gc_handle.take() {
1105 handle.abort();
1106 }
1107
1108 if let Some(handle) = self.blacklist_watcher_handle.take() {
1109 handle.abort();
1110 }
1111 }
1112}
1113
1114fn load_blockchain_config(mc_state: &ShardStateStuff) -> Option<Arc<LatestBlockchainConfig>> {
1115 let extra = mc_state.state_extra().ok()?;
1116
1117 let now = mc_state.as_ref().gen_utime;
1119 match tycho_vm::SmcInfoTonV6::unpack_config_partial(&extra.config.params, now) {
1120 Ok(unpacked) => {
1121 let mut modifiers = tycho_vm::BehaviourModifiers::default();
1122 if let Ok(global_id) = extra.config.params.get_global_id()
1123 && let Ok(global) = extra.config.params.get_global_version()
1124 {
1125 modifiers.signature_with_id = global
1126 .capabilities
1127 .contains(GlobalCapability::CapSignatureWithId)
1128 .then_some(global_id);
1129 }
1130
1131 Some(Arc::new(LatestBlockchainConfig {
1132 raw: extra.config.params.clone(),
1133 unpacked,
1134 modifiers,
1135 }))
1136 }
1137 Err(e) => {
1138 tracing::error!(
1139 block_id = %mc_state.block_id(),
1140 "failed to unpack blockchain config: {e:?}",
1141 );
1142 None
1143 }
1144 }
1145}
1146
1147pub enum LoadedAccountState {
1148 NotFound {
1149 mc_block_id: Arc<BlockId>,
1150 timings: GenTimings,
1151 },
1152 Found {
1153 mc_block_id: Arc<BlockId>,
1154 state: ShardAccount,
1155 mc_ref_handle: RefMcStateHandle,
1156 timings: GenTimings,
1157 },
1158}
1159
1160struct CachedAccounts {
1161 libraries: Dict<HashBytes, LibDescr>,
1162 accounts: ShardAccountsDict,
1163 mc_ref_hanlde: RefMcStateHandle,
1164 timings: GenTimings,
1165}
1166
1167impl CachedAccounts {
1168 fn get(
1169 &self,
1170 mc_info: LatestMcInfo,
1171 account: &HashBytes,
1172 ) -> Result<LoadedAccountState, RpcStateError> {
1173 match self.accounts.get(account) {
1174 Ok(Some((_, state))) => Ok(LoadedAccountState::Found {
1175 mc_block_id: mc_info.block_id,
1176 state,
1177 mc_ref_handle: self.mc_ref_hanlde.clone(),
1178 timings: self.timings.max_by_lt(mc_info.timings),
1179 }),
1180 Ok(None) => Ok(LoadedAccountState::NotFound {
1181 mc_block_id: mc_info.block_id,
1182 timings: self.timings.max_by_lt(mc_info.timings),
1183 }),
1184 Err(e) => Err(RpcStateError::Internal(e.into())),
1185 }
1186 }
1187}
1188
1189type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;
1190
1191async fn transactions_gc(
1193 config: TransactionsGcConfig,
1194 core_storage: CoreStorage,
1195 rpc_storage: Arc<RpcStorage>,
1196 gc_notify: Arc<Notify>,
1197) {
1198 let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else {
1199 return;
1200 };
1201
1202 loop {
1203 gc_notify.notified().await;
1205
1206 let target_utime = now_sec().saturating_sub(tx_ttl_sec);
1207 let gc_range = match find_closest_key_block_lt(&core_storage, target_utime).await {
1208 Ok(lt) => lt,
1209 Err(e) => {
1210 tracing::error!(
1211 target_utime,
1212 "failed to find the closest key block lt: {e:?}"
1213 );
1214 continue;
1215 }
1216 };
1217
1218 if let Err(e) = rpc_storage
1219 .remove_old_transactions(gc_range.mc_seqno, gc_range.lt, config.keep_tx_per_account)
1220 .await
1221 {
1222 tracing::error!(
1223 target_utime,
1224 mc_seqno = gc_range.mc_seqno,
1225 min_lt = gc_range.lt,
1226 "failed to remove old transactions: {e:?}"
1227 );
1228 }
1229 }
1230}
1231
1232pub async fn watch_blacklisted_accounts(config_path: PathBuf, accounts: BlacklistedAccounts) {
1233 tracing::info!(
1234 config_path = %config_path.display(),
1235 "started watching for changes in rpc blacklist config"
1236 );
1237
1238 let get_metadata = || {
1239 std::fs::metadata(&config_path)
1240 .ok()
1241 .and_then(|m| m.modified().ok())
1242 };
1243
1244 let mut last_modified = None;
1245
1246 let mut interval = tokio::time::interval(Duration::from_secs(10));
1247 loop {
1248 interval.tick().await;
1249
1250 let modified = get_metadata();
1251 if last_modified == modified {
1252 continue;
1253 }
1254 last_modified = modified;
1255
1256 match BlackListConfig::load_from(&config_path) {
1258 Ok(config) => accounts.update(config.accounts),
1259 Err(e) => {
1260 tracing::error!("failed to load blacklist config: {e:?}");
1261 }
1262 }
1263 }
1264}
1265
1266async fn find_closest_key_block_lt(storage: &CoreStorage, utime: u32) -> Result<GcRange> {
1267 let block_handle_storage = storage.block_handle_storage();
1268
1269 let handle = 'last_key_block: {
1271 let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward);
1272 for key_block_id in iter {
1273 let handle = block_handle_storage
1274 .load_handle(&key_block_id)
1275 .with_context(|| format!("key block not found: {key_block_id}"))?;
1276
1277 if handle.gen_utime() <= utime {
1278 break 'last_key_block handle;
1279 }
1280 }
1281
1282 return Ok(GcRange::default());
1283 };
1284
1285 let block_proof = storage.block_storage().load_block_proof(&handle).await?;
1287
1288 let (virt_block, _) = block_proof.virtualize_block()?;
1290 let info = virt_block.info.load()?;
1291 Ok(GcRange {
1292 mc_seqno: info.seqno,
1293 lt: info.start_lt,
1294 })
1295}
1296
1297#[derive(Default)]
1298struct GcRange {
1299 mc_seqno: u32,
1300 lt: u64,
1301}
1302
1303#[derive(Debug, thiserror::Error)]
1304pub enum RpcStateError {
1305 #[error("not ready")]
1306 NotReady,
1307 #[error("not supported")]
1308 NotSupported,
1309 #[error("internal: {0}")]
1310 Internal(#[from] anyhow::Error),
1311 #[error(transparent)]
1312 BadRequest(#[from] BadRequestError),
1313}
1314
1315impl RpcStateError {
1316 pub fn internal<E: Into<anyhow::Error>>(error: E) -> Self {
1317 Self::Internal(error.into())
1318 }
1319
1320 pub fn bad_request<E: Into<anyhow::Error>>(error: E) -> Self {
1321 Self::BadRequest(BadRequestError(error.into()))
1322 }
1323}
1324
1325#[derive(Debug, thiserror::Error)]
1326#[error(transparent)]
1327pub struct BadRequestError(anyhow::Error);
1328
1329impl From<anyhow::Error> for BadRequestError {
1330 #[inline]
1331 fn from(value: anyhow::Error) -> Self {
1332 Self(value)
1333 }
1334}
1335
1336impl From<axum::extract::rejection::QueryRejection> for BadRequestError {
1337 #[inline]
1338 fn from(value: axum::extract::rejection::QueryRejection) -> Self {
1339 Self(anyhow::Error::msg(value.body_text()))
1340 }
1341}
1342
1343impl From<axum::extract::rejection::JsonRejection> for BadRequestError {
1344 #[inline]
1345 fn from(value: axum::extract::rejection::JsonRejection) -> Self {
1346 Self(anyhow::Error::msg(value.body_text()))
1347 }
1348}
1349
1350#[cfg(test)]
1351mod test {
1352 use std::str::FromStr;
1353
1354 use tycho_block_util::block::BlockStuffAug;
1355 use tycho_core::block_strider::DelayedTasks;
1356 use tycho_core::blockchain_rpc::BlockchainRpcService;
1357 use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig};
1358 use tycho_core::storage::CoreStorageConfig;
1359 use tycho_network::{
1360 BoxCloneService, Network, NetworkConfig, OverlayId, PublicOverlay, Response, ServiceExt,
1361 ServiceRequest, service_query_fn,
1362 };
1363 use tycho_storage::StorageContext;
1364
1365 use super::*;
1366
1367 fn echo_service() -> BoxCloneService<ServiceRequest, Response> {
1368 let handle = |request: ServiceRequest| async move {
1369 tracing::trace!("received: {}", request.body.escape_ascii());
1370 let response = Response {
1371 version: Default::default(),
1372 body: request.body,
1373 };
1374 Some(response)
1375 };
1376 service_query_fn(handle).boxed_clone()
1377 }
1378
1379 fn make_network() -> Result<Network> {
1380 Network::builder()
1381 .with_config(NetworkConfig::default())
1382 .with_random_private_key()
1383 .build("127.0.0.1:0", echo_service())
1384 }
1385
1386 fn get_block() -> BlockStuffAug {
1387 let block_data = include_bytes!("../../../core/tests/data/block.bin");
1388
1389 let root = Boc::decode(block_data).unwrap();
1390 let block = root.parse::<Block>().unwrap();
1391
1392 let block_id = {
1393 let block_id_str = include_str!("../../../core/tests/data/block_id.txt");
1394 let block_id_str = block_id_str.trim_end();
1395 BlockId::from_str(block_id_str).unwrap()
1396 };
1397
1398 BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
1399 .with_archive_data(block_data.as_slice())
1400 }
1401
1402 fn get_empty_block() -> BlockStuffAug {
1403 let block_data = include_bytes!("../../../core/tests/data/empty_block.bin");
1404
1405 let root = Boc::decode(block_data).unwrap();
1406 let block = root.parse::<Block>().unwrap();
1407
1408 let block_id = BlockId {
1409 root_hash: *root.repr_hash(),
1410 ..Default::default()
1411 };
1412
1413 BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
1414 .with_archive_data(block_data.as_slice())
1415 }
1416
1417 #[tokio::test]
1418 async fn rpc_state_handle_block() -> Result<()> {
1419 tycho_util::test::init_logger("rpc_state_handle_block", "debug");
1420
1421 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
1422 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
1423
1424 let config = RpcConfig::default();
1425
1426 let network = make_network()?;
1427
1428 let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(
1429 BlockchainRpcService::builder()
1430 .with_storage(storage.clone())
1431 .without_broadcast_listener()
1432 .build(),
1433 );
1434
1435 let blockchain_rpc_client = BlockchainRpcClient::builder()
1436 .with_public_overlay_client(PublicOverlayClient::new(
1437 network,
1438 public_overlay,
1439 PublicOverlayClientConfig::default(),
1440 ))
1441 .build();
1442
1443 let rpc_state = RpcState::builder()
1444 .with_config(config)
1445 .with_storage(storage)
1446 .with_blockchain_rpc_client(blockchain_rpc_client)
1447 .with_zerostate_id(ZerostateId::default())
1448 .build()?;
1449
1450 let block = get_block();
1451
1452 let (delayed_handle, delayed) = DelayedTasks::new();
1453 let ctx = BlockSubscriberContext {
1454 mc_block_id: BlockId::default(),
1455 mc_is_key_block: false,
1456 is_key_block: false,
1457 block: block.data,
1458 archive_data: block.archive_data,
1459 delayed,
1460 };
1461
1462 let (block_subscriber, _) = rpc_state.clone().split();
1463 let delayed_handle = delayed_handle.spawn();
1464 let prepared = block_subscriber.prepare_block(&ctx).await?;
1465
1466 block_subscriber.handle_block(&ctx, prepared).await?;
1467 delayed_handle.join().await?;
1468
1469 let account = HashBytes::from_str(
1470 "b06c29df56964af1aeb3bbda73ea5685bc54f4131c1c8559ba2c6f971976cd2b",
1471 )?;
1472
1473 let new_code_hash = HashBytes::from_str(
1474 "fc42205fe8c1c08846c1222c81eb416bdbf403253f6079691e04d52ce4400f8f",
1475 )?;
1476
1477 let account_by_code_hash = rpc_state
1478 .get_accounts_by_code_hash(&new_code_hash, None, None)?
1479 .last()
1480 .unwrap();
1481
1482 assert_eq!(account, account_by_code_hash.address);
1483
1484 Ok(())
1485 }
1486
1487 #[tokio::test]
1488 async fn rpc_state_handle_empty_block() -> Result<()> {
1489 tycho_util::test::init_logger("rpc_state_handle_empty_block", "debug");
1490
1491 let config = RpcConfig::default();
1492
1493 let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
1494 let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
1495
1496 let network = make_network()?;
1497
1498 let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(
1499 BlockchainRpcService::builder()
1500 .with_storage(storage.clone())
1501 .without_broadcast_listener()
1502 .build(),
1503 );
1504
1505 let blockchain_rpc_client = BlockchainRpcClient::builder()
1506 .with_public_overlay_client(PublicOverlayClient::new(
1507 network,
1508 public_overlay,
1509 PublicOverlayClientConfig::default(),
1510 ))
1511 .build();
1512
1513 let rpc_state = RpcState::builder()
1514 .with_config(config)
1515 .with_storage(storage)
1516 .with_blockchain_rpc_client(blockchain_rpc_client)
1517 .with_zerostate_id(ZerostateId::default())
1518 .build()?;
1519
1520 let block = get_empty_block();
1521
1522 let (delayed_handle, delayed) = DelayedTasks::new();
1523 let ctx = BlockSubscriberContext {
1524 mc_block_id: BlockId::default(),
1525 mc_is_key_block: false,
1526 is_key_block: false,
1527 block: block.data,
1528 archive_data: block.archive_data,
1529 delayed,
1530 };
1531
1532 let (block_subscriber, _) = rpc_state.clone().split();
1533 let delayed_handle = delayed_handle.spawn();
1534 let prepared = block_subscriber.prepare_block(&ctx).await?;
1535
1536 block_subscriber.handle_block(&ctx, prepared).await?;
1537 delayed_handle.join().await?;
1538
1539 Ok(())
1540 }
1541
1542 static PUBLIC_OVERLAY_ID: OverlayId = OverlayId([1; 32]);
1543}