dusk_node/
mempool.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::stake::STAKE_CONTRACT;
20use dusk_core::transfer::TRANSFER_CONTRACT;
21use dusk_core::TxPreconditionError;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{Header, SpendingId, Transaction};
25use node_data::message::{payload, AsyncQueue, Payload, Topics};
26use rkyv::ser::serializers::{
27    BufferScratch, BufferSerializer, BufferSerializerError,
28    CompositeSerializer, CompositeSerializerError,
29};
30use rkyv::ser::Serializer;
31use rkyv::Infallible;
32use thiserror::Error;
33use tokio::sync::mpsc::Sender;
34use tokio::sync::RwLock;
35use tracing::{error, info, warn};
36
37use crate::database::{Ledger, Mempool};
38use crate::mempool::conf::Params;
39use crate::vm::PreverificationResult;
40use crate::{database, vm, LongLivedService, Message, Network};
41
42const TOPICS: &[u8] = &[Topics::Tx as u8];
43
44#[derive(Debug, Error)]
45pub enum TxAcceptanceError {
46    #[error("this transaction exists in the mempool")]
47    AlreadyExistsInMempool,
48    #[error("this transaction exists in the ledger")]
49    AlreadyExistsInLedger,
50    #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
51    BlobMissingSidecar([u8; 32]),
52    #[error("No blobs provided")]
53    BlobEmpty,
54    #[error("Transaction has too many blobs: {0}")]
55    BlobTooMany(usize),
56    #[error("Invalid blob: {0}")]
57    BlobInvalid(String),
58    #[error("this transaction's spendId exists in the mempool")]
59    SpendIdExistsInMempool,
60    #[error("this transaction is invalid {0}")]
61    VerificationFailed(String),
62    #[error("gas price lower than minimum {0}")]
63    GasPriceTooLow(u64),
64    #[error("gas limit lower than minimum {0}")]
65    GasLimitTooLow(u64),
66    #[error("Maximum count of transactions exceeded {0}")]
67    MaxTxnCountExceeded(usize),
68    #[error("this transaction is too large to be serialized")]
69    TooLarge,
70    #[error("Maximum transaction size exceeded {0}")]
71    MaxSizeExceeded(usize),
72    #[error("A generic error occurred {0}")]
73    Generic(anyhow::Error),
74}
75
76impl From<anyhow::Error> for TxAcceptanceError {
77    fn from(err: anyhow::Error) -> Self {
78        Self::Generic(err)
79    }
80}
81
82impl From<BlobError> for TxAcceptanceError {
83    fn from(err: BlobError) -> Self {
84        match err {
85            BlobError::MissingSidecar(id) => {
86                TxAcceptanceError::BlobMissingSidecar(id)
87            }
88            BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
89            BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
90            BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
91        }
92    }
93}
94
95impl From<TxPreconditionError> for TxAcceptanceError {
96    fn from(err: TxPreconditionError) -> Self {
97        match err {
98            TxPreconditionError::BlobLowLimit(min) => {
99                TxAcceptanceError::GasLimitTooLow(min)
100            }
101            TxPreconditionError::DeployLowLimit(min) => {
102                TxAcceptanceError::GasLimitTooLow(min)
103            }
104            TxPreconditionError::DeployLowPrice(min) => {
105                TxAcceptanceError::GasPriceTooLow(min)
106            }
107            TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
108            TxPreconditionError::BlobTooMany(n) => {
109                TxAcceptanceError::BlobTooMany(n)
110            }
111        }
112    }
113}
114
115pub struct MempoolSrv {
116    inbound: AsyncQueue<Message>,
117    conf: Params,
118    /// Sender channel for sending out RUES events
119    event_sender: Sender<Event>,
120}
121
122impl MempoolSrv {
123    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
124        info!("MempoolSrv::new with conf {}", conf);
125        Self {
126            inbound: AsyncQueue::bounded(
127                conf.max_queue_size,
128                "mempool_inbound",
129            ),
130            conf,
131            event_sender,
132        }
133    }
134}
135
136#[async_trait]
137impl<N: Network, DB: database::DB, VM: vm::VMExecution>
138    LongLivedService<N, DB, VM> for MempoolSrv
139{
140    async fn execute(
141        &mut self,
142        network: Arc<RwLock<N>>,
143        db: Arc<RwLock<DB>>,
144        vm: Arc<RwLock<VM>>,
145    ) -> anyhow::Result<usize> {
146        LongLivedService::<N, DB, VM>::add_routes(
147            self,
148            TOPICS,
149            self.inbound.clone(),
150            &network,
151        )
152        .await?;
153
154        // Request mempool update from N alive peers
155        self.request_mempool(&network).await;
156
157        let idle_interval =
158            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
159
160        let mempool_expiry = self
161            .conf
162            .mempool_expiry
163            .unwrap_or(DEFAULT_EXPIRY_TIME)
164            .as_secs();
165
166        // Mempool service loop
167        let mut on_idle_event = tokio::time::interval(idle_interval);
168        loop {
169            tokio::select! {
170                biased;
171                _ = on_idle_event.tick() => {
172                    info!(event = "mempool_idle", interval = ?idle_interval);
173
174                    let expiration_time = get_current_timestamp()
175                        .checked_sub(mempool_expiry)
176                        .expect("valid duration");
177
178                    // Remove expired transactions from the mempool
179                    db.read().await.update(|db| {
180                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
181                            error!("cannot get expired txs: {e}");
182                            vec![]
183                        });
184                        for tx_id in expired_txs {
185                            info!(event = "expired_tx", hash = hex::encode(tx_id));
186                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
187                                error!("cannot delete expired tx: {e}");
188                                vec![]
189                            });
190                            for deleted_tx_id in deleted_txs{
191                                let event = TransactionEvent::Removed(deleted_tx_id);
192                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
193                                if let Err(e) = self.event_sender.try_send(event.into()) {
194                                    warn!("cannot notify mempool removed transaction {e}")
195                                };
196                            }
197                        }
198                        Ok(())
199                    })?;
200
201                },
202                msg = self.inbound.recv() => {
203                    if let Ok(msg) = msg {
204                        match &msg.payload {
205                            Payload::Transaction(tx) => {
206                                let accept = self.accept_tx(&db, &vm, tx);
207                                if let Err(e) = accept.await {
208                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
209                                    continue;
210                                }
211
212                                let network = network.read().await;
213                                if let Err(e) = network.broadcast(&msg).await {
214                                    warn!("Unable to broadcast accepted tx: {e}")
215                                };
216                            }
217                            _ => error!("invalid inbound message payload"),
218                        }
219                    }
220                }
221            }
222        }
223    }
224
225    /// Returns service name.
226    fn name(&self) -> &'static str {
227        "mempool"
228    }
229}
230
231impl MempoolSrv {
232    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
233        &mut self,
234        db: &Arc<RwLock<DB>>,
235        vm: &Arc<RwLock<VM>>,
236        tx: &Transaction,
237    ) -> Result<(), TxAcceptanceError> {
238        let max_mempool_txn_count = self.conf.max_mempool_txn_count;
239
240        let events =
241            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
242                .await?;
243
244        tracing::info!(
245            event = "transaction accepted",
246            hash = hex::encode(tx.id())
247        );
248
249        for tx_event in events {
250            let node_event = tx_event.into();
251            if let Err(e) = self.event_sender.try_send(node_event) {
252                warn!("cannot notify mempool accepted transaction {e}")
253            };
254        }
255
256        Ok(())
257    }
258
259    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
260        db: &Arc<RwLock<DB>>,
261        vm: &Arc<RwLock<VM>>,
262        tx: &'t Transaction,
263        dry_run: bool,
264        max_mempool_txn_count: usize,
265    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
266        let tx_id = tx.id();
267        let tx_size = tx.size();
268
269        // We consider the maximum transaction size as the total avilable block
270        // space minus the minimum size of the block header (i.e., the size of
271        // the header in a first-iteration block with no faults)
272        let min_header_size = Header::default().size();
273        let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
274        if tx_size > max_tx_size {
275            return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
276        }
277
278        check_tx_serialization(&tx.inner)?;
279
280        if tx.gas_price() < 1 {
281            return Err(TxAcceptanceError::GasPriceTooLow(1));
282        }
283
284        let tip_height = db
285            .read()
286            .await
287            .view(|db| db.latest_block())
288            .map_err(|e| {
289                anyhow!("Cannot get tip block height from the database: {e}")
290            })?
291            .header
292            .height;
293
294        {
295            // Mimic the VM's additional checks for transactions
296            let vm = vm.read().await;
297
298            let disable_wasm_32 = vm.wasm32_disabled(tip_height);
299            let disable_wasm_64 = vm.wasm64_disabled(tip_height);
300            let disable_3rd_party = vm.third_party_disabled(tip_height);
301
302            if let Some(_contract_deploy) = tx.inner.deploy() {
303                match (disable_wasm_32, disable_wasm_64) {
304                    (true, true) => {
305                        Err(TxAcceptanceError::Generic(anyhow::anyhow!(
306                            "contract deployment is not enabled in the VM"
307                        )))
308                    }
309                    // TODO: We should selectively check both config in the
310                    // future
311                    _ => Ok(()),
312                }?
313            }
314
315            if disable_3rd_party {
316                if let Some(call) = tx.inner.call() {
317                    if call.contract != TRANSFER_CONTRACT
318                        && call.contract != STAKE_CONTRACT
319                    {
320                        Err(TxAcceptanceError::Generic(anyhow::anyhow!(
321                            "3rd party contracts are not enabled in the VM"
322                        )))?;
323                    }
324                }
325            }
326
327            // Check deployment tx
328            if tx.inner.deploy().is_some() {
329                let min_deployment_gas_price = vm.min_deployment_gas_price();
330                let gas_per_deploy_byte = vm.gas_per_deploy_byte();
331                let min_deploy_points = vm.min_deploy_points();
332                tx.inner.deploy_check(
333                    gas_per_deploy_byte,
334                    min_deployment_gas_price,
335                    min_deploy_points,
336                )?;
337            }
338
339            // Check blob tx
340            if tx.inner.blob().is_some() {
341                if !vm.blob_active(tip_height) {
342                    return Err(TxAcceptanceError::Generic(anyhow::anyhow!(
343                        "blobs are not enabled in the VM"
344                    )));
345                }
346
347                let gas_per_blob = vm.gas_per_blob();
348                tx.inner.blob_check(gas_per_blob)?;
349                dusk_consensus::validate_blob_sidecars(tx)?;
350            }
351
352            // Check global minimum gas limit
353            let min_gas_limit = vm.min_gas_limit();
354            if tx.inner.gas_limit() < min_gas_limit {
355                return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
356            }
357        }
358
359        // Perform basic checks on the transaction
360        let tx_to_delete = db.read().await.view(|view| {
361            // ensure transaction does not exist in the mempool
362            if view.mempool_tx_exists(tx_id)? {
363                return Err(TxAcceptanceError::AlreadyExistsInMempool);
364            }
365
366            // ensure transaction does not exist in the blockchain
367            if view.ledger_tx_exists(&tx_id)? {
368                return Err(TxAcceptanceError::AlreadyExistsInLedger);
369            }
370
371            let txs_count = view.mempool_txs_count();
372            if txs_count >= max_mempool_txn_count {
373                // Get the lowest fee transaction to delete
374                let (lowest_price, to_delete) = view
375                    .mempool_txs_ids_sorted_by_low_fee()
376                    .next()
377                    .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
378
379                if tx.gas_price() < lowest_price {
380                    // Or error if the gas price proposed is the lowest of all
381                    // the transactions in the mempool
382                    Err(TxAcceptanceError::MaxTxnCountExceeded(
383                        max_mempool_txn_count,
384                    ))
385                } else {
386                    Ok(Some(to_delete))
387                }
388            } else {
389                Ok(None)
390            }
391        })?;
392
393        // VM Preverify call
394        let preverification_data =
395            vm.read().await.preverify(tx).map_err(|e| {
396                TxAcceptanceError::VerificationFailed(format!("{e}"))
397            })?;
398
399        if let PreverificationResult::FutureNonce {
400            account,
401            state,
402            nonce_used,
403        } = preverification_data
404        {
405            db.read().await.view(|db| {
406                for nonce in state.nonce + 1..nonce_used {
407                    let spending_id = SpendingId::AccountNonce(account, nonce);
408                    if db
409                        .mempool_txs_by_spendable_ids(&[spending_id])
410                        .is_empty()
411                    {
412                        return Err(TxAcceptanceError::VerificationFailed(
413                            format!("Missing intermediate nonce {nonce}"),
414                        ));
415                    }
416                }
417                Ok(())
418            })?;
419        }
420
421        let mut events = vec![];
422
423        // Try to add the transaction to the mempool
424        db.read().await.update_dry_run(dry_run, |db| {
425            let spend_ids = tx.to_spend_ids();
426
427            let mut replaced = false;
428            // ensure spend_ids do not exist in the mempool
429            for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
430                if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
431                    // If the transaction spendingId is already in the mempool
432                    // (same nonce or same nullifier), we check if it can be
433                    // replaced by the new transaction based on gas price and
434                    // gas limit.
435                    // If the new transaction has a higher gas price or the same
436                    // gas price but a higher gas limit, we replace the old
437                    // transaction with the new one.
438                    if m_tx.inner.gas_price() < tx.inner.gas_price()
439                        || (m_tx.inner.gas_price() == tx.inner.gas_price()
440                            && m_tx.inner.gas_limit() < tx.inner.gas_limit())
441                    {
442                        for deleted in db.delete_mempool_tx(m_tx_id, false)? {
443                            events.push(TransactionEvent::Removed(deleted));
444                            replaced = true;
445                        }
446                    } else {
447                        return Err(
448                            TxAcceptanceError::SpendIdExistsInMempool.into()
449                        );
450                    }
451                }
452            }
453
454            events.push(TransactionEvent::Included(tx));
455
456            if !replaced {
457                if let Some(to_delete) = tx_to_delete {
458                    for deleted in db.delete_mempool_tx(to_delete, true)? {
459                        events.push(TransactionEvent::Removed(deleted));
460                    }
461                }
462            }
463            // Persist transaction in mempool storage
464
465            let now = get_current_timestamp();
466
467            db.store_mempool_tx(tx, now)
468        })?;
469        Ok(events)
470    }
471
472    /// Requests full mempool data from N alive peers
473    ///
474    /// Message flow:
475    /// GetMempool -> Inv -> GetResource -> Tx
476    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
477        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
478        let max_peers = self
479            .conf
480            .mempool_download_redundancy
481            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
482
483        let net = network.read().await;
484        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
485
486        let msg = payload::GetMempool::default().into();
487        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
488            error!("could not request mempool from network: {err}");
489        }
490    }
491}
492
493fn check_tx_serialization(
494    tx: &dusk_core::transfer::Transaction,
495) -> Result<(), TxAcceptanceError> {
496    // The transaction is an argument to the transfer contract, so
497    // its serialized size has to be within the same 64Kib limit.
498    const SCRATCH_BUF_BYTES: usize = 1024;
499    const ARGBUF_LEN: usize = 64 * 1024;
500    let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
501    let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
502    let mut buffer = [0u8; ARGBUF_LEN];
503    let scratch = BufferScratch::new(&mut sbuf);
504    let ser = BufferSerializer::new(&mut buffer);
505    let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
506    if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
507        match err {
508            CompositeSerializerError::SerializerError(err) => match err {
509                BufferSerializerError::Overflow { .. } => {
510                    return Err(TxAcceptanceError::TooLarge);
511                }
512            },
513            err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
514        }
515    }
516    Ok(())
517}
518
519#[cfg(test)]
520mod tests {
521    use dusk_core::signatures::bls::{PublicKey, SecretKey};
522    use rand::rngs::StdRng;
523    use rand::Rng;
524    use rand::{CryptoRng, RngCore, SeedableRng};
525    use wallet_core::transaction::moonlight_deployment;
526
527    use super::*;
528
529    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
530        rng: &mut R,
531        bytecode: Vec<u8>,
532        init_args: Vec<u8>,
533    ) -> dusk_core::transfer::Transaction {
534        const CHAIN_ID: u8 = 0xfa;
535        let sk = SecretKey::random(rng);
536        let pk = PublicKey::from(&SecretKey::random(rng));
537
538        let gas_limit: u64 = rng.gen();
539        let gas_price: u64 = rng.gen();
540        let nonce: u64 = rng.gen();
541        let deploy_nonce: u64 = rng.gen();
542
543        moonlight_deployment(
544            &sk,
545            bytecode,
546            &pk,
547            init_args,
548            gas_limit,
549            gas_price,
550            nonce,
551            deploy_nonce,
552            CHAIN_ID,
553        )
554        .expect("should create a transaction")
555    }
556
557    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
558
559    #[test]
560    fn test_tx_serialization_check_normal() {
561        let mut rng = StdRng::seed_from_u64(42);
562        let tx = new_moonlight_deploy_tx(
563            &mut rng,
564            vec![0; 64 * 1024],
565            vec![0; MAX_MOONLIGHT_ARG_SIZE],
566        );
567        let result = check_tx_serialization(&tx);
568        assert!(matches!(result, Ok(())));
569    }
570
571    #[test]
572    fn test_tx_serialization_check_tx_too_large() {
573        let mut rng = StdRng::seed_from_u64(42);
574        let tx = new_moonlight_deploy_tx(
575            &mut rng,
576            vec![0; 64 * 1024],
577            vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
578        );
579        let result = check_tx_serialization(&tx);
580        assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
581    }
582}