zksync_node_api_server 0.1.0

ZKsync API server
use std::collections::hash_map::{Entry, HashMap};

use tokio::sync::Mutex;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, Core, CoreDal};
use zksync_shared_metrics::{TxStage, APP_METRICS};
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx, Address, Nonce, H256};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::web3::metrics::API_METRICS;

/// Wrapper for the master DB pool that allows to submit transactions to the mempool.
#[derive(Debug)]
pub struct MasterPoolSink {
    master_pool: ConnectionPool<Core>,
    inflight_requests: Mutex<HashMap<(Address, Nonce), H256>>,
}

impl MasterPoolSink {
    pub fn new(master_pool: ConnectionPool<Core>) -> Self {
        Self {
            master_pool,
            inflight_requests: Mutex::new(HashMap::new()),
        }
    }
}

#[async_trait::async_trait]
impl TxSink for MasterPoolSink {
    async fn submit_tx(
        &self,
        tx: &L2Tx,
        execution_metrics: TransactionExecutionMetrics,
    ) -> Result<L2TxSubmissionResult, SubmitTxError> {
        let address_and_nonce = (tx.initiator_account(), tx.nonce());

        let mut lock = self.inflight_requests.lock().await;
        match lock.entry(address_and_nonce) {
            Entry::Occupied(entry) => {
                let submission_res_handle = if entry.get() == &tx.hash() {
                    L2TxSubmissionResult::Duplicate
                } else {
                    L2TxSubmissionResult::InsertionInProgress
                };
                APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
                return Ok(submission_res_handle);
            }
            Entry::Vacant(entry) => {
                entry.insert(tx.hash());
                API_METRICS.inflight_tx_submissions.inc_by(1);
            }
        };
        drop(lock);

        let result = match self.master_pool.connection_tagged("api").await {
            Ok(mut connection) => connection
                .transactions_dal()
                .insert_transaction_l2(tx, execution_metrics)
                .await
                .map(|submission_res_handle| {
                    APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
                    submission_res_handle
                })
                .map_err(|err| err.generalize().into()),
            Err(err) => Err(err.generalize().into()),
        };

        self.inflight_requests
            .lock()
            .await
            .remove(&address_and_nonce);
        API_METRICS.inflight_tx_submissions.dec_by(1);

        result
    }
}