use std::{sync::Arc, time::Duration};
use tokio::sync::RwLock;
use zksync_config::configs::chain::TimestampAsserterConfig;
use zksync_dal::node::{PoolResource, ReplicaPool};
use zksync_health_check::AppHealthCheck;
use zksync_node_fee_model::node::ApiFeeInputResource;
use zksync_node_framework::{
service::StopReceiver,
task::{Task, TaskId},
wiring_layer::{WiringError, WiringLayer},
FromContext, IntoContext,
};
use zksync_object_store::ObjectStore;
use zksync_shared_resources::contracts::L2ContractsResource;
use zksync_state::{PostgresStorageCaches, PostgresStorageCachesTask};
use zksync_types::{vm::FastVmMode, AccountTreeId, Address};
use zksync_vm_executor::node::ApiTransactionFilter;
use zksync_web3_decl::{
client::{DynClient, L2},
jsonrpsee,
namespaces::EnNamespaceClient as _,
};
use crate::{
execution_sandbox::{VmConcurrencyBarrier, VmConcurrencyLimiter},
tx_sender::{
tx_sink::TxSink, SandboxExecutorOptions, TimestampAsserterParams, TxSender,
TxSenderBuilder, TxSenderConfig,
},
};
#[derive(Debug)]
pub struct PostgresStorageCachesConfig {
pub factory_deps_cache_size: u64,
pub initial_writes_cache_size: u64,
pub latest_values_cache_size: u64,
pub latest_values_max_block_lag: u32,
}
#[derive(Debug)]
pub struct TxSenderLayer {
postgres_storage_caches_config: PostgresStorageCachesConfig,
max_vm_concurrency: usize,
whitelisted_tokens_for_aa_cache: bool,
vm_mode: FastVmMode,
timestamp_asserter_config: TimestampAsserterConfig,
tx_sender_config: TxSenderConfig,
}
#[derive(Debug, FromContext)]
pub struct Input {
app_health: Arc<AppHealthCheck>,
tx_sink: Arc<dyn TxSink>,
replica_pool: PoolResource<ReplicaPool>,
fee_input: ApiFeeInputResource,
main_node_client: Option<Box<DynClient<L2>>>,
transaction_filter: Option<ApiTransactionFilter>,
l2_contracts: L2ContractsResource,
core_object_store: Option<Arc<dyn ObjectStore>>,
}
#[derive(Debug, IntoContext)]
pub struct Output {
tx_sender: TxSender,
#[context(task)]
vm_concurrency_barrier: VmConcurrencyBarrier,
#[context(task)]
postgres_storage_caches_task: Option<PostgresStorageCachesTaskWrapper>,
#[context(task)]
whitelisted_tokens_for_aa_update_task: Option<WhitelistedTokensForAaUpdateTask>,
}
impl TxSenderLayer {
pub fn new(
postgres_storage_caches_config: PostgresStorageCachesConfig,
max_vm_concurrency: usize,
tx_sender_config: TxSenderConfig,
timestamp_asserter_config: TimestampAsserterConfig,
) -> Self {
Self {
postgres_storage_caches_config,
max_vm_concurrency,
whitelisted_tokens_for_aa_cache: false,
vm_mode: FastVmMode::Old,
timestamp_asserter_config,
tx_sender_config,
}
}
pub fn with_whitelisted_tokens_for_aa_cache(mut self, value: bool) -> Self {
self.whitelisted_tokens_for_aa_cache = value;
self
}
pub fn with_vm_mode(mut self, mode: FastVmMode) -> Self {
self.vm_mode = mode;
self
}
}
#[async_trait::async_trait]
impl WiringLayer for TxSenderLayer {
type Input = Input;
type Output = Output;
fn layer_name(&self) -> &'static str {
"tx_sender_layer"
}
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let tx_sink = input.tx_sink;
let replica_pool = input.replica_pool.get().await?;
let transaction_filter = input.transaction_filter.map(|filter| filter.0);
let fee_input = input.fee_input.0;
let config = match input.l2_contracts.0.timestamp_asserter_addr {
Some(address) => {
let timestamp_asserter_config = self.timestamp_asserter_config;
self.tx_sender_config
.with_timestamp_asserter_params(TimestampAsserterParams {
address,
min_time_till_end: timestamp_asserter_config.min_time_till_end,
})
}
None => self.tx_sender_config,
};
let factory_deps_capacity = self.postgres_storage_caches_config.factory_deps_cache_size;
let initial_writes_capacity = self
.postgres_storage_caches_config
.initial_writes_cache_size;
let values_capacity = self.postgres_storage_caches_config.latest_values_cache_size;
let mut storage_caches =
PostgresStorageCaches::new(factory_deps_capacity, initial_writes_capacity);
let postgres_storage_caches_task = if values_capacity > 0 {
let update_task = storage_caches.configure_storage_values_cache(
values_capacity,
self.postgres_storage_caches_config
.latest_values_max_block_lag,
replica_pool.clone(),
);
Some(PostgresStorageCachesTaskWrapper(update_task))
} else {
None
};
let (vm_concurrency_limiter, vm_concurrency_barrier) =
VmConcurrencyLimiter::new(self.max_vm_concurrency);
let mut executor_options = SandboxExecutorOptions::new(
config.chain_id,
AccountTreeId::new(config.fee_account_addr),
config.validation_computational_gas_limit,
)
.await?;
executor_options.set_fast_vm_mode(self.vm_mode);
if let Some(store) = input.core_object_store {
executor_options.set_vm_dump_object_store(store);
}
let mut tx_sender = TxSenderBuilder::new(config, replica_pool, tx_sink);
if let Some(transaction_filter) = transaction_filter {
tx_sender = tx_sender.with_transaction_filter(transaction_filter);
}
let whitelisted_tokens_for_aa_update_task = if self.whitelisted_tokens_for_aa_cache {
let main_node_client = input.main_node_client.ok_or_else(|| {
WiringError::Configuration(
"Main node client is required for the whitelisted tokens for AA cache".into(),
)
})?;
let whitelisted_tokens = Arc::new(RwLock::new(Default::default()));
tx_sender = tx_sender.with_whitelisted_tokens_for_aa(whitelisted_tokens.clone());
Some(WhitelistedTokensForAaUpdateTask {
whitelisted_tokens: whitelisted_tokens.clone(),
main_node_client,
})
} else {
None
};
let tx_sender = tx_sender.build(
fee_input,
Arc::new(vm_concurrency_limiter),
executor_options,
storage_caches,
);
input
.app_health
.insert_custom_component(Arc::new(tx_sender.health_check()))
.map_err(WiringError::internal)?;
Ok(Output {
tx_sender,
postgres_storage_caches_task,
vm_concurrency_barrier,
whitelisted_tokens_for_aa_update_task,
})
}
}
#[derive(Debug)]
struct PostgresStorageCachesTaskWrapper(PostgresStorageCachesTask);
#[async_trait::async_trait]
impl Task for PostgresStorageCachesTaskWrapper {
fn id(&self) -> TaskId {
"postgres_storage_caches".into()
}
async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.0.run(stop_receiver.0).await
}
}
#[async_trait::async_trait]
impl Task for VmConcurrencyBarrier {
fn id(&self) -> TaskId {
"vm_concurrency_barrier_task".into()
}
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
stop_receiver.0.changed().await?;
self.close();
self.wait_until_stopped().await;
Ok(())
}
}
#[derive(Debug)]
pub struct WhitelistedTokensForAaUpdateTask {
whitelisted_tokens: Arc<RwLock<Vec<Address>>>,
main_node_client: Box<DynClient<L2>>,
}
#[async_trait::async_trait]
impl Task for WhitelistedTokensForAaUpdateTask {
fn id(&self) -> TaskId {
"whitelisted_tokens_for_aa_update_task".into()
}
async fn run(mut self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
while !*stop_receiver.0.borrow_and_update() {
match self.main_node_client.whitelisted_tokens_for_aa().await {
Ok(tokens) => {
*self.whitelisted_tokens.write().await = tokens;
}
Err(jsonrpsee::core::client::Error::Call(error))
if error.code() == jsonrpsee::types::error::METHOD_NOT_FOUND_CODE =>
{
}
Err(err) => {
tracing::error!("Failed to query `whitelisted_tokens_for_aa`, error: {err:?}");
}
}
tokio::time::timeout(Duration::from_secs(30), stop_receiver.0.changed())
.await
.ok();
}
Ok(())
}
}