Skip to main content

tycho_rpc/state/
mod.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use arc_swap::ArcSwap;
8use axum::extract::FromRef;
9use futures_util::future::BoxFuture;
10use parking_lot::RwLock;
11use tokio::net::TcpListener;
12use tokio::sync::{Notify, Semaphore, watch};
13use tokio::task::JoinHandle;
14use tycho_block_util::block::BlockStuff;
15use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
16use tycho_core::block_strider::{
17    BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
18};
19use tycho_core::blockchain_rpc::BlockchainRpcClient;
20use tycho_core::global_config::ZerostateId;
21use tycho_core::storage::{CoreStorage, KeyBlocksDirection};
22use tycho_rpc_subscriptions::SubscriberManagerConfig;
23use tycho_types::models::*;
24use tycho_types::prelude::*;
25use tycho_util::FastHashMap;
26use tycho_util::metrics::HistogramGuard;
27use tycho_util::time::now_sec;
28
29pub use self::storage::*;
30pub use self::subscriptions::{AccountUpdate, RegisterError, RpcSubscriptions};
31use crate::config::{BlackListConfig, RpcConfig, RpcStorageConfig, TransactionsGcConfig};
32use crate::endpoint::{JrpcEndpointCache, ProtoEndpointCache, RpcEndpoint, jrpc};
33use crate::models::{GenTimings, StateTimings};
34
35impl FromRef<RpcState> for Arc<RpcSubscriptions> {
36    fn from_ref(state: &RpcState) -> Self {
37        state.inner.subscriptions.clone()
38    }
39}
40
41impl FromRef<RpcState> for jrpc::SubscriptionsState {
42    fn from_ref(state: &RpcState) -> Self {
43        Arc::new(jrpc::StreamContext {
44            subs: state.inner.subscriptions.clone(),
45        })
46    }
47}
48
49mod db;
50mod storage;
51mod subscriptions;
52pub mod tables;
53
54const RPC_DB_SUBDIR: &str = "rpc";
55
56pub struct RpcStateBuilder<MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId)> {
57    config: RpcConfig,
58    mandatory_fields: MandatoryFields,
59}
60
61impl RpcStateBuilder {
62    pub fn build(self) -> Result<RpcState> {
63        let (core_storage, blockchain_rpc_client, zerostate_id) = self.mandatory_fields;
64        let config = self.config;
65
66        let gc_notify = Arc::new(Notify::new());
67
68        let mut gc_handle = None;
69        let mut blacklisted_accounts = None::<BlacklistedAccounts>;
70        let mut blacklist_watcher_handle = None;
71
72        let rpc_storage = match &config.storage {
73            RpcStorageConfig::Full {
74                gc, blacklist_path, ..
75            } => {
76                let db = core_storage.context().open_preconfigured(RPC_DB_SUBDIR)?;
77                let rpc_storage = Arc::new(RpcStorage::new(db));
78
79                if let Some(config) = gc {
80                    gc_handle = Some(tokio::spawn(transactions_gc(
81                        config.clone(),
82                        core_storage.clone(),
83                        rpc_storage.clone(),
84                        gc_notify.clone(),
85                    )));
86                }
87
88                if let Some(path) = blacklist_path {
89                    let accounts = BlacklistedAccounts::default();
90                    blacklisted_accounts = Some(accounts.clone());
91                    blacklist_watcher_handle = Some(tokio::spawn(watch_blacklisted_accounts(
92                        path.clone(),
93                        accounts,
94                    )));
95                }
96
97                Some(rpc_storage)
98            }
99            RpcStorageConfig::StateOnly => None,
100        };
101
102        let download_block_semaphore = Semaphore::new(config.max_parallel_block_downloads);
103
104        let run_get_method_semaphore = Arc::new(Semaphore::new(config.run_get_method.max_vms));
105
106        let subscriptions = Arc::new(RpcSubscriptions::new(
107            SubscriberManagerConfig::new(
108                config.subscriptions.max_addrs,
109                config.subscriptions.max_clients,
110            ),
111            config.subscriptions.queue_depth,
112        ));
113
114        // NOTE: Only a stub here.
115        let parsed_config = Arc::new(LatestBlockchainConfig::default());
116
117        let timings = GenTimings {
118            gen_lt: 0,
119            gen_utime: 0,
120        };
121        let tick = McTick {
122            seqno: 0,
123            lt: timings.gen_lt,
124            utime: timings.gen_utime,
125        };
126        let (mc_tick_tx, _) = watch::channel(tick);
127        let mc_info = LatestMcInfo {
128            block_id: Arc::new(BlockId {
129                shard: ShardIdent::MASTERCHAIN,
130                seqno: 0,
131                ..Default::default()
132            }),
133            timings,
134            state_hash: HashBytes::ZERO,
135        };
136
137        Ok(RpcState {
138            inner: Arc::new(Inner {
139                config,
140                core_storage,
141                rpc_storage,
142                blockchain_rpc_client,
143                mc_info: RwLock::new(mc_info),
144                mc_tick_tx,
145                mc_accounts: Default::default(),
146                sc_accounts: Default::default(),
147                run_get_method_semaphore,
148                download_block_semaphore,
149                is_ready: AtomicBool::new(false),
150                timings: ArcSwap::new(Default::default()),
151                blockchain_config: ArcSwap::new(parsed_config),
152                jrpc_cache: Default::default(),
153                proto_cache: Default::default(),
154                subscriptions,
155                zerostate_id,
156                gc_notify,
157                gc_handle,
158                blacklisted_accounts,
159                blacklist_watcher_handle,
160            }),
161        })
162    }
163}
164
165impl<T2, T3> RpcStateBuilder<((), T2, T3)> {
166    pub fn with_storage(self, storage: CoreStorage) -> RpcStateBuilder<(CoreStorage, T2, T3)> {
167        let (_, bc_rpc_client, zerostate_id) = self.mandatory_fields;
168
169        RpcStateBuilder {
170            config: self.config,
171            mandatory_fields: (storage, bc_rpc_client, zerostate_id),
172        }
173    }
174}
175
176impl<T1, T3> RpcStateBuilder<(T1, (), T3)> {
177    pub fn with_blockchain_rpc_client(
178        self,
179        client: BlockchainRpcClient,
180    ) -> RpcStateBuilder<(T1, BlockchainRpcClient, T3)> {
181        let (storage, _, zerostate_id) = self.mandatory_fields;
182
183        RpcStateBuilder {
184            config: self.config,
185            mandatory_fields: (storage, client, zerostate_id),
186        }
187    }
188}
189
190impl<T1, T2> RpcStateBuilder<(T1, T2, ())> {
191    pub fn with_zerostate_id(
192        self,
193        zerostate_id: ZerostateId,
194    ) -> RpcStateBuilder<(T1, T2, ZerostateId)> {
195        let (storage, client, _) = self.mandatory_fields;
196
197        RpcStateBuilder {
198            config: self.config,
199            mandatory_fields: (storage, client, zerostate_id),
200        }
201    }
202}
203
204impl<T1, T2, T3> RpcStateBuilder<(T1, T2, T3)> {
205    pub fn with_config(self, config: RpcConfig) -> RpcStateBuilder<(T1, T2, T3)> {
206        RpcStateBuilder { config, ..self }
207    }
208}
209
210#[derive(Clone)]
211#[repr(transparent)]
212pub struct RpcState {
213    inner: Arc<Inner>,
214}
215
216impl RpcState {
217    pub fn builder() -> RpcStateBuilder<((), (), ())> {
218        RpcStateBuilder {
219            config: RpcConfig::default(),
220            mandatory_fields: ((), (), ()),
221        }
222    }
223
224    pub fn split(self) -> (RpcBlockSubscriber, RpcStateSubscriber) {
225        let block_subscriber = RpcBlockSubscriber {
226            inner: self.inner.clone(),
227        };
228
229        let state_subscriber = RpcStateSubscriber { inner: self.inner };
230
231        (block_subscriber, state_subscriber)
232    }
233
234    pub async fn init(&self, mc_block_id: &BlockId) -> Result<()> {
235        self.inner.init(mc_block_id).await
236    }
237
238    pub async fn acquire_download_block_permit(&self) -> tokio::sync::SemaphorePermit<'_> {
239        // NOTE: We are not closing this semaphore explicitly.
240        self.inner.download_block_semaphore.acquire().await.unwrap()
241    }
242
243    pub async fn acquire_run_get_method_permit(&self) -> RunGetMethodPermit {
244        let config = &self.config().run_get_method;
245        if config.max_vms == 0 {
246            return RunGetMethodPermit::Disabled;
247        }
248
249        let fut = self.inner.run_get_method_semaphore.clone().acquire_owned();
250        match tokio::time::timeout(config.max_wait_for_vm, fut).await {
251            Ok(Ok(permit)) => RunGetMethodPermit::Acquired(permit),
252            Ok(Err(_)) => RunGetMethodPermit::Disabled,
253            Err(_) => RunGetMethodPermit::Timeout,
254        }
255    }
256
257    pub async fn bind_socket(&self) -> std::io::Result<TcpListener> {
258        TcpListener::bind(self.config().listen_addr).await
259    }
260
261    pub async fn bind_endpoint(&self) -> Result<RpcEndpoint> {
262        RpcEndpoint::builder().bind(self.clone()).await
263    }
264
265    pub fn config(&self) -> &RpcConfig {
266        &self.inner.config
267    }
268
269    pub fn is_ready(&self) -> bool {
270        self.inner.is_ready.load(Ordering::Acquire)
271    }
272
273    pub fn is_full(&self) -> bool {
274        self.inner.rpc_storage.is_some()
275    }
276
277    pub fn load_timings(&self) -> arc_swap::Guard<Arc<StateTimings>> {
278        self.inner.timings.load()
279    }
280
281    pub fn jrpc_cache(&self) -> &JrpcEndpointCache {
282        &self.inner.jrpc_cache
283    }
284
285    pub fn proto_cache(&self) -> &ProtoEndpointCache {
286        &self.inner.proto_cache
287    }
288
289    pub fn subscriptions(&self) -> &RpcSubscriptions {
290        &self.inner.subscriptions
291    }
292
293    pub fn zerostate_id(&self) -> &ZerostateId {
294        &self.inner.zerostate_id
295    }
296
297    pub fn get_latest_mc_info(&self) -> LatestMcInfo {
298        self.inner.mc_info.read().clone()
299    }
300
301    pub(crate) fn subscribe_mc_tick(&self) -> watch::Receiver<McTick> {
302        self.inner.mc_tick_tx.subscribe()
303    }
304
305    pub fn rpc_storage_snapshot(&self) -> Option<RpcSnapshot> {
306        let rpc_storage = self.inner.rpc_storage.as_ref()?;
307        rpc_storage.load_snapshot()
308    }
309
310    pub async fn broadcast_external_message(&self, message: &[u8]) {
311        metrics::counter!("tycho_rpc_broadcast_external_message_tx_bytes_total")
312            .increment(message.len() as u64);
313        self.inner
314            .blockchain_rpc_client
315            .broadcast_external_message(message)
316            .await;
317    }
318
319    pub fn get_unpacked_blockchain_config(&self) -> Arc<LatestBlockchainConfig> {
320        self.inner.blockchain_config.load_full()
321    }
322
323    pub fn get_brief_block_info(
324        &self,
325        block_id: &BlockIdShort,
326        snapshot: Option<&RpcSnapshot>,
327    ) -> Result<Option<(BlockId, u32, BriefBlockInfo)>, RpcStateError> {
328        let Some(storage) = &self.inner.rpc_storage else {
329            return Err(RpcStateError::NotSupported);
330        };
331        storage
332            .get_brief_block_info(block_id, snapshot)
333            .map_err(RpcStateError::Internal)
334    }
335
336    pub fn get_brief_shards_descr(
337        &self,
338        mc_seqno: u32,
339        snapshot: Option<&RpcSnapshot>,
340    ) -> Result<Option<Vec<BriefShardDescr>>, RpcStateError> {
341        let Some(storage) = &self.inner.rpc_storage else {
342            return Err(RpcStateError::NotSupported);
343        };
344        storage
345            .get_brief_shards_descr(mc_seqno, snapshot)
346            .map_err(RpcStateError::Internal)
347    }
348
349    pub fn get_libraries(&self) -> Dict<HashBytes, LibDescr> {
350        match self.inner.mc_accounts.read().as_ref() {
351            Some(cache) => cache.libraries.clone(),
352            None => Dict::new(),
353        }
354    }
355
356    pub fn get_raw_library(&self, hash: &HashBytes) -> Result<Option<Cell>> {
357        let guard = self.inner.mc_accounts.read();
358        match guard.as_ref() {
359            Some(cache) => Ok(cache.libraries.get(hash)?.map(|x| x.lib)),
360            None => Ok(None),
361        }
362    }
363
364    pub fn get_account_state(
365        &self,
366        address: &StdAddr,
367    ) -> Result<LoadedAccountState, RpcStateError> {
368        self.inner.get_account_state(address)
369    }
370
371    pub fn get_accounts_by_code_hash(
372        &self,
373        code_hash: &HashBytes,
374        continuation: Option<&StdAddr>,
375        snapshot: Option<RpcSnapshot>,
376    ) -> Result<CodeHashesIter<'_>, RpcStateError> {
377        let Some(storage) = &self.inner.rpc_storage else {
378            return Err(RpcStateError::NotSupported);
379        };
380        storage
381            .get_accounts_by_code_hash(code_hash, continuation, snapshot)
382            .map_err(RpcStateError::Internal)
383    }
384
385    pub fn get_known_mc_blocks_range(
386        &self,
387        snapshot: Option<&RpcSnapshot>,
388    ) -> Result<Option<(u32, u32)>, RpcStateError> {
389        let Some(storage) = &self.inner.rpc_storage else {
390            return Err(RpcStateError::NotSupported);
391        };
392        storage
393            .get_known_mc_blocks_range(snapshot)
394            .map_err(RpcStateError::Internal)
395    }
396
397    pub fn get_blocks_by_mc_seqno(
398        &self,
399        mc_seqno: u32,
400        snapshot: Option<RpcSnapshot>,
401    ) -> Result<Option<BlocksByMcSeqnoIter>, RpcStateError> {
402        let Some(storage) = &self.inner.rpc_storage else {
403            return Err(RpcStateError::NotSupported);
404        };
405        storage
406            .get_blocks_by_mc_seqno(mc_seqno, snapshot)
407            .map_err(RpcStateError::Internal)
408    }
409
410    pub fn get_block_transactions(
411        &self,
412        block_id: &BlockIdShort,
413        reverse: bool,
414        cursor: Option<&BlockTransactionsCursor>,
415        snapshot: Option<RpcSnapshot>,
416    ) -> Result<Option<BlockTransactionsIterBuilder>, RpcStateError> {
417        let Some(storage) = &self.inner.rpc_storage else {
418            return Err(RpcStateError::NotSupported);
419        };
420        storage
421            .get_block_transactions(block_id, reverse, cursor, snapshot)
422            .map_err(RpcStateError::Internal)
423    }
424
425    pub fn get_block_transaction_ids(
426        &self,
427        block_id: &BlockIdShort,
428        reverse: bool,
429        cursor: Option<&BlockTransactionsCursor>,
430        snapshot: Option<RpcSnapshot>,
431    ) -> Result<Option<BlockTransactionIdsIter>, RpcStateError> {
432        let Some(storage) = &self.inner.rpc_storage else {
433            return Err(RpcStateError::NotSupported);
434        };
435        storage
436            .get_block_transaction_ids(block_id, reverse, cursor, snapshot)
437            .map_err(RpcStateError::Internal)
438    }
439
440    pub fn get_transactions(
441        &self,
442        account: &StdAddr,
443        start_lt: Option<u64>,
444        end_lt: Option<u64>,
445        reverse: bool,
446        snapshot: Option<RpcSnapshot>,
447    ) -> Result<TransactionsIterBuilder, RpcStateError> {
448        let Some(storage) = &self.inner.rpc_storage else {
449            return Err(RpcStateError::NotSupported);
450        };
451        storage
452            .get_transactions(account, start_lt, end_lt, reverse, snapshot)
453            .map_err(RpcStateError::Internal)
454    }
455
456    pub fn get_transaction(
457        &self,
458        hash: &HashBytes,
459        snapshot: Option<&RpcSnapshot>,
460    ) -> Result<Option<TransactionData<'_>>, RpcStateError> {
461        let Some(storage) = &self.inner.rpc_storage else {
462            return Err(RpcStateError::NotSupported);
463        };
464        storage
465            .get_transaction(hash, snapshot)
466            .map_err(RpcStateError::Internal)
467    }
468
469    pub fn get_transaction_ext<'a>(
470        &'a self,
471        hash: &HashBytes,
472        snapshot: Option<&RpcSnapshot>,
473    ) -> Result<Option<TransactionDataExt<'a>>, RpcStateError> {
474        let Some(storage) = &self.inner.rpc_storage else {
475            return Err(RpcStateError::NotSupported);
476        };
477        storage
478            .get_transaction_ext(hash, snapshot)
479            .map_err(RpcStateError::Internal)
480    }
481
482    pub fn get_transaction_info(
483        &self,
484        hash: &HashBytes,
485        snapshot: Option<&RpcSnapshot>,
486    ) -> Result<Option<TransactionInfo>, RpcStateError> {
487        let Some(storage) = &self.inner.rpc_storage else {
488            return Err(RpcStateError::NotSupported);
489        };
490        storage
491            .get_transaction_info(hash, snapshot)
492            .map_err(RpcStateError::Internal)
493    }
494
495    pub fn get_src_transaction<'a>(
496        &'a self,
497        account: &StdAddr,
498        message_lt: u64,
499        snapshot: Option<&RpcSnapshot>,
500    ) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
501        let Some(storage) = &self.inner.rpc_storage else {
502            return Err(RpcStateError::NotSupported);
503        };
504        storage
505            .get_src_transaction(account, message_lt, snapshot)
506            .map_err(RpcStateError::Internal)
507    }
508
509    pub fn get_dst_transaction<'a>(
510        &'a self,
511        in_msg_hash: &HashBytes,
512        snapshot: Option<&RpcSnapshot>,
513    ) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
514        let Some(storage) = &self.inner.rpc_storage else {
515            return Err(RpcStateError::NotSupported);
516        };
517        storage
518            .get_dst_transaction(in_msg_hash, snapshot)
519            .map_err(RpcStateError::Internal)
520    }
521
522    // TODO: Add snapshot support.
523    pub async fn get_key_block_proof(
524        &self,
525        key_block_seqno: u32,
526    ) -> Option<(BlockId, impl AsRef<[u8]> + Send + Sync + 'static)> {
527        let blocks = self.inner.core_storage.block_storage();
528        let handles = self.inner.core_storage.block_handle_storage();
529
530        let handle = handles.load_key_block_handle(key_block_seqno)?;
531        let data = blocks.load_block_proof_raw(&handle).await.ok()?;
532        Some((*handle.id(), data))
533    }
534
535    // TODO: Remove.
536    pub async fn get_block_proof(
537        &self,
538        block_id: &BlockId,
539    ) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
540        let blocks = self.inner.core_storage.block_storage();
541        let handles = self.inner.core_storage.block_handle_storage();
542
543        let handle = handles.load_handle(block_id)?;
544        blocks.load_block_proof_raw(&handle).await.ok()
545    }
546
547    // TODO: Remove.
548    pub async fn get_block_data(
549        &self,
550        block_id: &BlockId,
551    ) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
552        let blocks = self.inner.core_storage.block_storage();
553        let handles = self.inner.core_storage.block_handle_storage();
554
555        let handle = handles.load_handle(block_id)?;
556        blocks.load_block_data_decompressed(&handle).await.ok()
557    }
558}
559
560pub struct RpcStateSubscriber {
561    inner: Arc<Inner>,
562}
563
564impl StateSubscriber for RpcStateSubscriber {
565    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;
566
567    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
568        futures_util::future::ready(self.inner.update_accounts_cache(&cx.block, &cx.state))
569    }
570}
571
572pub struct RpcBlockSubscriber {
573    inner: Arc<Inner>,
574}
575
576impl BlockSubscriber for RpcBlockSubscriber {
577    type Prepared = JoinHandle<Result<()>>;
578
579    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<Self::Prepared>>;
580    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;
581
582    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
583        let handle = tokio::task::spawn({
584            let inner = self.inner.clone();
585            let mc_block_id = cx.mc_block_id;
586            let block = cx.block.clone();
587            async move { inner.update(&mc_block_id, &block).await }
588        });
589
590        futures_util::future::ready(Ok(handle))
591    }
592
593    fn handle_block<'a>(
594        &'a self,
595        ctx: &'a BlockSubscriberContext,
596        prepared: Self::Prepared,
597    ) -> Self::HandleBlockFut<'a> {
598        Box::pin(async move {
599            prepared.await??;
600            if ctx.block.id().is_masterchain() {
601                self.inner.update_mc_info(&ctx.block)?;
602
603                // NOTE: Update snapshot only for masterchain because it is handled last.
604                // It is updated only after processing all shards and mc block.
605                if let Some(rpc_storage) = &self.inner.rpc_storage {
606                    rpc_storage.update_snapshot();
607                }
608            }
609            Ok(())
610        })
611    }
612}
613
614pub enum RunGetMethodPermit {
615    Acquired(tokio::sync::OwnedSemaphorePermit),
616    Timeout,
617    Disabled,
618}
619
620struct Inner {
621    config: RpcConfig,
622    core_storage: CoreStorage,
623    rpc_storage: Option<Arc<RpcStorage>>,
624    blockchain_rpc_client: BlockchainRpcClient,
625    mc_info: RwLock<LatestMcInfo>,
626    mc_tick_tx: watch::Sender<McTick>,
627    mc_accounts: RwLock<Option<CachedAccounts>>,
628    sc_accounts: RwLock<FastHashMap<ShardIdent, CachedAccounts>>,
629    download_block_semaphore: Semaphore,
630    run_get_method_semaphore: Arc<Semaphore>,
631    is_ready: AtomicBool,
632    timings: ArcSwap<StateTimings>,
633    blockchain_config: ArcSwap<LatestBlockchainConfig>,
634    jrpc_cache: JrpcEndpointCache,
635    proto_cache: ProtoEndpointCache,
636    subscriptions: Arc<RpcSubscriptions>,
637    zerostate_id: ZerostateId,
638    // GC
639    gc_notify: Arc<Notify>,
640    gc_handle: Option<JoinHandle<()>>,
641    // RPC blacklist
642    blacklisted_accounts: Option<BlacklistedAccounts>,
643    blacklist_watcher_handle: Option<JoinHandle<()>>,
644}
645
646#[derive(Clone)]
647pub struct LatestMcInfo {
648    pub block_id: Arc<BlockId>,
649    pub timings: GenTimings,
650    pub state_hash: HashBytes,
651}
652
653#[derive(Clone, Copy, Debug)]
654pub(crate) struct McTick {
655    pub seqno: u32,
656    pub lt: u64,
657    pub utime: u32,
658}
659
660pub struct LatestBlockchainConfig {
661    pub raw: BlockchainConfigParams,
662    pub unpacked: tycho_vm::UnpackedConfig,
663    pub modifiers: tycho_vm::BehaviourModifiers,
664}
665
666impl Default for LatestBlockchainConfig {
667    fn default() -> Self {
668        Self {
669            raw: BlockchainConfigParams::from_raw(Cell::empty_cell()),
670            unpacked: tycho_vm::UnpackedConfig {
671                latest_storage_prices: None,
672                global_id: None,
673                mc_gas_prices: None,
674                gas_prices: None,
675                mc_fwd_prices: None,
676                fwd_prices: None,
677                size_limits_config: None,
678            },
679            modifiers: Default::default(),
680        }
681    }
682}
683
684impl Inner {
685    async fn init(self: &Arc<Self>, mc_block_id: &BlockId) -> Result<()> {
686        anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state");
687
688        let blocks = self.core_storage.block_storage();
689        let block_handles = self.core_storage.block_handle_storage();
690        let shard_states = self.core_storage.shard_state_storage();
691
692        // Try to init the latest known key block cache
693        'key_block: {
694            // NOTE: `+ 1` here because the `mc_block_id` might be a key block and we should use it
695            let Some(handle) = block_handles.find_prev_key_block(mc_block_id.seqno + 1) else {
696                break 'key_block;
697            };
698
699            if handle.has_data() {
700                let key_block = blocks.load_block_data(&handle).await?;
701                self.update_mc_block_cache(&key_block)?;
702            } else if handle.is_zerostate()
703                && let Some(proof) = self.core_storage.node_state().load_zerostate_proof()
704            {
705                let state = proof
706                    .virtualize()
707                    .parse::<ShardStateUnsplit>()
708                    .context("failed to deserialize zerostate proof")?;
709
710                let Some(extra) = state.load_custom()? else {
711                    anyhow::bail!("masterchain state without extra");
712                };
713
714                self.update_config(state.global_id, state.seqno, &extra.config);
715                tracing::warn!("no key block found during initialization");
716            } else {
717                let state = shard_states
718                    .load_state(handle.id().seqno, handle.id())
719                    .await
720                    .context("failed to load key block state on rpc init")?;
721                let state = state.as_ref();
722
723                let Some(extra) = state.load_custom()? else {
724                    anyhow::bail!("masterchain state without extra");
725                };
726
727                self.update_config(state.global_id, state.seqno, &extra.config);
728                tracing::warn!("no key block found during initialization");
729            }
730        }
731
732        let mut mc_state = shard_states
733            .load_state(mc_block_id.seqno, mc_block_id)
734            .await
735            .context("failed to load state on rpc init")?;
736        self.update_timings(mc_state.as_ref().gen_utime, mc_state.as_ref().seqno);
737
738        if let Some(rpc_storage) = &self.rpc_storage {
739            let node_instance_id = self.core_storage.node_state().load_instance_id();
740            let rpc_instance_id = rpc_storage.load_instance_id();
741
742            let make_cached_accounts = |state: &ShardStateStuff| -> Result<CachedAccounts> {
743                let state_info = state.as_ref();
744                Ok(CachedAccounts {
745                    libraries: Default::default(),
746                    accounts: state_info.load_accounts()?.dict().clone(),
747                    mc_ref_hanlde: state.ref_mc_state_handle().clone(),
748                    timings: GenTimings {
749                        gen_lt: state_info.gen_lt,
750                        gen_utime: state_info.gen_utime,
751                    },
752                })
753            };
754
755            let shards = mc_state.shards()?.clone();
756
757            if node_instance_id != rpc_instance_id || self.config.storage.is_force_reindex() {
758                // Reset masterchain accounts.
759                // NOTE: Consume shard state to prevent if from being fully loaded.
760                rpc_storage
761                    .reset_accounts(mc_state, self.config.shard_split_depth)
762                    .await?;
763
764                for item in shards.latest_blocks() {
765                    let block_id = item?;
766
767                    let state = shard_states
768                        .load_state(mc_block_id.seqno, &block_id)
769                        .await
770                        .context("failed to load shard state on init")?;
771
772                    // Reset shard accounts.
773                    // NOTE: Consume shard state to prevent if from being fully loaded.
774                    rpc_storage
775                        .reset_accounts(state, self.config.shard_split_depth)
776                        .await?;
777                }
778
779                // Rewrite RPC instance id
780                rpc_storage.store_instance_id(node_instance_id);
781
782                // Reload mc state.
783                mc_state = shard_states
784                    .load_state(mc_block_id.seqno, mc_block_id)
785                    .await
786                    .context("failed to reload mc state for rpc")?;
787            }
788
789            // Fill config.
790            if let Some(config) = load_blockchain_config(&mc_state) {
791                self.blockchain_config.store(config);
792            }
793
794            // Fill masterchain cache
795            *self.mc_accounts.write() = Some(make_cached_accounts(&mc_state)?);
796
797            for item in shards.latest_blocks() {
798                let block_id = item?;
799                let state = shard_states
800                    .load_state(mc_block_id.seqno, &block_id)
801                    .await
802                    .context("failed to load shard state to fill cache")?;
803
804                // Fill accounts cache.
805                self.sc_accounts
806                    .write()
807                    .insert(block_id.shard, make_cached_accounts(&state)?);
808            }
809        }
810
811        self.is_ready.store(true, Ordering::Release);
812        Ok(())
813    }
814
815    fn get_account_state(&self, address: &StdAddr) -> Result<LoadedAccountState, RpcStateError> {
816        let is_masterchain = address.is_masterchain();
817
818        if is_masterchain {
819            // Search in masterchain cache
820            match &*self.mc_accounts.read() {
821                None => Err(RpcStateError::NotReady),
822                Some(cache) => {
823                    let mc_info = self.mc_info.read().clone();
824                    cache.get(mc_info, &address.address)
825                }
826            }
827        } else {
828            let cache = self.sc_accounts.read();
829            let mc_info = self.mc_info.read().clone();
830
831            let mut state = Err(RpcStateError::NotReady);
832
833            // Search in all shard caches
834            let mut gen_utime = 0;
835            let mut found = false;
836            for (shard, cache) in &*cache {
837                if !shard.contains_account(&address.address) || cache.timings.gen_utime < gen_utime
838                {
839                    continue;
840                }
841
842                gen_utime = cache.timings.gen_utime;
843                state = cache.get(mc_info.clone(), &address.address);
844                found = true;
845            }
846
847            // Handle case when account is not found in any shard
848            if !found && gen_utime > 0 {
849                state = Ok(LoadedAccountState::NotFound {
850                    mc_block_id: mc_info.block_id,
851                    timings: mc_info.timings,
852                });
853            }
854
855            // Done
856            state
857        }
858    }
859
860    async fn update(&self, mc_block_id: &BlockId, block: &BlockStuff) -> Result<()> {
861        let _histogram = HistogramGuard::begin("tycho_rpc_state_update_time");
862
863        let is_masterchain = block.id().is_masterchain();
864        if is_masterchain {
865            self.update_mc_block_cache(block)?;
866        }
867
868        if let Some(rpc_storage) = &self.rpc_storage {
869            rpc_storage
870                .update(
871                    mc_block_id,
872                    block.clone(),
873                    self.blacklisted_accounts.as_ref(),
874                    &self.subscriptions,
875                )
876                .await?;
877        } else {
878            // with present storage we reuse parsed updates, so this step can't be a common case
879            let updates = Self::collect_updates_without_storage(
880                block.clone(),
881                self.blacklisted_accounts.as_ref(),
882            )
883            .await?;
884            if !updates.is_empty() {
885                self.subscriptions.fanout_updates(updates).await;
886            }
887        }
888        Ok(())
889    }
890
891    async fn collect_updates_without_storage(
892        block: BlockStuff,
893        rpc_blacklist: Option<&BlacklistedAccounts>,
894    ) -> Result<Vec<AccountUpdate>> {
895        let rpc_blacklist = rpc_blacklist.cloned();
896        tokio::task::spawn_blocking(move || -> Result<_> {
897            let Ok(workchain) = i8::try_from(block.id().shard.workchain()) else {
898                return Ok(Vec::new());
899            };
900
901            let info = block.load_info()?;
902            let extra = block.load_extra()?;
903            let account_blocks = extra.account_blocks.load()?;
904            if account_blocks.is_empty() {
905                return Ok(Vec::new());
906            }
907
908            let mut updates = FastHashMap::<HashBytes, u64>::default();
909            let rpc_blacklist = rpc_blacklist.map(|x| x.load());
910
911            for item in account_blocks.iter() {
912                let (account, _, account_block) = item?;
913
914                let is_blacklisted = rpc_blacklist.as_ref().is_some_and(|set| {
915                    let mut key = [0u8; 33];
916                    key[0] = workchain as u8;
917                    key[1..].copy_from_slice(account.as_slice());
918                    set.contains(&key)
919                });
920
921                if is_blacklisted {
922                    continue;
923                }
924
925                for tx_item in account_block.transactions.values() {
926                    let (_, tx_cell) = tx_item?;
927                    let tx = tx_cell.load()?;
928                    updates
929                        .entry(account)
930                        .and_modify(|lt| {
931                            if tx.lt > *lt {
932                                *lt = tx.lt;
933                            }
934                        })
935                        .or_insert(tx.lt);
936                }
937            }
938
939            let updates = updates
940                .into_iter()
941                .map(|(address, max_lt)| AccountUpdate {
942                    address: StdAddr::new(workchain, address),
943                    max_lt,
944                    gen_utime: info.gen_utime,
945                })
946                .collect();
947
948            Ok(updates)
949        })
950        .await?
951    }
952
953    fn update_mc_block_cache(&self, block: &BlockStuff) -> Result<()> {
954        // Update timings
955        {
956            // TODO: Add `OnceLock` for block `info` and `custom``
957            let info = block.load_info()?;
958            self.update_timings(info.gen_utime, info.seqno);
959
960            if !info.key_block {
961                return Ok(());
962            }
963        }
964
965        // Send a new KeyBlock notification to run GC
966        if self.config.storage.gc_is_enabled() {
967            self.gc_notify.notify_waiters();
968        }
969
970        let custom = block.load_custom()?;
971
972        // Try to update cached config:
973        if let Some(ref config) = custom.config {
974            self.update_config(block.as_ref().global_id, block.id().seqno, config);
975        } else {
976            tracing::error!("key block without config");
977        }
978
979        self.jrpc_cache.handle_key_block(block.as_ref());
980        self.proto_cache.handle_key_block(block.as_ref());
981        Ok(())
982    }
983
984    fn update_mc_info(&self, block: &BlockStuff) -> Result<()> {
985        let info = block.load_info()?;
986        let state_update = block.block().state_update.load()?;
987
988        let seqno = block.id().seqno;
989        let timings = GenTimings {
990            gen_lt: info.end_lt,
991            gen_utime: info.gen_utime,
992        };
993        let block_id = Arc::new(*block.id());
994        let tick = McTick {
995            seqno,
996            lt: timings.gen_lt,
997            utime: timings.gen_utime,
998        };
999        *self.mc_info.write() = LatestMcInfo {
1000            block_id,
1001            timings,
1002            state_hash: state_update.new_hash,
1003        };
1004        self.mc_tick_tx.send_replace(tick);
1005        Ok(())
1006    }
1007
1008    fn update_timings(&self, mc_gen_utime: u32, seqno: u32) {
1009        let time_diff = now_sec() as i64 - mc_gen_utime as i64;
1010        self.timings.store(Arc::new(StateTimings {
1011            last_mc_block_seqno: seqno,
1012            last_mc_utime: mc_gen_utime,
1013            mc_time_diff: time_diff,
1014            smallest_known_lt: self.rpc_storage.as_ref().map(|s| s.min_tx_lt()),
1015        }));
1016    }
1017
1018    fn update_config(&self, global_id: i32, seqno: u32, config: &BlockchainConfig) {
1019        self.jrpc_cache.handle_config(global_id, seqno, config);
1020        self.proto_cache.handle_config(global_id, seqno, config);
1021    }
1022
1023    fn update_accounts_cache(&self, block: &BlockStuff, state: &ShardStateStuff) -> Result<()> {
1024        let _histogram = HistogramGuard::begin("tycho_rpc_state_update_accounts_cache_time");
1025
1026        let shard = block.id().shard;
1027
1028        // TODO: Get `gen_utime` from somewhere else.
1029        let block_info = block.load_info()?;
1030
1031        let accounts = state.state().load_accounts()?.dict().clone();
1032        let libraries = state.state().libraries.clone();
1033
1034        let cached = CachedAccounts {
1035            libraries,
1036            accounts,
1037            mc_ref_hanlde: state.ref_mc_state_handle().clone(),
1038            timings: GenTimings {
1039                gen_lt: block_info.end_lt,
1040                gen_utime: block_info.gen_utime,
1041            },
1042        };
1043
1044        if shard.is_masterchain() {
1045            // Fill config.
1046            if let Some(config) = load_blockchain_config(state) {
1047                self.blockchain_config.store(config);
1048            }
1049
1050            // Update accounts cache.
1051            *self.mc_accounts.write() = Some(cached);
1052        } else {
1053            let mut cache = self.sc_accounts.write();
1054
1055            cache.insert(shard, cached);
1056            if block_info.after_merge || block_info.after_split {
1057                tracing::debug!("clearing shard states cache after shards merge/split");
1058
1059                match block_info.load_prev_ref()? {
1060                    // Block after split
1061                    //       |
1062                    //       *  - block A
1063                    //      / \
1064                    //     *   *  - blocks B', B"
1065                    PrevBlockRef::Single(..) => {
1066                        // Compute parent shard of the B' or B"
1067                        let parent = shard
1068                            .merge()
1069                            .ok_or(tycho_types::error::Error::InvalidData)?;
1070
1071                        let opposite = shard.opposite().expect("after split");
1072
1073                        // Remove parent shard state
1074                        if cache.contains_key(&shard) && cache.contains_key(&opposite) {
1075                            cache.remove(&parent);
1076                        }
1077                    }
1078
1079                    // Block after merge
1080                    //     *   *  - blocks A', A"
1081                    //      \ /
1082                    //       *  - block B
1083                    //       |
1084                    PrevBlockRef::AfterMerge { .. } => {
1085                        // Compute parent shard of the B' or B"
1086                        let (left, right) = shard
1087                            .split()
1088                            .ok_or(tycho_types::error::Error::InvalidData)?;
1089
1090                        // Find and remove all parent shards
1091                        cache.remove(&left);
1092                        cache.remove(&right);
1093                    }
1094                }
1095            }
1096        }
1097
1098        Ok(())
1099    }
1100}
1101
1102impl Drop for Inner {
1103    fn drop(&mut self) {
1104        if let Some(handle) = self.gc_handle.take() {
1105            handle.abort();
1106        }
1107
1108        if let Some(handle) = self.blacklist_watcher_handle.take() {
1109            handle.abort();
1110        }
1111    }
1112}
1113
1114fn load_blockchain_config(mc_state: &ShardStateStuff) -> Option<Arc<LatestBlockchainConfig>> {
1115    let extra = mc_state.state_extra().ok()?;
1116
1117    // Fill config.
1118    let now = mc_state.as_ref().gen_utime;
1119    match tycho_vm::SmcInfoTonV6::unpack_config_partial(&extra.config.params, now) {
1120        Ok(unpacked) => {
1121            let mut modifiers = tycho_vm::BehaviourModifiers::default();
1122            if let Ok(global_id) = extra.config.params.get_global_id()
1123                && let Ok(global) = extra.config.params.get_global_version()
1124            {
1125                modifiers.signature_with_id = global
1126                    .capabilities
1127                    .contains(GlobalCapability::CapSignatureWithId)
1128                    .then_some(global_id);
1129            }
1130
1131            Some(Arc::new(LatestBlockchainConfig {
1132                raw: extra.config.params.clone(),
1133                unpacked,
1134                modifiers,
1135            }))
1136        }
1137        Err(e) => {
1138            tracing::error!(
1139                block_id = %mc_state.block_id(),
1140                "failed to unpack blockchain config: {e:?}",
1141            );
1142            None
1143        }
1144    }
1145}
1146
1147pub enum LoadedAccountState {
1148    NotFound {
1149        mc_block_id: Arc<BlockId>,
1150        timings: GenTimings,
1151    },
1152    Found {
1153        mc_block_id: Arc<BlockId>,
1154        state: ShardAccount,
1155        mc_ref_handle: RefMcStateHandle,
1156        timings: GenTimings,
1157    },
1158}
1159
1160struct CachedAccounts {
1161    libraries: Dict<HashBytes, LibDescr>,
1162    accounts: ShardAccountsDict,
1163    mc_ref_hanlde: RefMcStateHandle,
1164    timings: GenTimings,
1165}
1166
1167impl CachedAccounts {
1168    fn get(
1169        &self,
1170        mc_info: LatestMcInfo,
1171        account: &HashBytes,
1172    ) -> Result<LoadedAccountState, RpcStateError> {
1173        match self.accounts.get(account) {
1174            Ok(Some((_, state))) => Ok(LoadedAccountState::Found {
1175                mc_block_id: mc_info.block_id,
1176                state,
1177                mc_ref_handle: self.mc_ref_hanlde.clone(),
1178                timings: self.timings.max_by_lt(mc_info.timings),
1179            }),
1180            Ok(None) => Ok(LoadedAccountState::NotFound {
1181                mc_block_id: mc_info.block_id,
1182                timings: self.timings.max_by_lt(mc_info.timings),
1183            }),
1184            Err(e) => Err(RpcStateError::Internal(e.into())),
1185        }
1186    }
1187}
1188
1189type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;
1190
1191// TODO: Use only rpc storage to find closest key block LT.
1192async fn transactions_gc(
1193    config: TransactionsGcConfig,
1194    core_storage: CoreStorage,
1195    rpc_storage: Arc<RpcStorage>,
1196    gc_notify: Arc<Notify>,
1197) {
1198    let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else {
1199        return;
1200    };
1201
1202    loop {
1203        // Wait for a new KeyBlock notification
1204        gc_notify.notified().await;
1205
1206        let target_utime = now_sec().saturating_sub(tx_ttl_sec);
1207        let gc_range = match find_closest_key_block_lt(&core_storage, target_utime).await {
1208            Ok(lt) => lt,
1209            Err(e) => {
1210                tracing::error!(
1211                    target_utime,
1212                    "failed to find the closest key block lt: {e:?}"
1213                );
1214                continue;
1215            }
1216        };
1217
1218        if let Err(e) = rpc_storage
1219            .remove_old_transactions(gc_range.mc_seqno, gc_range.lt, config.keep_tx_per_account)
1220            .await
1221        {
1222            tracing::error!(
1223                target_utime,
1224                mc_seqno = gc_range.mc_seqno,
1225                min_lt = gc_range.lt,
1226                "failed to remove old transactions: {e:?}"
1227            );
1228        }
1229    }
1230}
1231
1232pub async fn watch_blacklisted_accounts(config_path: PathBuf, accounts: BlacklistedAccounts) {
1233    tracing::info!(
1234        config_path = %config_path.display(),
1235        "started watching for changes in rpc blacklist config"
1236    );
1237
1238    let get_metadata = || {
1239        std::fs::metadata(&config_path)
1240            .ok()
1241            .and_then(|m| m.modified().ok())
1242    };
1243
1244    let mut last_modified = None;
1245
1246    let mut interval = tokio::time::interval(Duration::from_secs(10));
1247    loop {
1248        interval.tick().await;
1249
1250        let modified = get_metadata();
1251        if last_modified == modified {
1252            continue;
1253        }
1254        last_modified = modified;
1255
1256        // Handle
1257        match BlackListConfig::load_from(&config_path) {
1258            Ok(config) => accounts.update(config.accounts),
1259            Err(e) => {
1260                tracing::error!("failed to load blacklist config: {e:?}");
1261            }
1262        }
1263    }
1264}
1265
1266async fn find_closest_key_block_lt(storage: &CoreStorage, utime: u32) -> Result<GcRange> {
1267    let block_handle_storage = storage.block_handle_storage();
1268
1269    // Find the key block with max seqno which was preduced not later than `utime`
1270    let handle = 'last_key_block: {
1271        let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward);
1272        for key_block_id in iter {
1273            let handle = block_handle_storage
1274                .load_handle(&key_block_id)
1275                .with_context(|| format!("key block not found: {key_block_id}"))?;
1276
1277            if handle.gen_utime() <= utime {
1278                break 'last_key_block handle;
1279            }
1280        }
1281
1282        return Ok(GcRange::default());
1283    };
1284
1285    // Load block proof
1286    let block_proof = storage.block_storage().load_block_proof(&handle).await?;
1287
1288    // Read `start_lt` from virtual block info
1289    let (virt_block, _) = block_proof.virtualize_block()?;
1290    let info = virt_block.info.load()?;
1291    Ok(GcRange {
1292        mc_seqno: info.seqno,
1293        lt: info.start_lt,
1294    })
1295}
1296
1297#[derive(Default)]
1298struct GcRange {
1299    mc_seqno: u32,
1300    lt: u64,
1301}
1302
1303#[derive(Debug, thiserror::Error)]
1304pub enum RpcStateError {
1305    #[error("not ready")]
1306    NotReady,
1307    #[error("not supported")]
1308    NotSupported,
1309    #[error("internal: {0}")]
1310    Internal(#[from] anyhow::Error),
1311    #[error(transparent)]
1312    BadRequest(#[from] BadRequestError),
1313}
1314
1315impl RpcStateError {
1316    pub fn internal<E: Into<anyhow::Error>>(error: E) -> Self {
1317        Self::Internal(error.into())
1318    }
1319
1320    pub fn bad_request<E: Into<anyhow::Error>>(error: E) -> Self {
1321        Self::BadRequest(BadRequestError(error.into()))
1322    }
1323}
1324
1325#[derive(Debug, thiserror::Error)]
1326#[error(transparent)]
1327pub struct BadRequestError(anyhow::Error);
1328
1329impl From<anyhow::Error> for BadRequestError {
1330    #[inline]
1331    fn from(value: anyhow::Error) -> Self {
1332        Self(value)
1333    }
1334}
1335
1336impl From<axum::extract::rejection::QueryRejection> for BadRequestError {
1337    #[inline]
1338    fn from(value: axum::extract::rejection::QueryRejection) -> Self {
1339        Self(anyhow::Error::msg(value.body_text()))
1340    }
1341}
1342
1343impl From<axum::extract::rejection::JsonRejection> for BadRequestError {
1344    #[inline]
1345    fn from(value: axum::extract::rejection::JsonRejection) -> Self {
1346        Self(anyhow::Error::msg(value.body_text()))
1347    }
1348}
1349
1350#[cfg(test)]
1351mod test {
1352    use std::str::FromStr;
1353
1354    use tycho_block_util::block::BlockStuffAug;
1355    use tycho_core::block_strider::DelayedTasks;
1356    use tycho_core::blockchain_rpc::BlockchainRpcService;
1357    use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig};
1358    use tycho_core::storage::CoreStorageConfig;
1359    use tycho_network::{
1360        BoxCloneService, Network, NetworkConfig, OverlayId, PublicOverlay, Response, ServiceExt,
1361        ServiceRequest, service_query_fn,
1362    };
1363    use tycho_storage::StorageContext;
1364
1365    use super::*;
1366
1367    fn echo_service() -> BoxCloneService<ServiceRequest, Response> {
1368        let handle = |request: ServiceRequest| async move {
1369            tracing::trace!("received: {}", request.body.escape_ascii());
1370            let response = Response {
1371                version: Default::default(),
1372                body: request.body,
1373            };
1374            Some(response)
1375        };
1376        service_query_fn(handle).boxed_clone()
1377    }
1378
1379    fn make_network() -> Result<Network> {
1380        Network::builder()
1381            .with_config(NetworkConfig::default())
1382            .with_random_private_key()
1383            .build("127.0.0.1:0", echo_service())
1384    }
1385
1386    fn get_block() -> BlockStuffAug {
1387        let block_data = include_bytes!("../../../core/tests/data/block.bin");
1388
1389        let root = Boc::decode(block_data).unwrap();
1390        let block = root.parse::<Block>().unwrap();
1391
1392        let block_id = {
1393            let block_id_str = include_str!("../../../core/tests/data/block_id.txt");
1394            let block_id_str = block_id_str.trim_end();
1395            BlockId::from_str(block_id_str).unwrap()
1396        };
1397
1398        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
1399            .with_archive_data(block_data.as_slice())
1400    }
1401
1402    fn get_empty_block() -> BlockStuffAug {
1403        let block_data = include_bytes!("../../../core/tests/data/empty_block.bin");
1404
1405        let root = Boc::decode(block_data).unwrap();
1406        let block = root.parse::<Block>().unwrap();
1407
1408        let block_id = BlockId {
1409            root_hash: *root.repr_hash(),
1410            ..Default::default()
1411        };
1412
1413        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
1414            .with_archive_data(block_data.as_slice())
1415    }
1416
1417    #[tokio::test]
1418    async fn rpc_state_handle_block() -> Result<()> {
1419        tycho_util::test::init_logger("rpc_state_handle_block", "debug");
1420
1421        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
1422        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
1423
1424        let config = RpcConfig::default();
1425
1426        let network = make_network()?;
1427
1428        let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(
1429            BlockchainRpcService::builder()
1430                .with_storage(storage.clone())
1431                .without_broadcast_listener()
1432                .build(),
1433        );
1434
1435        let blockchain_rpc_client = BlockchainRpcClient::builder()
1436            .with_public_overlay_client(PublicOverlayClient::new(
1437                network,
1438                public_overlay,
1439                PublicOverlayClientConfig::default(),
1440            ))
1441            .build();
1442
1443        let rpc_state = RpcState::builder()
1444            .with_config(config)
1445            .with_storage(storage)
1446            .with_blockchain_rpc_client(blockchain_rpc_client)
1447            .with_zerostate_id(ZerostateId::default())
1448            .build()?;
1449
1450        let block = get_block();
1451
1452        let (delayed_handle, delayed) = DelayedTasks::new();
1453        let ctx = BlockSubscriberContext {
1454            mc_block_id: BlockId::default(),
1455            mc_is_key_block: false,
1456            is_key_block: false,
1457            block: block.data,
1458            archive_data: block.archive_data,
1459            delayed,
1460        };
1461
1462        let (block_subscriber, _) = rpc_state.clone().split();
1463        let delayed_handle = delayed_handle.spawn();
1464        let prepared = block_subscriber.prepare_block(&ctx).await?;
1465
1466        block_subscriber.handle_block(&ctx, prepared).await?;
1467        delayed_handle.join().await?;
1468
1469        let account = HashBytes::from_str(
1470            "b06c29df56964af1aeb3bbda73ea5685bc54f4131c1c8559ba2c6f971976cd2b",
1471        )?;
1472
1473        let new_code_hash = HashBytes::from_str(
1474            "fc42205fe8c1c08846c1222c81eb416bdbf403253f6079691e04d52ce4400f8f",
1475        )?;
1476
1477        let account_by_code_hash = rpc_state
1478            .get_accounts_by_code_hash(&new_code_hash, None, None)?
1479            .last()
1480            .unwrap();
1481
1482        assert_eq!(account, account_by_code_hash.address);
1483
1484        Ok(())
1485    }
1486
1487    #[tokio::test]
1488    async fn rpc_state_handle_empty_block() -> Result<()> {
1489        tycho_util::test::init_logger("rpc_state_handle_empty_block", "debug");
1490
1491        let config = RpcConfig::default();
1492
1493        let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
1494        let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;
1495
1496        let network = make_network()?;
1497
1498        let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(
1499            BlockchainRpcService::builder()
1500                .with_storage(storage.clone())
1501                .without_broadcast_listener()
1502                .build(),
1503        );
1504
1505        let blockchain_rpc_client = BlockchainRpcClient::builder()
1506            .with_public_overlay_client(PublicOverlayClient::new(
1507                network,
1508                public_overlay,
1509                PublicOverlayClientConfig::default(),
1510            ))
1511            .build();
1512
1513        let rpc_state = RpcState::builder()
1514            .with_config(config)
1515            .with_storage(storage)
1516            .with_blockchain_rpc_client(blockchain_rpc_client)
1517            .with_zerostate_id(ZerostateId::default())
1518            .build()?;
1519
1520        let block = get_empty_block();
1521
1522        let (delayed_handle, delayed) = DelayedTasks::new();
1523        let ctx = BlockSubscriberContext {
1524            mc_block_id: BlockId::default(),
1525            mc_is_key_block: false,
1526            is_key_block: false,
1527            block: block.data,
1528            archive_data: block.archive_data,
1529            delayed,
1530        };
1531
1532        let (block_subscriber, _) = rpc_state.clone().split();
1533        let delayed_handle = delayed_handle.spawn();
1534        let prepared = block_subscriber.prepare_block(&ctx).await?;
1535
1536        block_subscriber.handle_block(&ctx, prepared).await?;
1537        delayed_handle.join().await?;
1538
1539        Ok(())
1540    }
1541
1542    static PUBLIC_OVERLAY_ID: OverlayId = OverlayId([1; 32]);
1543}