Skip to main content

dusk_node/
mempool.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::TxPreconditionError;
20use dusk_core::stake::STAKE_CONTRACT;
21use dusk_core::transfer::TRANSFER_CONTRACT;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{Header, SpendingId, Transaction};
25use node_data::message::{AsyncQueue, Payload, Topics, payload};
26use rkyv::Infallible;
27use rkyv::ser::Serializer;
28use rkyv::ser::serializers::{
29    BufferScratch, BufferSerializer, BufferSerializerError,
30    CompositeSerializer, CompositeSerializerError,
31};
32use thiserror::Error;
33use tokio::sync::RwLock;
34use tokio::sync::mpsc::Sender;
35use tracing::{error, info, warn};
36
37use crate::database::{Ledger, Mempool};
38use crate::mempool::conf::Params;
39use crate::vm::PreverificationResult;
40use crate::{LongLivedService, Message, Network, database, vm};
41
/// Message topics routed to the mempool's inbound queue (transactions only).
const TOPICS: &[u8] = &[Topics::Tx as u8];
43
/// Reasons for which a transaction is refused by the mempool.
#[derive(Debug, Error)]
pub enum TxAcceptanceError {
    /// A transaction with the same id is already stored in the mempool.
    #[error("this transaction exists in the mempool")]
    AlreadyExistsInMempool,
    /// A transaction with the same id is already stored in the ledger.
    #[error("this transaction exists in the ledger")]
    AlreadyExistsInLedger,
    /// A blob is missing its sidecar; carries the 32-byte blob id.
    #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
    BlobMissingSidecar([u8; 32]),
    /// The transaction provides no blobs.
    #[error("No blobs provided")]
    BlobEmpty,
    /// The transaction carries too many blobs.
    // NOTE(review): the payload is presumably the number of blobs found -
    // confirm against `BlobError` / `TxPreconditionError`.
    #[error("Transaction has too many blobs: {0}")]
    BlobTooMany(usize),
    /// A blob failed validation; carries the validation message.
    #[error("Invalid blob: {0}")]
    BlobInvalid(String),
    /// A mempool transaction already uses one of this transaction's spending
    /// ids and the new transaction does not outbid it.
    #[error("this transaction's spendId exists in the mempool")]
    SpendIdExistsInMempool,
    /// The transaction failed verification; carries the failure description.
    #[error("this transaction is invalid {0}")]
    VerificationFailed(String),
    /// The gas price is below the required minimum; carries that minimum.
    #[error("gas price lower than minimum {0}")]
    GasPriceTooLow(u64),
    /// The gas limit is below the required minimum; carries that minimum.
    #[error("gas limit lower than minimum {0}")]
    GasLimitTooLow(u64),
    /// The mempool is full and the transaction does not pay enough to evict
    /// another one; carries the configured maximum transaction count.
    #[error("Maximum count of transactions exceeded {0}")]
    MaxTxnCountExceeded(usize),
    /// The transaction overflows the serialization buffer.
    #[error("this transaction is too large to be serialized")]
    TooLarge,
    /// The transaction exceeds the maximum allowed size; carries the size of
    /// the offending transaction.
    #[error("Maximum transaction size exceeded {0}")]
    MaxSizeExceeded(usize),
    /// Any other failure, e.g. a database error.
    #[error("A generic error occurred {0}")]
    Generic(anyhow::Error),
}
75
76impl From<anyhow::Error> for TxAcceptanceError {
77    fn from(err: anyhow::Error) -> Self {
78        Self::Generic(err)
79    }
80}
81
82impl From<BlobError> for TxAcceptanceError {
83    fn from(err: BlobError) -> Self {
84        match err {
85            BlobError::MissingSidecar(id) => {
86                TxAcceptanceError::BlobMissingSidecar(id)
87            }
88            BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
89            BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
90            BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
91        }
92    }
93}
94
95impl From<TxPreconditionError> for TxAcceptanceError {
96    fn from(err: TxPreconditionError) -> Self {
97        match err {
98            TxPreconditionError::BlobLowLimit(min) => {
99                TxAcceptanceError::GasLimitTooLow(min)
100            }
101            TxPreconditionError::DeployLowLimit(min) => {
102                TxAcceptanceError::GasLimitTooLow(min)
103            }
104            TxPreconditionError::DeployLowPrice(min) => {
105                TxAcceptanceError::GasPriceTooLow(min)
106            }
107            TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
108            TxPreconditionError::BlobTooMany(n) => {
109                TxAcceptanceError::BlobTooMany(n)
110            }
111            TxPreconditionError::PhoenixFeeOverflow => {
112                TxAcceptanceError::VerificationFailed(
113                    "phoenix fee overflow".into(),
114                )
115            }
116            TxPreconditionError::PhoenixFeeTampered => {
117                TxAcceptanceError::VerificationFailed(
118                    "phoenix fee tampered".into(),
119                )
120            }
121            TxPreconditionError::PhoenixFeeRefundMismatch => {
122                TxAcceptanceError::VerificationFailed(
123                    "phoenix fee refund stealth address mismatch".into(),
124                )
125            }
126        }
127    }
128}
129
/// Long-lived service that accepts transactions received from the network
/// into the mempool and periodically purges the expired ones.
pub struct MempoolSrv {
    /// Bounded queue of inbound network messages
    inbound: AsyncQueue<Message>,
    /// Mempool configuration parameters
    conf: Params,
    /// Sender channel for sending out RUES events
    event_sender: Sender<Event>,
}
136
137impl MempoolSrv {
138    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
139        info!("MempoolSrv::new with conf {}", conf);
140        Self {
141            inbound: AsyncQueue::bounded(
142                conf.max_queue_size,
143                "mempool_inbound",
144            ),
145            conf,
146            event_sender,
147        }
148    }
149}
150
151#[async_trait]
152impl<N: Network, DB: database::DB, VM: vm::VMExecution>
153    LongLivedService<N, DB, VM> for MempoolSrv
154{
155    async fn execute(
156        &mut self,
157        network: Arc<RwLock<N>>,
158        db: Arc<RwLock<DB>>,
159        vm: Arc<RwLock<VM>>,
160    ) -> anyhow::Result<usize> {
161        LongLivedService::<N, DB, VM>::add_routes(
162            self,
163            TOPICS,
164            self.inbound.clone(),
165            &network,
166        )
167        .await?;
168
169        // Request mempool update from N alive peers
170        self.request_mempool(&network).await;
171
172        let idle_interval =
173            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
174
175        let mempool_expiry = self
176            .conf
177            .mempool_expiry
178            .unwrap_or(DEFAULT_EXPIRY_TIME)
179            .as_secs();
180
181        // Mempool service loop
182        let mut on_idle_event = tokio::time::interval(idle_interval);
183        loop {
184            tokio::select! {
185                biased;
186                _ = on_idle_event.tick() => {
187                    info!(event = "mempool_idle", interval = ?idle_interval);
188
189                    let expiration_time = get_current_timestamp()
190                        .checked_sub(mempool_expiry)
191                        .expect("valid duration");
192
193                    // Remove expired transactions from the mempool
194                    db.read().await.update(|db| {
195                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
196                            error!("cannot get expired txs: {e}");
197                            vec![]
198                        });
199                        for tx_id in expired_txs {
200                            info!(event = "expired_tx", hash = hex::encode(tx_id));
201                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
202                                error!("cannot delete expired tx: {e}");
203                                vec![]
204                            });
205                            for deleted_tx_id in deleted_txs{
206                                let event = TransactionEvent::Removed(deleted_tx_id);
207                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
208                                if let Err(e) = self.event_sender.try_send(event.into()) {
209                                    warn!("cannot notify mempool removed transaction {e}")
210                                };
211                            }
212                        }
213                        Ok(())
214                    })?;
215
216                },
217                msg = self.inbound.recv() => {
218                    if let Ok(msg) = msg {
219                        match &msg.payload {
220                            Payload::Transaction(tx) => {
221                                let accept = self.accept_tx(&db, &vm, tx);
222                                if let Err(e) = accept.await {
223                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
224                                    continue;
225                                }
226
227                                let network = network.read().await;
228                                if let Err(e) = network.broadcast(&msg).await {
229                                    warn!("Unable to broadcast accepted tx: {e}")
230                                };
231                            }
232                            _ => error!("invalid inbound message payload"),
233                        }
234                    }
235                }
236            }
237        }
238    }
239
240    /// Returns service name.
241    fn name(&self) -> &'static str {
242        "mempool"
243    }
244}
245
246impl MempoolSrv {
247    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
248        &mut self,
249        db: &Arc<RwLock<DB>>,
250        vm: &Arc<RwLock<VM>>,
251        tx: &Transaction,
252    ) -> Result<(), TxAcceptanceError> {
253        let max_mempool_txn_count = self.conf.max_mempool_txn_count;
254
255        let events =
256            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
257                .await?;
258
259        tracing::info!(
260            event = "transaction accepted",
261            hash = hex::encode(tx.id())
262        );
263
264        for tx_event in events {
265            let node_event = tx_event.into();
266            if let Err(e) = self.event_sender.try_send(node_event) {
267                warn!("cannot notify mempool accepted transaction {e}")
268            };
269        }
270
271        Ok(())
272    }
273
274    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
275        db: &Arc<RwLock<DB>>,
276        vm: &Arc<RwLock<VM>>,
277        tx: &'t Transaction,
278        dry_run: bool,
279        max_mempool_txn_count: usize,
280    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
281        let tx_id = tx.id();
282        let tx_size = tx.size();
283
284        // We consider the maximum transaction size as the total avilable block
285        // space minus the minimum size of the block header (i.e., the size of
286        // the header in a first-iteration block with no faults)
287        let min_header_size = Header::default().size();
288        let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
289        if tx_size > max_tx_size {
290            return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
291        }
292
293        check_tx_serialization(&tx.inner)?;
294
295        if tx.gas_price() < 1 {
296            return Err(TxAcceptanceError::GasPriceTooLow(1));
297        }
298
299        let tip_height = db
300            .read()
301            .await
302            .view(|db| db.latest_block())
303            .map_err(|e| {
304                anyhow!("Cannot get tip block height from the database: {e}")
305            })?
306            .header
307            .height;
308
309        {
310            // Mimic the VM's additional checks for transactions
311            let vm = vm.read().await;
312
313            let disable_wasm_32 = vm.wasm32_disabled(tip_height);
314            let disable_wasm_64 = vm.wasm64_disabled(tip_height);
315            let disable_3rd_party = vm.third_party_disabled(tip_height);
316
317            if let Some(_contract_deploy) = tx.inner.deploy() {
318                match (disable_wasm_32, disable_wasm_64) {
319                    (true, true) => {
320                        Err(TxAcceptanceError::Generic(anyhow::anyhow!(
321                            "contract deployment is not enabled in the VM"
322                        )))
323                    }
324                    // TODO: We should selectively check both config in the
325                    // future
326                    _ => Ok(()),
327                }?
328            }
329
330            if disable_3rd_party {
331                if let Some(call) = tx.inner.call() {
332                    if call.contract != TRANSFER_CONTRACT
333                        && call.contract != STAKE_CONTRACT
334                    {
335                        Err(TxAcceptanceError::Generic(anyhow::anyhow!(
336                            "3rd party contracts are not enabled in the VM"
337                        )))?;
338                    }
339                }
340            }
341
342            tx.inner.phoenix_fee_check()?;
343
344            if vm.phoenix_refund_check_active(tip_height) {
345                tx.inner.phoenix_refund_check()?;
346            }
347
348            // Check deployment tx
349            if tx.inner.deploy().is_some() {
350                let min_deployment_gas_price = vm.min_deployment_gas_price();
351                let gas_per_deploy_byte = vm.gas_per_deploy_byte();
352                let min_deploy_points = vm.min_deploy_points();
353                tx.inner.deploy_check(
354                    gas_per_deploy_byte,
355                    min_deployment_gas_price,
356                    min_deploy_points,
357                )?;
358            }
359
360            // Check blob tx
361            if tx.inner.blob().is_some() {
362                if !vm.blob_active(tip_height) {
363                    return Err(TxAcceptanceError::Generic(anyhow::anyhow!(
364                        "blobs are not enabled in the VM"
365                    )));
366                }
367
368                let gas_per_blob = vm.gas_per_blob();
369                tx.inner.blob_check(gas_per_blob)?;
370                dusk_consensus::validate_blob_sidecars(tx)?;
371            }
372
373            // Check global minimum gas limit
374            let min_gas_limit = vm.min_gas_limit();
375            if tx.inner.gas_limit() < min_gas_limit {
376                return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
377            }
378        }
379
380        // Perform basic checks on the transaction
381        let tx_to_delete = db.read().await.view(|view| {
382            // ensure transaction does not exist in the mempool
383            if view.mempool_tx_exists(tx_id)? {
384                return Err(TxAcceptanceError::AlreadyExistsInMempool);
385            }
386
387            // ensure transaction does not exist in the blockchain
388            if view.ledger_tx_exists(&tx_id)? {
389                return Err(TxAcceptanceError::AlreadyExistsInLedger);
390            }
391
392            let txs_count = view.mempool_txs_count();
393            if txs_count >= max_mempool_txn_count {
394                // Get the lowest fee transaction to delete
395                let (lowest_price, to_delete) = view
396                    .mempool_txs_ids_sorted_by_low_fee()
397                    .next()
398                    .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
399
400                if tx.gas_price() < lowest_price {
401                    // Or error if the gas price proposed is the lowest of all
402                    // the transactions in the mempool
403                    Err(TxAcceptanceError::MaxTxnCountExceeded(
404                        max_mempool_txn_count,
405                    ))
406                } else {
407                    Ok(Some(to_delete))
408                }
409            } else {
410                Ok(None)
411            }
412        })?;
413
414        // VM Preverify call
415        let preverification_data =
416            vm.read().await.preverify(tx, tip_height).map_err(|e| {
417                TxAcceptanceError::VerificationFailed(format!("{e}"))
418            })?;
419
420        if let PreverificationResult::FutureNonce {
421            account,
422            state,
423            nonce_used,
424        } = preverification_data
425        {
426            db.read().await.view(|db| {
427                for nonce in state.nonce + 1..nonce_used {
428                    let spending_id = SpendingId::AccountNonce(account, nonce);
429                    if db
430                        .mempool_txs_by_spendable_ids(&[spending_id])
431                        .is_empty()
432                    {
433                        return Err(TxAcceptanceError::VerificationFailed(
434                            format!("Missing intermediate nonce {nonce}"),
435                        ));
436                    }
437                }
438                Ok(())
439            })?;
440        }
441
442        let mut events = vec![];
443
444        // Try to add the transaction to the mempool
445        db.read().await.update_dry_run(dry_run, |db| {
446            let spend_ids = tx.to_spend_ids();
447
448            let mut replaced = false;
449            // ensure spend_ids do not exist in the mempool
450            for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
451                if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
452                    // If the transaction spendingId is already in the mempool
453                    // (same nonce or same nullifier), we check if it can be
454                    // replaced by the new transaction based on gas price and
455                    // gas limit.
456                    // If the new transaction has a higher gas price or the same
457                    // gas price but a higher gas limit, we replace the old
458                    // transaction with the new one.
459                    if m_tx.inner.gas_price() < tx.inner.gas_price()
460                        || (m_tx.inner.gas_price() == tx.inner.gas_price()
461                            && m_tx.inner.gas_limit() < tx.inner.gas_limit())
462                    {
463                        for deleted in db.delete_mempool_tx(m_tx_id, false)? {
464                            events.push(TransactionEvent::Removed(deleted));
465                            replaced = true;
466                        }
467                    } else {
468                        return Err(
469                            TxAcceptanceError::SpendIdExistsInMempool.into()
470                        );
471                    }
472                }
473            }
474
475            events.push(TransactionEvent::Included(tx));
476
477            if !replaced {
478                if let Some(to_delete) = tx_to_delete {
479                    for deleted in db.delete_mempool_tx(to_delete, true)? {
480                        events.push(TransactionEvent::Removed(deleted));
481                    }
482                }
483            }
484            // Persist transaction in mempool storage
485
486            let now = get_current_timestamp();
487
488            db.store_mempool_tx(tx, now)
489        })?;
490        Ok(events)
491    }
492
493    /// Requests full mempool data from N alive peers
494    ///
495    /// Message flow:
496    /// GetMempool -> Inv -> GetResource -> Tx
497    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
498        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
499        let max_peers = self
500            .conf
501            .mempool_download_redundancy
502            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
503
504        let net = network.read().await;
505        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
506
507        let msg = payload::GetMempool::default().into();
508        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
509            error!("could not request mempool from network: {err}");
510        }
511    }
512}
513
514fn check_tx_serialization(
515    tx: &dusk_core::transfer::Transaction,
516) -> Result<(), TxAcceptanceError> {
517    // The transaction is an argument to the transfer contract, so
518    // its serialized size has to be within the same 64Kib limit.
519    const SCRATCH_BUF_BYTES: usize = 1024;
520    const ARGBUF_LEN: usize = 64 * 1024;
521    let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
522    let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
523    let mut buffer = [0u8; ARGBUF_LEN];
524    let scratch = BufferScratch::new(&mut sbuf);
525    let ser = BufferSerializer::new(&mut buffer);
526    let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
527    if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
528        match err {
529            CompositeSerializerError::SerializerError(err) => match err {
530                BufferSerializerError::Overflow { .. } => {
531                    return Err(TxAcceptanceError::TooLarge);
532                }
533            },
534            err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
535        }
536    }
537    Ok(())
538}
539
#[cfg(test)]
mod tests {
    use dusk_core::signatures::bls::{PublicKey, SecretKey};
    use rand::Rng;
    use rand::rngs::StdRng;
    use rand::{CryptoRng, RngCore, SeedableRng};
    use wallet_core::transaction::moonlight_deployment;

    use super::*;

    /// Largest init-args length for which the moonlight deployment built by
    /// these tests still passes the serialization check.
    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;

    /// Builds a moonlight deployment transaction with random keys, gas
    /// parameters and nonces drawn from `rng`.
    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
        rng: &mut R,
        bytecode: Vec<u8>,
        init_args: Vec<u8>,
    ) -> dusk_core::transfer::Transaction {
        const CHAIN_ID: u8 = 0xfa;

        // The draw order below is kept stable so that a given seed always
        // produces the same transaction
        let secret_key = SecretKey::random(rng);
        let public_key = PublicKey::from(&SecretKey::random(rng));

        let gas_limit: u64 = rng.r#gen();
        let gas_price: u64 = rng.r#gen();
        let nonce: u64 = rng.r#gen();
        let deploy_nonce: u64 = rng.r#gen();

        let deployment = moonlight_deployment(
            &secret_key,
            bytecode,
            &public_key,
            init_args,
            gas_limit,
            gas_price,
            nonce,
            deploy_nonce,
            CHAIN_ID,
        );
        deployment.expect("should create a transaction")
    }

    /// Runs the serialization check on a deployment carrying 64KiB of
    /// bytecode and `init_args_len` bytes of init arguments.
    fn serialization_outcome(
        init_args_len: usize,
    ) -> Result<(), TxAcceptanceError> {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; init_args_len],
        );
        check_tx_serialization(&tx)
    }

    #[test]
    fn test_tx_serialization_check_normal() {
        let result = serialization_outcome(MAX_MOONLIGHT_ARG_SIZE);
        assert!(result.is_ok());
    }

    #[test]
    fn test_tx_serialization_check_tx_too_large() {
        let result = serialization_outcome(MAX_MOONLIGHT_ARG_SIZE + 1);
        assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
    }
}