1use std::{
2 future::Future,
3 sync::{
4 atomic::{AtomicU32, Ordering},
5 Arc,
6 },
7 time::Instant,
8};
9
10use anyhow::Context as _;
11use futures::TryFutureExt;
12use lru::LruCache;
13use tokio::sync::{Mutex, RwLock};
14use vise::GaugeGuard;
15use zksync_config::{
16 configs::{
17 api::Web3JsonRpcConfig,
18 contracts::{
19 chain::L2Contracts,
20 ecosystem::{EcosystemCommonContracts, L1SpecificContracts},
21 SettlementLayerSpecificContracts,
22 },
23 },
24 GenesisConfig,
25};
26use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError};
27use zksync_metadata_calculator::api_server::TreeApiClient;
28use zksync_node_sync::SyncState;
29use zksync_types::{
30 api, commitment::L1BatchCommitmentMode, l2::L2Tx, settlement::SettlementLayer,
31 transaction_request::CallRequest, Address, L1BatchNumber, L1ChainId, L2BlockNumber, L2ChainId,
32 H256, U256, U64,
33};
34use zksync_web3_decl::{
35 client::{DynClient, L2},
36 error::Web3Error,
37 types::Filter,
38};
39
40use super::{
41 backend_jsonrpsee::MethodTracer,
42 mempool_cache::MempoolCache,
43 metrics::{FilterType, FILTER_METRICS},
44 TypedFilter,
45};
46use crate::{
47 execution_sandbox::{BlockArgs, BlockArgsError, BlockStartInfo},
48 tx_sender::{tx_sink::TxSink, TxSender},
49 utils::AccountTypesCache,
50 web3::metrics::FilterMetrics,
51};
52
53#[derive(Debug)]
54pub(super) enum PruneQuery {
55 BlockId(api::BlockId),
56 L1Batch(L1BatchNumber),
57}
58
59impl From<api::BlockId> for PruneQuery {
60 fn from(id: api::BlockId) -> Self {
61 Self::BlockId(id)
62 }
63}
64
65impl From<L2BlockNumber> for PruneQuery {
66 fn from(number: L2BlockNumber) -> Self {
67 Self::BlockId(api::BlockId::Number(number.0.into()))
68 }
69}
70
71impl From<L1BatchNumber> for PruneQuery {
72 fn from(number: L1BatchNumber) -> Self {
73 Self::L1Batch(number)
74 }
75}
76
77impl From<BlockArgsError> for Web3Error {
78 fn from(value: BlockArgsError) -> Self {
79 match value {
80 BlockArgsError::Pruned(l2_block) => Web3Error::PrunedBlock(l2_block),
81 BlockArgsError::Missing => Web3Error::NoBlock,
82 BlockArgsError::Database(error) => Web3Error::InternalError(error),
83 }
84 }
85}
86
87impl BlockStartInfo {
88 pub(super) async fn ensure_not_pruned(
89 &self,
90 query: impl Into<PruneQuery>,
91 storage: &mut Connection<'_, Core>,
92 ) -> Result<(), Web3Error> {
93 match query.into() {
94 PruneQuery::BlockId(id) => Ok(self.ensure_not_pruned_block(id, storage).await?),
95 PruneQuery::L1Batch(number) => {
96 let first_l1_batch = self.first_l1_batch(storage).await?;
97 if number < first_l1_batch {
98 return Err(Web3Error::PrunedL1Batch(first_l1_batch));
99 }
100 Ok(())
101 }
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
110pub struct InternalApiConfigBase {
111 pub l1_chain_id: L1ChainId,
113 pub l2_chain_id: L2ChainId,
114 pub dummy_verifier: bool,
115 pub l1_batch_commit_data_generator_mode: L1BatchCommitmentMode,
116 pub max_tx_size: usize,
117 pub estimate_gas_scale_factor: f64,
118 pub estimate_gas_acceptable_overestimation: u32,
119 pub estimate_gas_optimize_search: bool,
120 pub req_entities_limit: usize,
121 pub fee_history_limit: u64,
122 pub filters_disabled: bool,
123 pub l1_to_l2_txs_paused: bool,
124}
125
126impl InternalApiConfigBase {
127 pub fn new(genesis: &GenesisConfig, web3_config: &Web3JsonRpcConfig) -> Self {
128 Self {
129 l1_chain_id: genesis.l1_chain_id,
130 l2_chain_id: genesis.l2_chain_id,
131 dummy_verifier: genesis.dummy_verifier,
132 l1_batch_commit_data_generator_mode: genesis.l1_batch_commit_data_generator_mode,
133 max_tx_size: web3_config.max_tx_size,
134 estimate_gas_scale_factor: web3_config.estimate_gas_scale_factor,
135 estimate_gas_acceptable_overestimation: web3_config
136 .estimate_gas_acceptable_overestimation,
137 estimate_gas_optimize_search: web3_config.estimate_gas_optimize_search,
138 req_entities_limit: web3_config.req_entities_limit(),
139 fee_history_limit: web3_config.fee_history_limit(),
140 filters_disabled: web3_config.filters_disabled,
141 l1_to_l2_txs_paused: false,
142 }
143 }
144
145 pub fn with_l1_to_l2_txs_paused(mut self, l1_to_l2_txs_paused: bool) -> Self {
146 self.l1_to_l2_txs_paused = l1_to_l2_txs_paused;
147 self
148 }
149}
150
151#[derive(Debug, Clone)]
157pub struct InternalApiConfig {
158 pub l1_chain_id: L1ChainId,
160 pub l2_chain_id: L2ChainId,
161 pub max_tx_size: usize,
162 pub estimate_gas_scale_factor: f64,
163 pub estimate_gas_acceptable_overestimation: u32,
164 pub estimate_gas_optimize_search: bool,
165 pub bridge_addresses: api::BridgeAddresses,
166 pub l1_ecosystem_contracts: EcosystemCommonContracts,
167 pub server_notifier_addr: Option<Address>,
168 pub l1_bytecodes_supplier_addr: Option<Address>,
169 pub l1_wrapped_base_token_store: Option<Address>,
170 pub l1_diamond_proxy_addr: Address,
171 pub l2_testnet_paymaster_addr: Option<Address>,
172 pub req_entities_limit: usize,
173 pub fee_history_limit: u64,
174 pub base_token_address: Option<Address>,
175 pub filters_disabled: bool,
176 pub dummy_verifier: bool,
177 pub l1_batch_commit_data_generator_mode: L1BatchCommitmentMode,
178 pub timestamp_asserter_address: Option<Address>,
179 pub l2_multicall3: Option<Address>,
180 pub l1_to_l2_txs_paused: bool,
181 pub settlement_layer: SettlementLayer,
182}
183
184impl InternalApiConfig {
185 pub fn from_base_and_contracts(
186 base: InternalApiConfigBase,
187 l1_contracts_config: &SettlementLayerSpecificContracts,
188 l1_ecosystem_contracts: &L1SpecificContracts,
189 l2_contracts: &L2Contracts,
190 settlement_layer: SettlementLayer,
191 ) -> Self {
192 Self {
193 l1_chain_id: base.l1_chain_id,
194 l2_chain_id: base.l2_chain_id,
195 max_tx_size: base.max_tx_size,
196 estimate_gas_scale_factor: base.estimate_gas_scale_factor,
197 estimate_gas_acceptable_overestimation: base.estimate_gas_acceptable_overestimation,
198 estimate_gas_optimize_search: base.estimate_gas_optimize_search,
199 bridge_addresses: api::BridgeAddresses {
200 l1_erc20_default_bridge: l1_ecosystem_contracts.erc_20_bridge,
201 l2_erc20_default_bridge: Some(l2_contracts.erc20_default_bridge),
202 l1_shared_default_bridge: l1_ecosystem_contracts.shared_bridge,
203 l2_shared_default_bridge: Some(l2_contracts.shared_bridge_addr),
204 l1_weth_bridge: Some(Address::zero()),
206 l2_weth_bridge: Some(Address::zero()),
207 l2_legacy_shared_bridge: l2_contracts.legacy_shared_bridge_addr,
208 },
209 l1_ecosystem_contracts: l1_contracts_config.ecosystem_contracts.clone(),
210 server_notifier_addr: l1_ecosystem_contracts.server_notifier_addr,
211 l1_bytecodes_supplier_addr: l1_ecosystem_contracts.bytecodes_supplier_addr,
212 l1_wrapped_base_token_store: l1_ecosystem_contracts.wrapped_base_token_store,
213 l1_diamond_proxy_addr: l1_contracts_config
214 .chain_contracts_config
215 .diamond_proxy_addr,
216 l2_testnet_paymaster_addr: l2_contracts.testnet_paymaster_addr,
217 req_entities_limit: base.req_entities_limit,
218 fee_history_limit: base.fee_history_limit,
219 base_token_address: Some(l1_ecosystem_contracts.base_token_address),
220 filters_disabled: base.filters_disabled,
221 dummy_verifier: base.dummy_verifier,
222 l1_batch_commit_data_generator_mode: base.l1_batch_commit_data_generator_mode,
223 timestamp_asserter_address: l2_contracts.timestamp_asserter_addr,
224 l2_multicall3: l2_contracts.multicall3,
225 l1_to_l2_txs_paused: base.l1_to_l2_txs_paused,
226 settlement_layer,
227 }
228 }
229
230 pub fn new(
231 web3_config: &Web3JsonRpcConfig,
232 l1_contracts_config: &SettlementLayerSpecificContracts,
233 l1_ecosystem_contracts: &L1SpecificContracts,
234 l2_contracts: &L2Contracts,
235 genesis_config: &GenesisConfig,
236 l1_to_l2_txs_paused: bool,
237 settlement_layer: SettlementLayer,
238 ) -> Self {
239 let base = InternalApiConfigBase::new(genesis_config, web3_config)
240 .with_l1_to_l2_txs_paused(l1_to_l2_txs_paused);
241 Self::from_base_and_contracts(
242 base,
243 l1_contracts_config,
244 l1_ecosystem_contracts,
245 l2_contracts,
246 settlement_layer,
247 )
248 }
249}
250
251#[derive(Debug, Clone, Default)]
256pub struct SealedL2BlockNumber(Arc<AtomicU32>);
257
258impl SealedL2BlockNumber {
259 pub fn update(&self, maybe_newer_l2_block_number: L2BlockNumber) -> L2BlockNumber {
264 let prev_value = self
265 .0
266 .fetch_max(maybe_newer_l2_block_number.0, Ordering::Relaxed);
267 L2BlockNumber(prev_value).max(maybe_newer_l2_block_number)
268 }
269
270 pub fn diff(&self, l2_block_number: L2BlockNumber) -> u32 {
271 let sealed_l2_block_number = self.update(l2_block_number);
272 sealed_l2_block_number.0.saturating_sub(l2_block_number.0)
273 }
274
275 pub(crate) fn diff_with_block_args(&self, block_args: &BlockArgs) -> u32 {
278 let diff = self.diff(block_args.resolved_block_number());
280
281 if block_args.resolves_to_latest_sealed_l2_block() {
282 0 } else {
284 diff
285 }
286 }
287}
288
289#[derive(Debug, Clone)]
290pub struct BridgeAddressesHandle(Arc<RwLock<api::BridgeAddresses>>);
291
292impl BridgeAddressesHandle {
293 pub fn new(bridge_addresses: api::BridgeAddresses) -> Self {
294 Self(Arc::new(RwLock::new(bridge_addresses)))
295 }
296
297 pub async fn update(&self, bridge_addresses: api::BridgeAddresses) {
298 *self.0.write().await = bridge_addresses;
299 }
300
301 pub async fn update_l1_shared_bridge(&self, l1_shared_bridge: Address) {
302 self.0.write().await.l1_shared_default_bridge = Some(l1_shared_bridge);
303 }
304
305 pub async fn update_l2_bridges(&self, l2_shared_bridge: Address) {
306 self.0.write().await.l2_shared_default_bridge = Some(l2_shared_bridge);
307 self.0.write().await.l2_erc20_default_bridge = Some(l2_shared_bridge);
308 }
309
310 pub async fn read(&self) -> api::BridgeAddresses {
311 self.0.read().await.clone()
312 }
313}
314
315#[derive(Debug, Clone)]
317pub(crate) struct RpcState {
318 pub(super) current_method: Arc<MethodTracer>,
319 pub(super) installed_filters: Option<Arc<Mutex<Filters>>>,
320 pub(super) connection_pool: ConnectionPool<Core>,
321 pub(super) tree_api: Option<Arc<dyn TreeApiClient>>,
322 pub(super) tx_sender: TxSender,
323 pub(super) sync_state: Option<SyncState>,
324 pub(super) api_config: InternalApiConfig,
325 pub(super) start_info: BlockStartInfo,
328 pub(super) mempool_cache: Option<MempoolCache>,
329 pub(super) account_types_cache: AccountTypesCache,
330 pub(super) last_sealed_l2_block: SealedL2BlockNumber,
331 pub(super) bridge_addresses_handle: BridgeAddressesHandle,
332 pub(super) l2_l1_log_proof_handler: Option<Box<DynClient<L2>>>,
333}
334
335impl RpcState {
336 pub fn parse_transaction_bytes(
337 &self,
338 bytes: &[u8],
339 block_args: &BlockArgs,
340 ) -> Result<(L2Tx, H256), Web3Error> {
341 let chain_id = self.api_config.l2_chain_id;
342 let (tx_request, hash) = api::TransactionRequest::from_bytes(bytes, chain_id)?;
343 Ok((
344 L2Tx::from_request(
345 tx_request,
346 self.api_config.max_tx_size,
347 block_args.use_evm_emulator(),
348 )?,
349 hash,
350 ))
351 }
352
353 pub fn u64_to_block_number(n: U64) -> L2BlockNumber {
354 if n.as_u64() > u32::MAX as u64 {
355 L2BlockNumber(u32::MAX)
356 } else {
357 L2BlockNumber(n.as_u32())
358 }
359 }
360
361 pub(crate) fn tx_sink(&self) -> &dyn TxSink {
362 self.tx_sender.0.tx_sink.as_ref()
363 }
364
365 #[track_caller]
369 pub(crate) fn acquire_connection(
370 &self,
371 ) -> impl Future<Output = Result<Connection<'static, Core>, Web3Error>> + '_ {
372 self.connection_pool
373 .connection_tagged("api")
374 .map_err(|err| err.generalize().into())
375 }
376
377 pub(crate) async fn resolve_block(
379 &self,
380 connection: &mut Connection<'_, Core>,
381 block: api::BlockId,
382 ) -> Result<L2BlockNumber, Web3Error> {
383 self.start_info.ensure_not_pruned(block, connection).await?;
384 connection
385 .blocks_web3_dal()
386 .resolve_block_id(block)
387 .await
388 .map_err(DalError::generalize)?
389 .ok_or(Web3Error::NoBlock)
390 }
391
392 pub(crate) async fn resolve_block_unchecked(
400 &self,
401 connection: &mut Connection<'_, Core>,
402 block: api::BlockId,
403 ) -> Result<Option<L2BlockNumber>, Web3Error> {
404 self.start_info.ensure_not_pruned(block, connection).await?;
405 match block {
406 api::BlockId::Number(api::BlockNumber::Number(number)) => {
407 Ok(u32::try_from(number).ok().map(L2BlockNumber))
408 }
409 api::BlockId::Number(api::BlockNumber::Earliest) => Ok(Some(L2BlockNumber(0))),
410 _ => Ok(connection
411 .blocks_web3_dal()
412 .resolve_block_id(block)
413 .await
414 .map_err(DalError::generalize)?),
415 }
416 }
417
418 pub(crate) async fn resolve_block_args(
419 &self,
420 connection: &mut Connection<'_, Core>,
421 block: api::BlockId,
422 ) -> Result<BlockArgs, Web3Error> {
423 BlockArgs::new(connection, block, &self.start_info)
424 .await
425 .map_err(|err| match err {
426 BlockArgsError::Pruned(number) => Web3Error::PrunedBlock(number),
427 BlockArgsError::Missing => Web3Error::NoBlock,
428 BlockArgsError::Database(err) => Web3Error::InternalError(err),
429 })
430 }
431
432 pub async fn resolve_filter_block_number(
433 &self,
434 block_number: Option<api::BlockNumber>,
435 ) -> Result<L2BlockNumber, Web3Error> {
436 if let Some(api::BlockNumber::Number(number)) = block_number {
437 return Ok(Self::u64_to_block_number(number));
438 }
439
440 let block_number = block_number.unwrap_or(api::BlockNumber::Latest);
441 let block_id = api::BlockId::Number(block_number);
442 let mut conn = self.acquire_connection().await?;
443 Ok(self.resolve_block(&mut conn, block_id).await.unwrap())
444 }
447
448 pub async fn resolve_filter_block_range(
449 &self,
450 filter: &Filter,
451 ) -> Result<(L2BlockNumber, L2BlockNumber), Web3Error> {
452 let from_block = self.resolve_filter_block_number(filter.from_block).await?;
453 let to_block = self.resolve_filter_block_number(filter.to_block).await?;
454 Ok((from_block, to_block))
455 }
456
457 pub async fn resolve_filter_block_hash(&self, filter: &mut Filter) -> Result<(), Web3Error> {
459 match (filter.block_hash, filter.from_block, filter.to_block) {
460 (Some(block_hash), None, None) => {
461 let mut storage = self.acquire_connection().await?;
462 let block_number = self
463 .resolve_block(&mut storage, api::BlockId::Hash(block_hash))
464 .await?;
465 filter.from_block = Some(api::BlockNumber::Number(block_number.0.into()));
466 filter.to_block = Some(api::BlockNumber::Number(block_number.0.into()));
467 Ok(())
468 }
469 (Some(_), _, _) => Err(Web3Error::InvalidFilterBlockHash),
470 (None, _, _) => Ok(()),
471 }
472 }
473
474 pub async fn get_filter_from_block(&self, filter: &Filter) -> Result<L2BlockNumber, Web3Error> {
477 let mut connection = self.acquire_connection().await?;
478 let pending_block = self
479 .resolve_block_unchecked(
480 &mut connection,
481 api::BlockId::Number(api::BlockNumber::Pending),
482 )
483 .await?
484 .context("Pending block number shouldn't be None")?;
485 let block_number = match filter.from_block {
486 Some(api::BlockNumber::Number(number)) => {
487 let block_number = Self::u64_to_block_number(number);
488 block_number.max(pending_block)
489 }
490 _ => pending_block,
491 };
492 Ok(block_number)
493 }
494
495 pub(crate) async fn set_nonce_for_call_request(
496 &self,
497 call_request: &mut CallRequest,
498 ) -> Result<(), Web3Error> {
499 if call_request.nonce.is_some() {
500 return Ok(());
501 }
502 let mut connection = self.acquire_connection().await?;
503 let latest_block_id = api::BlockId::Number(api::BlockNumber::Latest);
504 let latest_block_number = self.resolve_block(&mut connection, latest_block_id).await?;
505
506 let from = call_request.from.unwrap_or_default();
507 let address_historical_nonce = connection
508 .storage_web3_dal()
509 .get_address_historical_nonce(from, latest_block_number)
510 .await
511 .map_err(DalError::generalize)?;
512 call_request.nonce = Some(address_historical_nonce);
513 Ok(())
514 }
515}
516
517#[derive(Debug)]
519pub(crate) struct Filters(LruCache<U256, InstalledFilter>);
520
521#[derive(Debug)]
522struct InstalledFilter {
523 pub filter: TypedFilter,
524 metrics: &'static FilterMetrics,
525 _guard: GaugeGuard,
526 created_at: Instant,
527 last_request: Instant,
528 request_count: usize,
529}
530
531impl InstalledFilter {
532 pub fn new(filter: TypedFilter) -> Self {
533 let metrics = &FILTER_METRICS[&FilterType::from(&filter)];
534 let guard = metrics.filter_count.inc_guard(1);
535 Self {
536 filter,
537 metrics,
538 _guard: guard,
539 created_at: Instant::now(),
540 last_request: Instant::now(),
541 request_count: 0,
542 }
543 }
544
545 pub fn update_stats(&mut self) {
546 let previous_request_timestamp = self.last_request;
547 let now = Instant::now();
548
549 self.last_request = now;
550 self.request_count += 1;
551
552 self.metrics
553 .request_frequency
554 .observe(now - previous_request_timestamp);
555 }
556}
557
558impl Drop for InstalledFilter {
559 fn drop(&mut self) {
560 self.metrics.request_count.observe(self.request_count);
561 self.metrics
562 .filter_lifetime
563 .observe(self.created_at.elapsed());
564 }
565}
566
567impl Filters {
568 pub fn new(max_cap: Option<usize>) -> Self {
570 let state = match max_cap {
571 Some(max_cap) => {
572 LruCache::new(max_cap.try_into().expect("Filter capacity should not be 0"))
573 }
574 None => LruCache::unbounded(),
575 };
576 Self(state)
577 }
578
579 pub fn add(&mut self, filter: TypedFilter) -> U256 {
581 let idx = loop {
582 let val = H256::random().to_fixed_bytes().into();
583 if !self.0.contains(&val) {
584 break val;
585 }
586 };
587
588 self.0.push(idx, InstalledFilter::new(filter));
589
590 idx
591 }
592
593 pub fn get_and_update_stats(&mut self, index: U256) -> Option<TypedFilter> {
595 let installed_filter = self.0.get_mut(&index)?;
596
597 installed_filter.update_stats();
598
599 Some(installed_filter.filter.clone())
600 }
601
602 pub fn update(&mut self, index: U256, new_filter: TypedFilter) {
604 if let Some(installed_filter) = self.0.get_mut(&index) {
605 installed_filter.filter = new_filter;
606 }
607 }
608
609 pub fn remove(&mut self, index: U256) -> bool {
611 self.0.pop(&index).is_some()
612 }
613}
614
615#[cfg(test)]
616mod tests {
617 use chrono::NaiveDateTime;
618
619 #[test]
620 fn test_filters_functionality() {
621 use super::*;
622
623 let mut filters = Filters::new(Some(2));
624
625 let filter1 = TypedFilter::Events(Filter::default(), L2BlockNumber::default());
626 let filter2 = TypedFilter::Blocks(L2BlockNumber::default());
627 let filter3 = TypedFilter::PendingTransactions(NaiveDateTime::default());
628
629 let idx1 = filters.add(filter1.clone());
630 let idx2 = filters.add(filter2);
631 let idx3 = filters.add(filter3);
632
633 assert_eq!(filters.0.len(), 2);
634 assert!(!filters.0.contains(&idx1));
635 assert!(filters.0.contains(&idx2));
636 assert!(filters.0.contains(&idx3));
637
638 filters.get_and_update_stats(idx2);
639
640 let idx1 = filters.add(filter1);
641 assert_eq!(filters.0.len(), 2);
642 assert!(filters.0.contains(&idx1));
643 assert!(filters.0.contains(&idx2));
644 assert!(!filters.0.contains(&idx3));
645
646 filters.remove(idx1);
647
648 assert_eq!(filters.0.len(), 1);
649 assert!(!filters.0.contains(&idx1));
650 assert!(filters.0.contains(&idx2));
651 assert!(!filters.0.contains(&idx3));
652 }
653}