dusk_node/
mempool.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::TxPreconditionError;
20use node_data::events::{Event, TransactionEvent};
21use node_data::get_current_timestamp;
22use node_data::ledger::{Header, SpendingId, Transaction};
23use node_data::message::{payload, AsyncQueue, Payload, Topics};
24use rkyv::ser::serializers::{
25    BufferScratch, BufferSerializer, BufferSerializerError,
26    CompositeSerializer, CompositeSerializerError,
27};
28use rkyv::ser::Serializer;
29use rkyv::Infallible;
30use thiserror::Error;
31use tokio::sync::mpsc::Sender;
32use tokio::sync::RwLock;
33use tracing::{error, info, warn};
34
35use crate::database::{Ledger, Mempool};
36use crate::mempool::conf::Params;
37use crate::vm::PreverificationResult;
38use crate::{database, vm, LongLivedService, Message, Network};
39
40const TOPICS: &[u8] = &[Topics::Tx as u8];
41
42#[derive(Debug, Error)]
43pub enum TxAcceptanceError {
44    #[error("this transaction exists in the mempool")]
45    AlreadyExistsInMempool,
46    #[error("this transaction exists in the ledger")]
47    AlreadyExistsInLedger,
48    #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
49    BlobMissingSidecar([u8; 32]),
50    #[error("No blobs provided")]
51    BlobEmpty,
52    #[error("Transaction has too many blobs: {0}")]
53    BlobTooMany(usize),
54    #[error("Invalid blob: {0}")]
55    BlobInvalid(String),
56    #[error("this transaction's spendId exists in the mempool")]
57    SpendIdExistsInMempool,
58    #[error("this transaction is invalid {0}")]
59    VerificationFailed(String),
60    #[error("gas price lower than minimum {0}")]
61    GasPriceTooLow(u64),
62    #[error("gas limit lower than minimum {0}")]
63    GasLimitTooLow(u64),
64    #[error("Maximum count of transactions exceeded {0}")]
65    MaxTxnCountExceeded(usize),
66    #[error("this transaction is too large to be serialized")]
67    TooLarge,
68    #[error("Maximum transaction size exceeded {0}")]
69    MaxSizeExceeded(usize),
70    #[error("A generic error occurred {0}")]
71    Generic(anyhow::Error),
72}
73
74impl From<anyhow::Error> for TxAcceptanceError {
75    fn from(err: anyhow::Error) -> Self {
76        Self::Generic(err)
77    }
78}
79
80impl From<BlobError> for TxAcceptanceError {
81    fn from(err: BlobError) -> Self {
82        match err {
83            BlobError::MissingSidecar(id) => {
84                TxAcceptanceError::BlobMissingSidecar(id)
85            }
86            BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
87            BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
88            BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
89        }
90    }
91}
92
93impl From<TxPreconditionError> for TxAcceptanceError {
94    fn from(err: TxPreconditionError) -> Self {
95        match err {
96            TxPreconditionError::BlobLowLimit(min) => {
97                TxAcceptanceError::GasLimitTooLow(min)
98            }
99            TxPreconditionError::DeployLowLimit(min) => {
100                TxAcceptanceError::GasLimitTooLow(min)
101            }
102            TxPreconditionError::DeployLowPrice(min) => {
103                TxAcceptanceError::GasPriceTooLow(min)
104            }
105            TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
106            TxPreconditionError::BlobTooMany(n) => {
107                TxAcceptanceError::BlobTooMany(n)
108            }
109        }
110    }
111}
112
113pub struct MempoolSrv {
114    inbound: AsyncQueue<Message>,
115    conf: Params,
116    /// Sender channel for sending out RUES events
117    event_sender: Sender<Event>,
118}
119
120impl MempoolSrv {
121    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
122        info!("MempoolSrv::new with conf {}", conf);
123        Self {
124            inbound: AsyncQueue::bounded(
125                conf.max_queue_size,
126                "mempool_inbound",
127            ),
128            conf,
129            event_sender,
130        }
131    }
132}
133
134#[async_trait]
135impl<N: Network, DB: database::DB, VM: vm::VMExecution>
136    LongLivedService<N, DB, VM> for MempoolSrv
137{
138    async fn execute(
139        &mut self,
140        network: Arc<RwLock<N>>,
141        db: Arc<RwLock<DB>>,
142        vm: Arc<RwLock<VM>>,
143    ) -> anyhow::Result<usize> {
144        LongLivedService::<N, DB, VM>::add_routes(
145            self,
146            TOPICS,
147            self.inbound.clone(),
148            &network,
149        )
150        .await?;
151
152        // Request mempool update from N alive peers
153        self.request_mempool(&network).await;
154
155        let idle_interval =
156            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
157
158        let mempool_expiry = self
159            .conf
160            .mempool_expiry
161            .unwrap_or(DEFAULT_EXPIRY_TIME)
162            .as_secs();
163
164        // Mempool service loop
165        let mut on_idle_event = tokio::time::interval(idle_interval);
166        loop {
167            tokio::select! {
168                biased;
169                _ = on_idle_event.tick() => {
170                    info!(event = "mempool_idle", interval = ?idle_interval);
171
172                    let expiration_time = get_current_timestamp()
173                        .checked_sub(mempool_expiry)
174                        .expect("valid duration");
175
176                    // Remove expired transactions from the mempool
177                    db.read().await.update(|db| {
178                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
179                            error!("cannot get expired txs: {e}");
180                            vec![]
181                        });
182                        for tx_id in expired_txs {
183                            info!(event = "expired_tx", hash = hex::encode(tx_id));
184                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
185                                error!("cannot delete expired tx: {e}");
186                                vec![]
187                            });
188                            for deleted_tx_id in deleted_txs{
189                                let event = TransactionEvent::Removed(deleted_tx_id);
190                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
191                                if let Err(e) = self.event_sender.try_send(event.into()) {
192                                    warn!("cannot notify mempool removed transaction {e}")
193                                };
194                            }
195                        }
196                        Ok(())
197                    })?;
198
199                },
200                msg = self.inbound.recv() => {
201                    if let Ok(msg) = msg {
202                        match &msg.payload {
203                            Payload::Transaction(tx) => {
204                                let accept = self.accept_tx(&db, &vm, tx);
205                                if let Err(e) = accept.await {
206                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
207                                    continue;
208                                }
209
210                                let network = network.read().await;
211                                if let Err(e) = network.broadcast(&msg).await {
212                                    warn!("Unable to broadcast accepted tx: {e}")
213                                };
214                            }
215                            _ => error!("invalid inbound message payload"),
216                        }
217                    }
218                }
219            }
220        }
221    }
222
223    /// Returns service name.
224    fn name(&self) -> &'static str {
225        "mempool"
226    }
227}
228
229impl MempoolSrv {
230    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
231        &mut self,
232        db: &Arc<RwLock<DB>>,
233        vm: &Arc<RwLock<VM>>,
234        tx: &Transaction,
235    ) -> Result<(), TxAcceptanceError> {
236        let max_mempool_txn_count = self.conf.max_mempool_txn_count;
237
238        let events =
239            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
240                .await?;
241
242        tracing::info!(
243            event = "transaction accepted",
244            hash = hex::encode(tx.id())
245        );
246
247        for tx_event in events {
248            let node_event = tx_event.into();
249            if let Err(e) = self.event_sender.try_send(node_event) {
250                warn!("cannot notify mempool accepted transaction {e}")
251            };
252        }
253
254        Ok(())
255    }
256
257    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
258        db: &Arc<RwLock<DB>>,
259        vm: &Arc<RwLock<VM>>,
260        tx: &'t Transaction,
261        dry_run: bool,
262        max_mempool_txn_count: usize,
263    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
264        let tx_id = tx.id();
265        let tx_size = tx.size();
266
267        // We consider the maximum transaction size as the total avilable block
268        // space minus the minimum size of the block header (i.e., the size of
269        // the header in a first-iteration block with no faults)
270        let min_header_size = Header::default().size();
271        let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
272        if tx_size > max_tx_size {
273            return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
274        }
275
276        check_tx_serialization(&tx.inner)?;
277
278        if tx.gas_price() < 1 {
279            return Err(TxAcceptanceError::GasPriceTooLow(1));
280        }
281
282        {
283            // Mimic the VM's additional checks for transactions
284            let vm = vm.read().await;
285
286            // Check deployment tx
287            if tx.inner.deploy().is_some() {
288                let min_deployment_gas_price = vm.min_deployment_gas_price();
289                let gas_per_deploy_byte = vm.gas_per_deploy_byte();
290                let min_deploy_points = vm.min_deploy_points();
291                tx.inner.deploy_check(
292                    gas_per_deploy_byte,
293                    min_deployment_gas_price,
294                    min_deploy_points,
295                )?;
296            }
297
298            // Check blob tx
299            if tx.inner.blob().is_some() {
300                db.read()
301                    .await
302                    .view(|db| {
303                        db.block_label_by_height(vm.blob_activation_height())
304                    })
305                    .map_err(|e| {
306                        anyhow!("Cannot get blob activation height: {e}")
307                    })?
308                    .ok_or(anyhow!(
309                        "Blobs acceptance will start at block height: {}",
310                        vm.blob_activation_height()
311                    ))?;
312
313                let gas_per_blob = vm.gas_per_blob();
314                tx.inner.blob_check(gas_per_blob)?;
315                dusk_consensus::validate_blob_sidecars(tx)?;
316            }
317
318            // Check global minimum gas limit
319            let min_gas_limit = vm.min_gas_limit();
320            if tx.inner.gas_limit() < min_gas_limit {
321                return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
322            }
323        }
324
325        // Perform basic checks on the transaction
326        let tx_to_delete = db.read().await.view(|view| {
327            // ensure transaction does not exist in the mempool
328            if view.mempool_tx_exists(tx_id)? {
329                return Err(TxAcceptanceError::AlreadyExistsInMempool);
330            }
331
332            // ensure transaction does not exist in the blockchain
333            if view.ledger_tx_exists(&tx_id)? {
334                return Err(TxAcceptanceError::AlreadyExistsInLedger);
335            }
336
337            let txs_count = view.mempool_txs_count();
338            if txs_count >= max_mempool_txn_count {
339                // Get the lowest fee transaction to delete
340                let (lowest_price, to_delete) = view
341                    .mempool_txs_ids_sorted_by_low_fee()
342                    .next()
343                    .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
344
345                if tx.gas_price() < lowest_price {
346                    // Or error if the gas price proposed is the lowest of all
347                    // the transactions in the mempool
348                    Err(TxAcceptanceError::MaxTxnCountExceeded(
349                        max_mempool_txn_count,
350                    ))
351                } else {
352                    Ok(Some(to_delete))
353                }
354            } else {
355                Ok(None)
356            }
357        })?;
358
359        // VM Preverify call
360        let preverification_data =
361            vm.read().await.preverify(tx).map_err(|e| {
362                TxAcceptanceError::VerificationFailed(format!("{e}"))
363            })?;
364
365        if let PreverificationResult::FutureNonce {
366            account,
367            state,
368            nonce_used,
369        } = preverification_data
370        {
371            db.read().await.view(|db| {
372                for nonce in state.nonce + 1..nonce_used {
373                    let spending_id = SpendingId::AccountNonce(account, nonce);
374                    if db
375                        .mempool_txs_by_spendable_ids(&[spending_id])
376                        .is_empty()
377                    {
378                        return Err(TxAcceptanceError::VerificationFailed(
379                            format!("Missing intermediate nonce {nonce}"),
380                        ));
381                    }
382                }
383                Ok(())
384            })?;
385        }
386
387        let mut events = vec![];
388
389        // Try to add the transaction to the mempool
390        db.read().await.update_dry_run(dry_run, |db| {
391            let spend_ids = tx.to_spend_ids();
392
393            let mut replaced = false;
394            // ensure spend_ids do not exist in the mempool
395            for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
396                if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
397                    // If the transaction spendingId is already in the mempool
398                    // (same nonce or same nullifier), we check if it can be
399                    // replaced by the new transaction based on gas price and
400                    // gas limit.
401                    // If the new transaction has a higher gas price or the same
402                    // gas price but a higher gas limit, we replace the old
403                    // transaction with the new one.
404                    if m_tx.inner.gas_price() < tx.inner.gas_price()
405                        || (m_tx.inner.gas_price() == tx.inner.gas_price()
406                            && m_tx.inner.gas_limit() < tx.inner.gas_limit())
407                    {
408                        for deleted in db.delete_mempool_tx(m_tx_id, false)? {
409                            events.push(TransactionEvent::Removed(deleted));
410                            replaced = true;
411                        }
412                    } else {
413                        return Err(
414                            TxAcceptanceError::SpendIdExistsInMempool.into()
415                        );
416                    }
417                }
418            }
419
420            events.push(TransactionEvent::Included(tx));
421
422            if !replaced {
423                if let Some(to_delete) = tx_to_delete {
424                    for deleted in db.delete_mempool_tx(to_delete, true)? {
425                        events.push(TransactionEvent::Removed(deleted));
426                    }
427                }
428            }
429            // Persist transaction in mempool storage
430
431            let now = get_current_timestamp();
432
433            db.store_mempool_tx(tx, now)
434        })?;
435        Ok(events)
436    }
437
438    /// Requests full mempool data from N alive peers
439    ///
440    /// Message flow:
441    /// GetMempool -> Inv -> GetResource -> Tx
442    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
443        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
444        let max_peers = self
445            .conf
446            .mempool_download_redundancy
447            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
448
449        let net = network.read().await;
450        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
451
452        let msg = payload::GetMempool::default().into();
453        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
454            error!("could not request mempool from network: {err}");
455        }
456    }
457}
458
459fn check_tx_serialization(
460    tx: &dusk_core::transfer::Transaction,
461) -> Result<(), TxAcceptanceError> {
462    // The transaction is an argument to the transfer contract, so
463    // its serialized size has to be within the same 64Kib limit.
464    const SCRATCH_BUF_BYTES: usize = 1024;
465    const ARGBUF_LEN: usize = 64 * 1024;
466    let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
467    let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
468    let mut buffer = [0u8; ARGBUF_LEN];
469    let scratch = BufferScratch::new(&mut sbuf);
470    let ser = BufferSerializer::new(&mut buffer);
471    let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
472    if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
473        match err {
474            CompositeSerializerError::SerializerError(err) => match err {
475                BufferSerializerError::Overflow { .. } => {
476                    return Err(TxAcceptanceError::TooLarge);
477                }
478            },
479            err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
480        }
481    }
482    Ok(())
483}
484
485#[cfg(test)]
486mod tests {
487    use dusk_core::signatures::bls::{PublicKey, SecretKey};
488    use rand::rngs::StdRng;
489    use rand::Rng;
490    use rand::{CryptoRng, RngCore, SeedableRng};
491    use wallet_core::transaction::moonlight_deployment;
492
493    use super::*;
494
495    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
496        rng: &mut R,
497        bytecode: Vec<u8>,
498        init_args: Vec<u8>,
499    ) -> dusk_core::transfer::Transaction {
500        const CHAIN_ID: u8 = 0xfa;
501        let sk = SecretKey::random(rng);
502        let pk = PublicKey::from(&SecretKey::random(rng));
503
504        let gas_limit: u64 = rng.gen();
505        let gas_price: u64 = rng.gen();
506        let nonce: u64 = rng.gen();
507        let deploy_nonce: u64 = rng.gen();
508
509        moonlight_deployment(
510            &sk,
511            bytecode,
512            &pk,
513            init_args,
514            gas_limit,
515            gas_price,
516            nonce,
517            deploy_nonce,
518            CHAIN_ID,
519        )
520        .expect("should create a transaction")
521    }
522
523    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
524
525    #[test]
526    fn test_tx_serialization_check_normal() {
527        let mut rng = StdRng::seed_from_u64(42);
528        let tx = new_moonlight_deploy_tx(
529            &mut rng,
530            vec![0; 64 * 1024],
531            vec![0; MAX_MOONLIGHT_ARG_SIZE],
532        );
533        let result = check_tx_serialization(&tx);
534        assert!(matches!(result, Ok(())));
535    }
536
537    #[test]
538    fn test_tx_serialization_check_tx_too_large() {
539        let mut rng = StdRng::seed_from_u64(42);
540        let tx = new_moonlight_deploy_tx(
541            &mut rng,
542            vec![0; 64 * 1024],
543            vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
544        );
545        let result = check_tx_serialization(&tx);
546        assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
547    }
548}