use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use axum::extract::FromRef;
use futures_util::future::BoxFuture;
use parking_lot::RwLock;
use tokio::net::TcpListener;
use tokio::sync::{Notify, Semaphore, watch};
use tokio::task::JoinHandle;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::{RefMcStateHandle, ShardStateStuff};
use tycho_core::block_strider::{
BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext,
};
use tycho_core::blockchain_rpc::BlockchainRpcClient;
use tycho_core::global_config::ZerostateId;
use tycho_core::storage::{CoreStorage, KeyBlocksDirection};
use tycho_rpc_subscriptions::SubscriberManagerConfig;
use tycho_types::models::*;
use tycho_types::prelude::*;
use tycho_util::FastHashMap;
use tycho_util::metrics::HistogramGuard;
use tycho_util::time::now_sec;
pub use self::storage::*;
pub use self::subscriptions::{AccountUpdate, RegisterError, RpcSubscriptions};
use crate::config::{BlackListConfig, RpcConfig, RpcStorageConfig, TransactionsGcConfig};
use crate::endpoint::{JrpcEndpointCache, ProtoEndpointCache, RpcEndpoint, jrpc};
use crate::models::{GenTimings, StateTimings};
impl FromRef<RpcState> for Arc<RpcSubscriptions> {
    /// Extracts the shared subscriptions handle for axum state extraction.
    fn from_ref(state: &RpcState) -> Self {
        // `Arc::clone` makes the cheap refcount bump explicit.
        Arc::clone(&state.inner.subscriptions)
    }
}
impl FromRef<RpcState> for jrpc::SubscriptionsState {
    /// Builds a JSON-RPC stream context backed by the shared subscriptions.
    fn from_ref(state: &RpcState) -> Self {
        let subs = Arc::clone(&state.inner.subscriptions);
        Arc::new(jrpc::StreamContext { subs })
    }
}
mod db;
mod storage;
mod subscriptions;
pub mod tables;
const RPC_DB_SUBDIR: &str = "rpc";
/// Type-state builder for [`RpcState`].
///
/// `MandatoryFields` tracks which of the three required fields (core storage,
/// blockchain RPC client, zerostate id) have been provided; `build` is only
/// available once all three are set (see the `impl` blocks below).
pub struct RpcStateBuilder<MandatoryFields = (CoreStorage, BlockchainRpcClient, ZerostateId)> {
    config: RpcConfig,
    mandatory_fields: MandatoryFields,
}
impl RpcStateBuilder {
    /// Assembles the final [`RpcState`] from the collected mandatory fields
    /// and configuration.
    ///
    /// For `RpcStorageConfig::Full` this additionally:
    /// - opens the dedicated RPC database (the `rpc` subdirectory);
    /// - spawns the transactions GC task when a GC config is present;
    /// - spawns the blacklist file watcher when a blacklist path is present.
    ///
    /// The returned state starts as "not ready" with zeroed masterchain info;
    /// callers are expected to run [`RpcState::init`] afterwards.
    pub fn build(self) -> Result<RpcState> {
        let (core_storage, blockchain_rpc_client, zerostate_id) = self.mandatory_fields;
        let config = self.config;

        // Used by `update_mc_block_cache` to wake the GC task on key blocks.
        let gc_notify = Arc::new(Notify::new());
        let mut gc_handle = None;
        let mut blacklisted_accounts = None::<BlacklistedAccounts>;
        let mut blacklist_watcher_handle = None;
        let rpc_storage = match &config.storage {
            RpcStorageConfig::Full {
                gc, blacklist_path, ..
            } => {
                let db = core_storage.context().open_preconfigured(RPC_DB_SUBDIR)?;
                let rpc_storage = Arc::new(RpcStorage::new(db));
                if let Some(config) = gc {
                    gc_handle = Some(tokio::spawn(transactions_gc(
                        config.clone(),
                        core_storage.clone(),
                        rpc_storage.clone(),
                        gc_notify.clone(),
                    )));
                }
                if let Some(path) = blacklist_path {
                    // The watcher task refreshes this shared set in the background.
                    let accounts = BlacklistedAccounts::default();
                    blacklisted_accounts = Some(accounts.clone());
                    blacklist_watcher_handle = Some(tokio::spawn(watch_blacklisted_accounts(
                        path.clone(),
                        accounts,
                    )));
                }
                Some(rpc_storage)
            }
            RpcStorageConfig::StateOnly => None,
        };

        // Caps for concurrent block downloads / concurrent get-method VMs.
        let download_block_semaphore = Semaphore::new(config.max_parallel_block_downloads);
        let run_get_method_semaphore = Arc::new(Semaphore::new(config.run_get_method.max_vms));
        let subscriptions = Arc::new(RpcSubscriptions::new(
            SubscriberManagerConfig::new(
                config.subscriptions.max_addrs,
                config.subscriptions.max_clients,
            ),
            config.subscriptions.queue_depth,
        ));

        // Placeholder values below are replaced during `init` / block handling.
        let parsed_config = Arc::new(LatestBlockchainConfig::default());
        let timings = GenTimings {
            gen_lt: 0,
            gen_utime: 0,
        };
        let tick = McTick {
            seqno: 0,
            lt: timings.gen_lt,
            utime: timings.gen_utime,
        };
        let (mc_tick_tx, _) = watch::channel(tick);
        let mc_info = LatestMcInfo {
            block_id: Arc::new(BlockId {
                shard: ShardIdent::MASTERCHAIN,
                seqno: 0,
                ..Default::default()
            }),
            timings,
            state_hash: HashBytes::ZERO,
        };
        Ok(RpcState {
            inner: Arc::new(Inner {
                config,
                core_storage,
                rpc_storage,
                blockchain_rpc_client,
                mc_info: RwLock::new(mc_info),
                mc_tick_tx,
                mc_accounts: Default::default(),
                sc_accounts: Default::default(),
                run_get_method_semaphore,
                download_block_semaphore,
                is_ready: AtomicBool::new(false),
                timings: ArcSwap::new(Default::default()),
                blockchain_config: ArcSwap::new(parsed_config),
                jrpc_cache: Default::default(),
                proto_cache: Default::default(),
                subscriptions,
                zerostate_id,
                gc_notify,
                gc_handle,
                blacklisted_accounts,
                blacklist_watcher_handle,
            }),
        })
    }
}
impl<T2, T3> RpcStateBuilder<((), T2, T3)> {
    /// Provides the core storage (mandatory field).
    pub fn with_storage(self, storage: CoreStorage) -> RpcStateBuilder<(CoreStorage, T2, T3)> {
        let ((), client, zerostate_id) = self.mandatory_fields;
        RpcStateBuilder {
            config: self.config,
            mandatory_fields: (storage, client, zerostate_id),
        }
    }
}
impl<T1, T3> RpcStateBuilder<(T1, (), T3)> {
    /// Provides the blockchain RPC client (mandatory field).
    pub fn with_blockchain_rpc_client(
        self,
        client: BlockchainRpcClient,
    ) -> RpcStateBuilder<(T1, BlockchainRpcClient, T3)> {
        let (storage, (), zerostate_id) = self.mandatory_fields;
        RpcStateBuilder {
            config: self.config,
            mandatory_fields: (storage, client, zerostate_id),
        }
    }
}
impl<T1, T2> RpcStateBuilder<(T1, T2, ())> {
    /// Provides the zerostate id (mandatory field).
    pub fn with_zerostate_id(
        self,
        zerostate_id: ZerostateId,
    ) -> RpcStateBuilder<(T1, T2, ZerostateId)> {
        let (storage, client, ()) = self.mandatory_fields;
        RpcStateBuilder {
            config: self.config,
            mandatory_fields: (storage, client, zerostate_id),
        }
    }
}
impl<T1, T2, T3> RpcStateBuilder<(T1, T2, T3)> {
    /// Replaces the configuration, keeping all mandatory fields as-is.
    pub fn with_config(self, config: RpcConfig) -> RpcStateBuilder<(T1, T2, T3)> {
        RpcStateBuilder {
            config,
            mandatory_fields: self.mandatory_fields,
        }
    }
}
/// Cheaply clonable handle to the shared RPC node state.
///
/// All clones point at the same [`Inner`]; background tasks are aborted when
/// the last clone is dropped (see `impl Drop for Inner`).
#[derive(Clone)]
#[repr(transparent)]
pub struct RpcState {
    inner: Arc<Inner>,
}
impl RpcState {
pub fn builder() -> RpcStateBuilder<((), (), ())> {
RpcStateBuilder {
config: RpcConfig::default(),
mandatory_fields: ((), (), ()),
}
}
pub fn split(self) -> (RpcBlockSubscriber, RpcStateSubscriber) {
let block_subscriber = RpcBlockSubscriber {
inner: self.inner.clone(),
};
let state_subscriber = RpcStateSubscriber { inner: self.inner };
(block_subscriber, state_subscriber)
}
pub async fn init(&self, mc_block_id: &BlockId) -> Result<()> {
self.inner.init(mc_block_id).await
}
pub async fn acquire_download_block_permit(&self) -> tokio::sync::SemaphorePermit<'_> {
self.inner.download_block_semaphore.acquire().await.unwrap()
}
pub async fn acquire_run_get_method_permit(&self) -> RunGetMethodPermit {
let config = &self.config().run_get_method;
if config.max_vms == 0 {
return RunGetMethodPermit::Disabled;
}
let fut = self.inner.run_get_method_semaphore.clone().acquire_owned();
match tokio::time::timeout(config.max_wait_for_vm, fut).await {
Ok(Ok(permit)) => RunGetMethodPermit::Acquired(permit),
Ok(Err(_)) => RunGetMethodPermit::Disabled,
Err(_) => RunGetMethodPermit::Timeout,
}
}
pub async fn bind_socket(&self) -> std::io::Result<TcpListener> {
TcpListener::bind(self.config().listen_addr).await
}
pub async fn bind_endpoint(&self) -> Result<RpcEndpoint> {
RpcEndpoint::builder().bind(self.clone()).await
}
pub fn config(&self) -> &RpcConfig {
&self.inner.config
}
pub fn is_ready(&self) -> bool {
self.inner.is_ready.load(Ordering::Acquire)
}
pub fn is_full(&self) -> bool {
self.inner.rpc_storage.is_some()
}
pub fn load_timings(&self) -> arc_swap::Guard<Arc<StateTimings>> {
self.inner.timings.load()
}
pub fn jrpc_cache(&self) -> &JrpcEndpointCache {
&self.inner.jrpc_cache
}
pub fn proto_cache(&self) -> &ProtoEndpointCache {
&self.inner.proto_cache
}
pub fn subscriptions(&self) -> &RpcSubscriptions {
&self.inner.subscriptions
}
pub fn zerostate_id(&self) -> &ZerostateId {
&self.inner.zerostate_id
}
pub fn get_latest_mc_info(&self) -> LatestMcInfo {
self.inner.mc_info.read().clone()
}
pub(crate) fn subscribe_mc_tick(&self) -> watch::Receiver<McTick> {
self.inner.mc_tick_tx.subscribe()
}
pub fn rpc_storage_snapshot(&self) -> Option<RpcSnapshot> {
let rpc_storage = self.inner.rpc_storage.as_ref()?;
rpc_storage.load_snapshot()
}
pub async fn broadcast_external_message(&self, message: &[u8]) {
metrics::counter!("tycho_rpc_broadcast_external_message_tx_bytes_total")
.increment(message.len() as u64);
self.inner
.blockchain_rpc_client
.broadcast_external_message(message)
.await;
}
pub fn get_unpacked_blockchain_config(&self) -> Arc<LatestBlockchainConfig> {
self.inner.blockchain_config.load_full()
}
pub fn get_brief_block_info(
&self,
block_id: &BlockIdShort,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<(BlockId, u32, BriefBlockInfo)>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_brief_block_info(block_id, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_brief_shards_descr(
&self,
mc_seqno: u32,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<Vec<BriefShardDescr>>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_brief_shards_descr(mc_seqno, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_libraries(&self) -> Dict<HashBytes, LibDescr> {
match self.inner.mc_accounts.read().as_ref() {
Some(cache) => cache.libraries.clone(),
None => Dict::new(),
}
}
pub fn get_raw_library(&self, hash: &HashBytes) -> Result<Option<Cell>> {
let guard = self.inner.mc_accounts.read();
match guard.as_ref() {
Some(cache) => Ok(cache.libraries.get(hash)?.map(|x| x.lib)),
None => Ok(None),
}
}
pub fn get_account_state(
&self,
address: &StdAddr,
) -> Result<LoadedAccountState, RpcStateError> {
self.inner.get_account_state(address)
}
pub fn get_accounts_by_code_hash(
&self,
code_hash: &HashBytes,
continuation: Option<&StdAddr>,
snapshot: Option<RpcSnapshot>,
) -> Result<CodeHashesIter<'_>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_accounts_by_code_hash(code_hash, continuation, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_known_mc_blocks_range(
&self,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<(u32, u32)>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_known_mc_blocks_range(snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_blocks_by_mc_seqno(
&self,
mc_seqno: u32,
snapshot: Option<RpcSnapshot>,
) -> Result<Option<BlocksByMcSeqnoIter>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_blocks_by_mc_seqno(mc_seqno, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_block_transactions(
&self,
block_id: &BlockIdShort,
reverse: bool,
cursor: Option<&BlockTransactionsCursor>,
snapshot: Option<RpcSnapshot>,
) -> Result<Option<BlockTransactionsIterBuilder>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_block_transactions(block_id, reverse, cursor, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_block_transaction_ids(
&self,
block_id: &BlockIdShort,
reverse: bool,
cursor: Option<&BlockTransactionsCursor>,
snapshot: Option<RpcSnapshot>,
) -> Result<Option<BlockTransactionIdsIter>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_block_transaction_ids(block_id, reverse, cursor, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_transactions(
&self,
account: &StdAddr,
start_lt: Option<u64>,
end_lt: Option<u64>,
reverse: bool,
snapshot: Option<RpcSnapshot>,
) -> Result<TransactionsIterBuilder, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_transactions(account, start_lt, end_lt, reverse, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_transaction(
&self,
hash: &HashBytes,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<TransactionData<'_>>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_transaction(hash, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_transaction_ext<'a>(
&'a self,
hash: &HashBytes,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<TransactionDataExt<'a>>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_transaction_ext(hash, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_transaction_info(
&self,
hash: &HashBytes,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<TransactionInfo>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_transaction_info(hash, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_src_transaction<'a>(
&'a self,
account: &StdAddr,
message_lt: u64,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_src_transaction(account, message_lt, snapshot)
.map_err(RpcStateError::Internal)
}
pub fn get_dst_transaction<'a>(
&'a self,
in_msg_hash: &HashBytes,
snapshot: Option<&RpcSnapshot>,
) -> Result<Option<impl AsRef<[u8]> + 'a>, RpcStateError> {
let Some(storage) = &self.inner.rpc_storage else {
return Err(RpcStateError::NotSupported);
};
storage
.get_dst_transaction(in_msg_hash, snapshot)
.map_err(RpcStateError::Internal)
}
pub async fn get_key_block_proof(
&self,
key_block_seqno: u32,
) -> Option<(BlockId, impl AsRef<[u8]> + Send + Sync + 'static)> {
let blocks = self.inner.core_storage.block_storage();
let handles = self.inner.core_storage.block_handle_storage();
let handle = handles.load_key_block_handle(key_block_seqno)?;
let data = blocks.load_block_proof_raw(&handle).await.ok()?;
Some((*handle.id(), data))
}
pub async fn get_block_proof(
&self,
block_id: &BlockId,
) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
let blocks = self.inner.core_storage.block_storage();
let handles = self.inner.core_storage.block_handle_storage();
let handle = handles.load_handle(block_id)?;
blocks.load_block_proof_raw(&handle).await.ok()
}
pub async fn get_block_data(
&self,
block_id: &BlockId,
) -> Option<impl AsRef<[u8]> + Send + Sync + 'static> {
let blocks = self.inner.core_storage.block_storage();
let handles = self.inner.core_storage.block_handle_storage();
let handle = handles.load_handle(block_id)?;
blocks.load_block_data_decompressed(&handle).await.ok()
}
}
/// State-subscriber half produced by [`RpcState::split`]; refreshes the
/// in-memory accounts caches on every applied state.
pub struct RpcStateSubscriber {
    inner: Arc<Inner>,
}
impl StateSubscriber for RpcStateSubscriber {
    // The cache update is synchronous, so a ready future is sufficient.
    type HandleStateFut<'a> = futures_util::future::Ready<Result<()>>;

    /// Updates the accounts cache from the freshly applied shard state.
    fn handle_state<'a>(&'a self, cx: &'a StateSubscriberContext) -> Self::HandleStateFut<'a> {
        futures_util::future::ready(self.inner.update_accounts_cache(&cx.block, &cx.state))
    }
}
/// Block-subscriber half produced by [`RpcState::split`]; indexes block data
/// into the RPC storage and maintains the latest masterchain info.
pub struct RpcBlockSubscriber {
    inner: Arc<Inner>,
}
impl BlockSubscriber for RpcBlockSubscriber {
    // The heavy update is spawned as a task during `prepare_block`;
    // `handle_block` only awaits its result.
    type Prepared = JoinHandle<Result<()>>;
    type PrepareBlockFut<'a> = futures_util::future::Ready<Result<Self::Prepared>>;
    type HandleBlockFut<'a> = BoxFuture<'a, Result<()>>;

    /// Spawns the RPC update for this block so it runs concurrently with
    /// other block processing.
    fn prepare_block<'a>(&'a self, cx: &'a BlockSubscriberContext) -> Self::PrepareBlockFut<'a> {
        let handle = tokio::task::spawn({
            let inner = self.inner.clone();
            let mc_block_id = cx.mc_block_id;
            let block = cx.block.clone();
            async move { inner.update(&mc_block_id, &block).await }
        });
        futures_util::future::ready(Ok(handle))
    }

    /// Awaits the prepared update; for masterchain blocks also refreshes the
    /// latest mc info and the RPC storage snapshot.
    fn handle_block<'a>(
        &'a self,
        ctx: &'a BlockSubscriberContext,
        prepared: Self::Prepared,
    ) -> Self::HandleBlockFut<'a> {
        Box::pin(async move {
            // `??` propagates both join errors and the update's own error.
            prepared.await??;
            if ctx.block.id().is_masterchain() {
                self.inner.update_mc_info(&ctx.block)?;
                if let Some(rpc_storage) = &self.inner.rpc_storage {
                    rpc_storage.update_snapshot();
                }
            }
            Ok(())
        })
    }
}
/// Outcome of [`RpcState::acquire_run_get_method_permit`].
pub enum RunGetMethodPermit {
    /// A VM slot was acquired; dropping the permit releases it.
    Acquired(tokio::sync::OwnedSemaphorePermit),
    /// No slot became available within the configured wait time.
    Timeout,
    /// `run_get_method` is disabled (`max_vms == 0`) or the semaphore closed.
    Disabled,
}
/// Shared state behind [`RpcState`]; lives as long as any clone of the state
/// or its subscribers.
struct Inner {
    config: RpcConfig,
    core_storage: CoreStorage,
    // Present only with `RpcStorageConfig::Full`.
    rpc_storage: Option<Arc<RpcStorage>>,
    blockchain_rpc_client: BlockchainRpcClient,
    // Latest known masterchain block info.
    mc_info: RwLock<LatestMcInfo>,
    // Broadcasts mc seqno/lt/utime on every applied masterchain block.
    mc_tick_tx: watch::Sender<McTick>,
    // Masterchain accounts cache (None until `init`/first state).
    mc_accounts: RwLock<Option<CachedAccounts>>,
    // Per-shard accounts caches.
    sc_accounts: RwLock<FastHashMap<ShardIdent, CachedAccounts>>,
    download_block_semaphore: Semaphore,
    run_get_method_semaphore: Arc<Semaphore>,
    // Set once `init` completes.
    is_ready: AtomicBool,
    timings: ArcSwap<StateTimings>,
    blockchain_config: ArcSwap<LatestBlockchainConfig>,
    jrpc_cache: JrpcEndpointCache,
    proto_cache: ProtoEndpointCache,
    subscriptions: Arc<RpcSubscriptions>,
    zerostate_id: ZerostateId,
    // Wakes the transactions GC task on key blocks.
    gc_notify: Arc<Notify>,
    // Background task handles; aborted on drop.
    gc_handle: Option<JoinHandle<()>>,
    blacklisted_accounts: Option<BlacklistedAccounts>,
    blacklist_watcher_handle: Option<JoinHandle<()>>,
}
/// Snapshot of the latest applied masterchain block.
#[derive(Clone)]
pub struct LatestMcInfo {
    pub block_id: Arc<BlockId>,
    pub timings: GenTimings,
    /// New state hash from the block's state update.
    pub state_hash: HashBytes,
}
/// Compact masterchain tick broadcast over the watch channel on each block.
#[derive(Clone, Copy, Debug)]
pub(crate) struct McTick {
    pub seqno: u32,
    pub lt: u64,
    pub utime: u32,
}
/// Latest blockchain config together with its VM-ready unpacked form.
pub struct LatestBlockchainConfig {
    /// Raw config params as stored in the masterchain state.
    pub raw: BlockchainConfigParams,
    /// Partially unpacked params used by the TVM executor.
    pub unpacked: tycho_vm::UnpackedConfig,
    /// VM behaviour modifiers derived from global capabilities.
    pub modifiers: tycho_vm::BehaviourModifiers,
}
impl Default for LatestBlockchainConfig {
    /// Empty placeholder used until a real config is loaded during init or
    /// from the first key block.
    fn default() -> Self {
        Self {
            raw: BlockchainConfigParams::from_raw(Cell::empty_cell()),
            unpacked: tycho_vm::UnpackedConfig {
                latest_storage_prices: None,
                global_id: None,
                mc_gas_prices: None,
                gas_prices: None,
                mc_fwd_prices: None,
                fwd_prices: None,
                size_limits_config: None,
            },
            modifiers: Default::default(),
        }
    }
}
impl Inner {
    /// One-time initialization against the given masterchain block:
    /// restores the blockchain config from the closest key block (or
    /// zerostate proof), fills accounts caches for all shards, and — when
    /// full storage is enabled — reindexes accounts if the node instance id
    /// changed or a reindex was forced. Flips `is_ready` on success.
    async fn init(self: &Arc<Self>, mc_block_id: &BlockId) -> Result<()> {
        anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state");
        let blocks = self.core_storage.block_storage();
        let block_handles = self.core_storage.block_handle_storage();
        let shard_states = self.core_storage.shard_state_storage();

        // Restore the config from the latest key block at or before this seqno.
        'key_block: {
            let Some(handle) = block_handles.find_prev_key_block(mc_block_id.seqno + 1) else {
                break 'key_block;
            };
            if handle.has_data() {
                let key_block = blocks.load_block_data(&handle).await?;
                self.update_mc_block_cache(&key_block)?;
            } else if handle.is_zerostate()
                && let Some(proof) = self.core_storage.node_state().load_zerostate_proof()
            {
                // No key block body; fall back to the zerostate proof.
                let state = proof
                    .virtualize()
                    .parse::<ShardStateUnsplit>()
                    .context("failed to deserialize zerostate proof")?;
                let Some(extra) = state.load_custom()? else {
                    anyhow::bail!("masterchain state without extra");
                };
                self.update_config(state.global_id, state.seqno, &extra.config);
                tracing::warn!("no key block found during initialization");
            } else {
                // Last resort: load the full key block state.
                let state = shard_states
                    .load_state(handle.id().seqno, handle.id())
                    .await
                    .context("failed to load key block state on rpc init")?;
                let state = state.as_ref();
                let Some(extra) = state.load_custom()? else {
                    anyhow::bail!("masterchain state without extra");
                };
                self.update_config(state.global_id, state.seqno, &extra.config);
                tracing::warn!("no key block found during initialization");
            }
        }

        let mut mc_state = shard_states
            .load_state(mc_block_id.seqno, mc_block_id)
            .await
            .context("failed to load state on rpc init")?;
        self.update_timings(mc_state.as_ref().gen_utime, mc_state.as_ref().seqno);
        if let Some(rpc_storage) = &self.rpc_storage {
            let node_instance_id = self.core_storage.node_state().load_instance_id();
            let rpc_instance_id = rpc_storage.load_instance_id();

            // Builds an in-memory cache for one shard state.
            // Libraries are intentionally left empty here (only the accounts
            // cache built by `update_accounts_cache` carries them).
            let make_cached_accounts = |state: &ShardStateStuff| -> Result<CachedAccounts> {
                let state_info = state.as_ref();
                Ok(CachedAccounts {
                    libraries: Default::default(),
                    accounts: state_info.load_accounts()?.dict().clone(),
                    mc_ref_hanlde: state.ref_mc_state_handle().clone(),
                    timings: GenTimings {
                        gen_lt: state_info.gen_lt,
                        gen_utime: state_info.gen_utime,
                    },
                })
            };
            let shards = mc_state.shards()?.clone();

            // Full reindex when the node identity changed or it was forced.
            if node_instance_id != rpc_instance_id || self.config.storage.is_force_reindex() {
                rpc_storage
                    .reset_accounts(mc_state, self.config.shard_split_depth)
                    .await?;
                for item in shards.latest_blocks() {
                    let block_id = item?;
                    let state = shard_states
                        .load_state(mc_block_id.seqno, &block_id)
                        .await
                        .context("failed to load shard state on init")?;
                    rpc_storage
                        .reset_accounts(state, self.config.shard_split_depth)
                        .await?;
                }
                rpc_storage.store_instance_id(node_instance_id);
                // `reset_accounts` consumed `mc_state`, so reload it.
                mc_state = shard_states
                    .load_state(mc_block_id.seqno, mc_block_id)
                    .await
                    .context("failed to reload mc state for rpc")?;
            }
            if let Some(config) = load_blockchain_config(&mc_state) {
                self.blockchain_config.store(config);
            }
            *self.mc_accounts.write() = Some(make_cached_accounts(&mc_state)?);
            for item in shards.latest_blocks() {
                let block_id = item?;
                let state = shard_states
                    .load_state(mc_block_id.seqno, &block_id)
                    .await
                    .context("failed to load shard state to fill cache")?;
                self.sc_accounts
                    .write()
                    .insert(block_id.shard, make_cached_accounts(&state)?);
            }
        }
        self.is_ready.store(true, Ordering::Release);
        Ok(())
    }

    /// Looks up an account state in the masterchain cache or, for basechain
    /// addresses, in the freshest shard cache covering the address.
    fn get_account_state(&self, address: &StdAddr) -> Result<LoadedAccountState, RpcStateError> {
        let is_masterchain = address.is_masterchain();
        if is_masterchain {
            match &*self.mc_accounts.read() {
                None => Err(RpcStateError::NotReady),
                Some(cache) => {
                    let mc_info = self.mc_info.read().clone();
                    cache.get(mc_info, &address.address)
                }
            }
        } else {
            let cache = self.sc_accounts.read();
            let mc_info = self.mc_info.read().clone();
            let mut state = Err(RpcStateError::NotReady);
            let mut gen_utime = 0;
            let mut found = false;
            // Prefer the shard cache with the greatest gen_utime that
            // contains this address.
            for (shard, cache) in &*cache {
                if !shard.contains_account(&address.address) || cache.timings.gen_utime < gen_utime
                {
                    continue;
                }
                gen_utime = cache.timings.gen_utime;
                state = cache.get(mc_info.clone(), &address.address);
                found = true;
            }
            // NOTE(review): `gen_utime` only becomes non-zero on the same
            // iterations that set `found`, so this fallback appears
            // unreachable — confirm the intended behavior for "caches exist
            // but none covers the address".
            if !found && gen_utime > 0 {
                state = Ok(LoadedAccountState::NotFound {
                    mc_block_id: mc_info.block_id,
                    timings: mc_info.timings,
                });
            }
            state
        }
    }

    /// Applies a new block: refreshes timings/config caches for masterchain
    /// blocks, then either indexes the block into the full storage or (in
    /// state-only mode) fans account updates out to subscribers directly.
    async fn update(&self, mc_block_id: &BlockId, block: &BlockStuff) -> Result<()> {
        let _histogram = HistogramGuard::begin("tycho_rpc_state_update_time");
        let is_masterchain = block.id().is_masterchain();
        if is_masterchain {
            self.update_mc_block_cache(block)?;
        }
        if let Some(rpc_storage) = &self.rpc_storage {
            rpc_storage
                .update(
                    mc_block_id,
                    block.clone(),
                    self.blacklisted_accounts.as_ref(),
                    &self.subscriptions,
                )
                .await?;
        } else {
            let updates = Self::collect_updates_without_storage(
                block.clone(),
                self.blacklisted_accounts.as_ref(),
            )
            .await?;
            if !updates.is_empty() {
                self.subscriptions.fanout_updates(updates).await;
            }
        }
        Ok(())
    }

    /// Extracts per-account max-lt updates from a block's account blocks,
    /// skipping blacklisted accounts. Runs on a blocking thread since it
    /// walks cell dictionaries.
    async fn collect_updates_without_storage(
        block: BlockStuff,
        rpc_blacklist: Option<&BlacklistedAccounts>,
    ) -> Result<Vec<AccountUpdate>> {
        let rpc_blacklist = rpc_blacklist.cloned();
        tokio::task::spawn_blocking(move || -> Result<_> {
            // Addresses are (i8 workchain, hash); skip exotic workchains.
            let Ok(workchain) = i8::try_from(block.id().shard.workchain()) else {
                return Ok(Vec::new());
            };
            let info = block.load_info()?;
            let extra = block.load_extra()?;
            let account_blocks = extra.account_blocks.load()?;
            if account_blocks.is_empty() {
                return Ok(Vec::new());
            }
            let mut updates = FastHashMap::<HashBytes, u64>::default();
            let rpc_blacklist = rpc_blacklist.map(|x| x.load());
            for item in account_blocks.iter() {
                let (account, _, account_block) = item?;
                // Blacklist keys are 33 bytes: workchain byte + account hash.
                let is_blacklisted = rpc_blacklist.as_ref().is_some_and(|set| {
                    let mut key = [0u8; 33];
                    key[0] = workchain as u8;
                    key[1..].copy_from_slice(account.as_slice());
                    set.contains(&key)
                });
                if is_blacklisted {
                    continue;
                }
                // Track the maximum lt seen per account.
                for tx_item in account_block.transactions.values() {
                    let (_, tx_cell) = tx_item?;
                    let tx = tx_cell.load()?;
                    updates
                        .entry(account)
                        .and_modify(|lt| {
                            if tx.lt > *lt {
                                *lt = tx.lt;
                            }
                        })
                        .or_insert(tx.lt);
                }
            }
            let updates = updates
                .into_iter()
                .map(|(address, max_lt)| AccountUpdate {
                    address: StdAddr::new(workchain, address),
                    max_lt,
                    gen_utime: info.gen_utime,
                })
                .collect();
            Ok(updates)
        })
        .await?
    }

    /// Refreshes timings for every masterchain block; for key blocks also
    /// wakes the GC task and refreshes config-derived endpoint caches.
    fn update_mc_block_cache(&self, block: &BlockStuff) -> Result<()> {
        {
            let info = block.load_info()?;
            self.update_timings(info.gen_utime, info.seqno);
            // Non-key blocks only contribute timings.
            if !info.key_block {
                return Ok(());
            }
        }
        if self.config.storage.gc_is_enabled() {
            self.gc_notify.notify_waiters();
        }
        let custom = block.load_custom()?;
        if let Some(ref config) = custom.config {
            self.update_config(block.as_ref().global_id, block.id().seqno, config);
        } else {
            tracing::error!("key block without config");
        }
        self.jrpc_cache.handle_key_block(block.as_ref());
        self.proto_cache.handle_key_block(block.as_ref());
        Ok(())
    }

    /// Publishes the latest masterchain block info and broadcasts a tick.
    fn update_mc_info(&self, block: &BlockStuff) -> Result<()> {
        let info = block.load_info()?;
        let state_update = block.block().state_update.load()?;
        let seqno = block.id().seqno;
        let timings = GenTimings {
            gen_lt: info.end_lt,
            gen_utime: info.gen_utime,
        };
        let block_id = Arc::new(*block.id());
        let tick = McTick {
            seqno,
            lt: timings.gen_lt,
            utime: timings.gen_utime,
        };
        *self.mc_info.write() = LatestMcInfo {
            block_id,
            timings,
            state_hash: state_update.new_hash,
        };
        // `send_replace` notifies receivers even without active subscribers.
        self.mc_tick_tx.send_replace(tick);
        Ok(())
    }

    /// Recomputes and stores the node's state timings snapshot.
    fn update_timings(&self, mc_gen_utime: u32, seqno: u32) {
        // May be negative when local clock is behind the block time.
        let time_diff = now_sec() as i64 - mc_gen_utime as i64;
        self.timings.store(Arc::new(StateTimings {
            last_mc_block_seqno: seqno,
            last_mc_utime: mc_gen_utime,
            mc_time_diff: time_diff,
            smallest_known_lt: self.rpc_storage.as_ref().map(|s| s.min_tx_lt()),
        }));
    }

    /// Propagates a new blockchain config into both endpoint caches.
    fn update_config(&self, global_id: i32, seqno: u32, config: &BlockchainConfig) {
        self.jrpc_cache.handle_config(global_id, seqno, config);
        self.proto_cache.handle_config(global_id, seqno, config);
    }

    /// Replaces the cached accounts dictionary for the block's shard; for
    /// masterchain states also refreshes the blockchain config. Cleans up
    /// stale shard cache entries after shard merges/splits.
    fn update_accounts_cache(&self, block: &BlockStuff, state: &ShardStateStuff) -> Result<()> {
        let _histogram = HistogramGuard::begin("tycho_rpc_state_update_accounts_cache_time");
        let shard = block.id().shard;
        let block_info = block.load_info()?;
        let accounts = state.state().load_accounts()?.dict().clone();
        let libraries = state.state().libraries.clone();
        let cached = CachedAccounts {
            libraries,
            accounts,
            mc_ref_hanlde: state.ref_mc_state_handle().clone(),
            timings: GenTimings {
                gen_lt: block_info.end_lt,
                gen_utime: block_info.gen_utime,
            },
        };
        if shard.is_masterchain() {
            if let Some(config) = load_blockchain_config(state) {
                self.blockchain_config.store(config);
            }
            *self.mc_accounts.write() = Some(cached);
        } else {
            let mut cache = self.sc_accounts.write();
            cache.insert(shard, cached);
            if block_info.after_merge || block_info.after_split {
                tracing::debug!("clearing shard states cache after shards merge/split");
                match block_info.load_prev_ref()? {
                    // After a split: drop the parent only once both child
                    // shards have their own cache entries.
                    PrevBlockRef::Single(..) => {
                        let parent = shard
                            .merge()
                            .ok_or(tycho_types::error::Error::InvalidData)?;
                        let opposite = shard.opposite().expect("after split");
                        if cache.contains_key(&shard) && cache.contains_key(&opposite) {
                            cache.remove(&parent);
                        }
                    }
                    // After a merge: drop both former child shards.
                    PrevBlockRef::AfterMerge { .. } => {
                        let (left, right) = shard
                            .split()
                            .ok_or(tycho_types::error::Error::InvalidData)?;
                        cache.remove(&left);
                        cache.remove(&right);
                    }
                }
            }
        }
        Ok(())
    }
}
impl Drop for Inner {
    /// Aborts the background tasks (transactions GC and blacklist watcher)
    /// spawned by the builder, if they were started.
    fn drop(&mut self) {
        let tasks = [
            self.gc_handle.take(),
            self.blacklist_watcher_handle.take(),
        ];
        for task in tasks.into_iter().flatten() {
            task.abort();
        }
    }
}
/// Unpacks the blockchain config from a masterchain state into the VM-ready
/// form; returns `None` (after logging) when the state has no extra or the
/// config fails to unpack.
fn load_blockchain_config(mc_state: &ShardStateStuff) -> Option<Arc<LatestBlockchainConfig>> {
    let extra = mc_state.state_extra().ok()?;
    let now = mc_state.as_ref().gen_utime;
    match tycho_vm::SmcInfoTonV6::unpack_config_partial(&extra.config.params, now) {
        Ok(unpacked) => {
            let mut modifiers = tycho_vm::BehaviourModifiers::default();
            // Enable signature-with-id only when the capability is set.
            if let Ok(global_id) = extra.config.params.get_global_id()
                && let Ok(global) = extra.config.params.get_global_version()
            {
                modifiers.signature_with_id = global
                    .capabilities
                    .contains(GlobalCapability::CapSignatureWithId)
                    .then_some(global_id);
            }
            Some(Arc::new(LatestBlockchainConfig {
                raw: extra.config.params.clone(),
                unpacked,
                modifiers,
            }))
        }
        Err(e) => {
            tracing::error!(
                block_id = %mc_state.block_id(),
                "failed to unpack blockchain config: {e:?}",
            );
            None
        }
    }
}
/// Result of an account state lookup in the in-memory caches.
pub enum LoadedAccountState {
    /// The account is absent from the relevant cache.
    NotFound {
        mc_block_id: Arc<BlockId>,
        timings: GenTimings,
    },
    /// The account was found; `mc_ref_handle` keeps the backing mc state alive.
    Found {
        mc_block_id: Arc<BlockId>,
        state: ShardAccount,
        mc_ref_handle: RefMcStateHandle,
        timings: GenTimings,
    },
}
/// In-memory accounts dictionary of a single shard state.
struct CachedAccounts {
    // Libraries dict from the state; left empty by `Inner::init`'s cache
    // builder, filled by `update_accounts_cache`.
    libraries: Dict<HashBytes, LibDescr>,
    accounts: ShardAccountsDict,
    // Keeps the referenced mc state alive while the cache exists.
    // NOTE(review): field name contains a typo ("hanlde"); renaming would
    // touch several uses across this file.
    mc_ref_hanlde: RefMcStateHandle,
    timings: GenTimings,
}
impl CachedAccounts {
    /// Looks up an account by hash, attaching the latest mc block id and the
    /// fresher of the cache/mc timings (by lt) to the result.
    fn get(
        &self,
        mc_info: LatestMcInfo,
        account: &HashBytes,
    ) -> Result<LoadedAccountState, RpcStateError> {
        match self.accounts.get(account) {
            Ok(Some((_, state))) => Ok(LoadedAccountState::Found {
                mc_block_id: mc_info.block_id,
                state,
                mc_ref_handle: self.mc_ref_hanlde.clone(),
                timings: self.timings.max_by_lt(mc_info.timings),
            }),
            Ok(None) => Ok(LoadedAccountState::NotFound {
                mc_block_id: mc_info.block_id,
                timings: self.timings.max_by_lt(mc_info.timings),
            }),
            Err(e) => Err(RpcStateError::Internal(e.into())),
        }
    }
}
/// Accounts dictionary shape as stored in a shard state.
type ShardAccountsDict = Dict<HashBytes, (DepthBalanceInfo, ShardAccount)>;
/// Background task removing old transactions from the RPC storage.
///
/// Waits on `gc_notify` (signalled on each key block) and, on every wake-up,
/// removes transactions generated before `now - config.tx_ttl`, keeping at
/// least `config.keep_tx_per_account` transactions per account.
async fn transactions_gc(
    config: TransactionsGcConfig,
    core_storage: CoreStorage,
    rpc_storage: Arc<RpcStorage>,
    gc_notify: Arc<Notify>,
) {
    // Timestamps are compared as seconds; a TTL that does not fit disables
    // GC entirely, so make that visible in the logs instead of returning
    // silently (the previous behavior).
    let Ok(tx_ttl_sec) = config.tx_ttl.as_secs().try_into() else {
        tracing::warn!(
            tx_ttl_secs = config.tx_ttl.as_secs(),
            "transaction TTL is too large, transactions GC is disabled"
        );
        return;
    };
    loop {
        gc_notify.notified().await;
        let target_utime = now_sec().saturating_sub(tx_ttl_sec);
        // Align removal to a key block boundary at or before the target time.
        let gc_range = match find_closest_key_block_lt(&core_storage, target_utime).await {
            Ok(lt) => lt,
            Err(e) => {
                tracing::error!(
                    target_utime,
                    "failed to find the closest key block lt: {e:?}"
                );
                continue;
            }
        };
        if let Err(e) = rpc_storage
            .remove_old_transactions(gc_range.mc_seqno, gc_range.lt, config.keep_tx_per_account)
            .await
        {
            tracing::error!(
                target_utime,
                mc_seqno = gc_range.mc_seqno,
                min_lt = gc_range.lt,
                "failed to remove old transactions: {e:?}"
            );
        }
    }
}
/// Background task that polls the blacklist config file every 10 seconds and
/// reloads the shared [`BlacklistedAccounts`] set whenever the file's
/// modification time changes.
pub async fn watch_blacklisted_accounts(config_path: PathBuf, accounts: BlacklistedAccounts) {
    tracing::info!(
        config_path = %config_path.display(),
        "started watching for changes in rpc blacklist config"
    );
    // `None` both before the first read and when the file is missing/unreadable.
    let get_metadata = || {
        std::fs::metadata(&config_path)
            .ok()
            .and_then(|m| m.modified().ok())
    };
    let mut last_modified = None;
    let mut interval = tokio::time::interval(Duration::from_secs(10));
    loop {
        interval.tick().await;
        let modified = get_metadata();
        // Skip when the mtime is unchanged (or still unreadable).
        if last_modified == modified {
            continue;
        }
        last_modified = modified;
        match BlackListConfig::load_from(&config_path) {
            Ok(config) => accounts.update(config.accounts),
            Err(e) => {
                // Keep the previous set on parse errors; just report.
                tracing::error!("failed to load blacklist config: {e:?}");
            }
        }
    }
}
/// Finds the newest key block generated at or before `utime` and returns its
/// seqno and start lt as the GC boundary. Returns the default (zero) range
/// when no such key block exists.
async fn find_closest_key_block_lt(storage: &CoreStorage, utime: u32) -> Result<GcRange> {
    let block_handle_storage = storage.block_handle_storage();
    let handle = 'last_key_block: {
        // Walk key blocks from newest to oldest until one is old enough.
        let iter = block_handle_storage.key_blocks_iterator(KeyBlocksDirection::Backward);
        for key_block_id in iter {
            let handle = block_handle_storage
                .load_handle(&key_block_id)
                .with_context(|| format!("key block not found: {key_block_id}"))?;
            if handle.gen_utime() <= utime {
                break 'last_key_block handle;
            }
        }
        return Ok(GcRange::default());
    };
    // The seqno/start_lt come from the virtualized block proof.
    let block_proof = storage.block_storage().load_block_proof(&handle).await?;
    let (virt_block, _) = block_proof.virtualize_block()?;
    let info = virt_block.info.load()?;
    Ok(GcRange {
        mc_seqno: info.seqno,
        lt: info.start_lt,
    })
}
/// Removal boundary for transactions GC: everything strictly older than this
/// masterchain seqno / lt pair is eligible for removal.
#[derive(Default)]
struct GcRange {
    mc_seqno: u32,
    lt: u64,
}
/// Errors returned by [`RpcState`] query methods.
#[derive(Debug, thiserror::Error)]
pub enum RpcStateError {
    /// `init` has not completed yet.
    #[error("not ready")]
    NotReady,
    /// The method requires the full RPC storage (state-only mode).
    #[error("not supported")]
    NotSupported,
    /// Unexpected internal failure.
    #[error("internal: {0}")]
    Internal(#[from] anyhow::Error),
    /// The client supplied malformed input.
    #[error(transparent)]
    BadRequest(#[from] BadRequestError),
}
impl RpcStateError {
pub fn internal<E: Into<anyhow::Error>>(error: E) -> Self {
Self::Internal(error.into())
}
pub fn bad_request<E: Into<anyhow::Error>>(error: E) -> Self {
Self::BadRequest(BadRequestError(error.into()))
}
}
/// Newtype wrapping client-input errors so they map to an HTTP 4xx response
/// instead of being treated as internal failures.
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct BadRequestError(anyhow::Error);
impl From<anyhow::Error> for BadRequestError {
    /// Wraps an arbitrary error as a bad-request error.
    #[inline]
    fn from(value: anyhow::Error) -> Self {
        Self(value)
    }
}
impl From<axum::extract::rejection::QueryRejection> for BadRequestError {
    /// Converts an axum query-extraction rejection, keeping its body text.
    #[inline]
    fn from(value: axum::extract::rejection::QueryRejection) -> Self {
        Self(anyhow::Error::msg(value.body_text()))
    }
}
impl From<axum::extract::rejection::JsonRejection> for BadRequestError {
    /// Converts an axum JSON-extraction rejection, keeping its body text.
    #[inline]
    fn from(value: axum::extract::rejection::JsonRejection) -> Self {
        Self(anyhow::Error::msg(value.body_text()))
    }
}
#[cfg(test)]
mod test {
use std::str::FromStr;
use tycho_block_util::block::BlockStuffAug;
use tycho_core::block_strider::DelayedTasks;
use tycho_core::blockchain_rpc::BlockchainRpcService;
use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig};
use tycho_core::storage::CoreStorageConfig;
use tycho_network::{
BoxCloneService, Network, NetworkConfig, OverlayId, PublicOverlay, Response, ServiceExt,
ServiceRequest, service_query_fn,
};
use tycho_storage::StorageContext;
use super::*;
/// Builds a boxed service that echoes every request body back unchanged.
fn echo_service() -> BoxCloneService<ServiceRequest, Response> {
    service_query_fn(|req: ServiceRequest| async move {
        tracing::trace!("received: {}", req.body.escape_ascii());
        Some(Response {
            version: Default::default(),
            body: req.body,
        })
    })
    .boxed_clone()
}
/// Creates a loopback network (random port, random key) serving the echo service.
fn make_network() -> Result<Network> {
    let builder = Network::builder()
        .with_config(NetworkConfig::default())
        .with_random_private_key();
    builder.build("127.0.0.1:0", echo_service())
}
/// Loads the non-empty block fixture together with its original BOC bytes.
fn get_block() -> BlockStuffAug {
    let bytes = include_bytes!("../../../core/tests/data/block.bin");
    let root = Boc::decode(bytes).unwrap();
    let block = root.parse::<Block>().unwrap();

    // The expected block id is stored next to the fixture as plain text.
    let id_str = include_str!("../../../core/tests/data/block_id.txt");
    let block_id = BlockId::from_str(id_str.trim_end()).unwrap();

    BlockStuff::from_block_and_root(&block_id, block, root, bytes.len())
        .with_archive_data(bytes.as_slice())
}
/// Loads the empty block fixture; only its root hash is filled into the id.
fn get_empty_block() -> BlockStuffAug {
    let bytes = include_bytes!("../../../core/tests/data/empty_block.bin");
    let root = Boc::decode(bytes).unwrap();
    let block = root.parse::<Block>().unwrap();
    let block_id = BlockId {
        root_hash: *root.repr_hash(),
        ..Default::default()
    };
    BlockStuff::from_block_and_root(&block_id, block, root, bytes.len())
        .with_archive_data(bytes.as_slice())
}
#[tokio::test]
async fn rpc_state_handle_block() -> Result<()> {
    tycho_util::test::init_logger("rpc_state_handle_block", "debug");

    // Storage backed by a temporary directory.
    let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
    let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

    // Minimal networking stack: loopback network + public overlay + client.
    let network = make_network()?;
    let rpc_service = BlockchainRpcService::builder()
        .with_storage(storage.clone())
        .without_broadcast_listener()
        .build();
    let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(rpc_service);
    let overlay_client =
        PublicOverlayClient::new(network, public_overlay, PublicOverlayClientConfig::default());
    let blockchain_rpc_client = BlockchainRpcClient::builder()
        .with_public_overlay_client(overlay_client)
        .build();

    let rpc_state = RpcState::builder()
        .with_config(RpcConfig::default())
        .with_storage(storage)
        .with_blockchain_rpc_client(blockchain_rpc_client)
        .with_zerostate_id(ZerostateId::default())
        .build()?;

    // Feed the block fixture through the subscriber pipeline.
    let block = get_block();
    let (delayed_handle, delayed) = DelayedTasks::new();
    let ctx = BlockSubscriberContext {
        mc_block_id: BlockId::default(),
        mc_is_key_block: false,
        is_key_block: false,
        is_top_block: false,
        block: block.data,
        archive_data: block.archive_data,
        delayed,
    };
    let (block_subscriber, _) = rpc_state.clone().split();
    let delayed_handle = delayed_handle.spawn();
    let prepared = block_subscriber.prepare_block(&ctx).await?;
    block_subscriber.handle_block(&ctx, prepared).await?;
    delayed_handle.join().await?;

    // After processing, the account must be discoverable by its new code hash.
    let account = HashBytes::from_str(
        "b06c29df56964af1aeb3bbda73ea5685bc54f4131c1c8559ba2c6f971976cd2b",
    )?;
    let new_code_hash = HashBytes::from_str(
        "fc42205fe8c1c08846c1222c81eb416bdbf403253f6079691e04d52ce4400f8f",
    )?;
    let found = rpc_state
        .get_accounts_by_code_hash(&new_code_hash, None, None)?
        .last()
        .unwrap();
    assert_eq!(account, found.address);
    Ok(())
}
#[tokio::test]
async fn rpc_state_handle_empty_block() -> Result<()> {
    tycho_util::test::init_logger("rpc_state_handle_empty_block", "debug");

    let (ctx, _tmp_dir) = StorageContext::new_temp().await?;
    let storage = CoreStorage::open(ctx, CoreStorageConfig::new_potato()).await?;

    let rpc_service = BlockchainRpcService::builder()
        .with_storage(storage.clone())
        .without_broadcast_listener()
        .build();
    let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID).build(rpc_service);
    let blockchain_rpc_client = BlockchainRpcClient::builder()
        .with_public_overlay_client(PublicOverlayClient::new(
            make_network()?,
            public_overlay,
            PublicOverlayClientConfig::default(),
        ))
        .build();

    let rpc_state = RpcState::builder()
        .with_config(RpcConfig::default())
        .with_storage(storage)
        .with_blockchain_rpc_client(blockchain_rpc_client)
        .with_zerostate_id(ZerostateId::default())
        .build()?;

    // An empty block must pass through the subscriber pipeline without errors.
    let block = get_empty_block();
    let (delayed_handle, delayed) = DelayedTasks::new();
    let ctx = BlockSubscriberContext {
        mc_block_id: BlockId::default(),
        mc_is_key_block: false,
        is_key_block: false,
        is_top_block: false,
        block: block.data,
        archive_data: block.archive_data,
        delayed,
    };
    let (block_subscriber, _) = rpc_state.clone().split();
    let delayed_handle = delayed_handle.spawn();
    let prepared = block_subscriber.prepare_block(&ctx).await?;
    block_subscriber.handle_block(&ctx, prepared).await?;
    delayed_handle.join().await?;
    Ok(())
}
/// Fixed overlay id shared by both test setups in this module.
static PUBLIC_OVERLAY_ID: OverlayId = OverlayId([1; 32]);
}