// dusk_node/mempool.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
6
pub mod conf;

use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use async_trait::async_trait;
use conf::{
    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
};
use dusk_consensus::config::MAX_BLOCK_SIZE;
use node_data::events::{Event, TransactionEvent};
use node_data::get_current_timestamp;
use node_data::ledger::{Header, SpendingId, Transaction};
use node_data::message::{payload, AsyncQueue, Payload, Topics};
use rkyv::ser::serializers::{
    BufferScratch, BufferSerializer, BufferSerializerError,
    CompositeSerializer, CompositeSerializerError,
};
use rkyv::ser::Serializer;
use rkyv::Infallible;
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{error, info, warn};

use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::vm::PreverificationResult;
use crate::{database, vm, LongLivedService, Message, Network};
37
38const TOPICS: &[u8] = &[Topics::Tx as u8];
39
40#[derive(Debug, Error)]
41pub enum TxAcceptanceError {
42    #[error("this transaction exists in the mempool")]
43    AlreadyExistsInMempool,
44    #[error("this transaction exists in the ledger")]
45    AlreadyExistsInLedger,
46    #[error("this transaction's spendId exists in the mempool")]
47    SpendIdExistsInMempool,
48    #[error("this transaction is invalid {0}")]
49    VerificationFailed(String),
50    #[error("gas price lower than minimum {0}")]
51    GasPriceTooLow(u64),
52    #[error("gas limit lower than minimum {0}")]
53    GasLimitTooLow(u64),
54    #[error("Maximum count of transactions exceeded {0}")]
55    MaxTxnCountExceeded(usize),
56    #[error("this transaction is too large to be serialized")]
57    TooLarge,
58    #[error("Maximum transaction size exceeded {0}")]
59    MaxSizeExceeded(usize),
60    #[error("A generic error occurred {0}")]
61    Generic(anyhow::Error),
62}
63
64impl From<anyhow::Error> for TxAcceptanceError {
65    fn from(err: anyhow::Error) -> Self {
66        Self::Generic(err)
67    }
68}
69
70pub struct MempoolSrv {
71    inbound: AsyncQueue<Message>,
72    conf: Params,
73    /// Sender channel for sending out RUES events
74    event_sender: Sender<Event>,
75}
76
77impl MempoolSrv {
78    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
79        info!("MempoolSrv::new with conf {}", conf);
80        Self {
81            inbound: AsyncQueue::bounded(
82                conf.max_queue_size,
83                "mempool_inbound",
84            ),
85            conf,
86            event_sender,
87        }
88    }
89}
90
91#[async_trait]
92impl<N: Network, DB: database::DB, VM: vm::VMExecution>
93    LongLivedService<N, DB, VM> for MempoolSrv
94{
95    async fn execute(
96        &mut self,
97        network: Arc<RwLock<N>>,
98        db: Arc<RwLock<DB>>,
99        vm: Arc<RwLock<VM>>,
100    ) -> anyhow::Result<usize> {
101        LongLivedService::<N, DB, VM>::add_routes(
102            self,
103            TOPICS,
104            self.inbound.clone(),
105            &network,
106        )
107        .await?;
108
109        // Request mempool update from N alive peers
110        self.request_mempool(&network).await;
111
112        let idle_interval =
113            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
114
115        let mempool_expiry = self
116            .conf
117            .mempool_expiry
118            .unwrap_or(DEFAULT_EXPIRY_TIME)
119            .as_secs();
120
121        // Mempool service loop
122        let mut on_idle_event = tokio::time::interval(idle_interval);
123        loop {
124            tokio::select! {
125                biased;
126                _ = on_idle_event.tick() => {
127                    info!(event = "mempool_idle", interval = ?idle_interval);
128
129                    let expiration_time = get_current_timestamp()
130                        .checked_sub(mempool_expiry)
131                        .expect("valid duration");
132
133                    // Remove expired transactions from the mempool
134                    db.read().await.update(|db| {
135                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
136                            error!("cannot get expired txs: {e}");
137                            vec![]
138                        });
139                        for tx_id in expired_txs {
140                            info!(event = "expired_tx", hash = hex::encode(tx_id));
141                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
142                                error!("cannot delete expired tx: {e}");
143                                vec![]
144                            });
145                            for deleted_tx_id in deleted_txs{
146                                let event = TransactionEvent::Removed(deleted_tx_id);
147                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
148                                if let Err(e) = self.event_sender.try_send(event.into()) {
149                                    warn!("cannot notify mempool removed transaction {e}")
150                                };
151                            }
152                        }
153                        Ok(())
154                    })?;
155
156                },
157                msg = self.inbound.recv() => {
158                    if let Ok(msg) = msg {
159                        match &msg.payload {
160                            Payload::Transaction(tx) => {
161                                let accept = self.accept_tx(&db, &vm, tx);
162                                if let Err(e) = accept.await {
163                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
164                                    continue;
165                                }
166
167                                let network = network.read().await;
168                                if let Err(e) = network.broadcast(&msg).await {
169                                    warn!("Unable to broadcast accepted tx: {e}")
170                                };
171                            }
172                            _ => error!("invalid inbound message payload"),
173                        }
174                    }
175                }
176            }
177        }
178    }
179
180    /// Returns service name.
181    fn name(&self) -> &'static str {
182        "mempool"
183    }
184}
185
186impl MempoolSrv {
187    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
188        &mut self,
189        db: &Arc<RwLock<DB>>,
190        vm: &Arc<RwLock<VM>>,
191        tx: &Transaction,
192    ) -> Result<(), TxAcceptanceError> {
193        let max_mempool_txn_count = self.conf.max_mempool_txn_count;
194
195        let events =
196            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
197                .await?;
198
199        tracing::info!(
200            event = "transaction accepted",
201            hash = hex::encode(tx.id())
202        );
203
204        for tx_event in events {
205            let node_event = tx_event.into();
206            if let Err(e) = self.event_sender.try_send(node_event) {
207                warn!("cannot notify mempool accepted transaction {e}")
208            };
209        }
210
211        Ok(())
212    }
213
214    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
215        db: &Arc<RwLock<DB>>,
216        vm: &Arc<RwLock<VM>>,
217        tx: &'t Transaction,
218        dry_run: bool,
219        max_mempool_txn_count: usize,
220    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
221        let tx_id = tx.id();
222        let tx_size = tx.size();
223
224        // We consider the maximum transaction size as the total avilable block
225        // space minus the minimum size of the block header (i.e., the size of
226        // the header in a first-iteration block with no faults)
227        let min_header_size = Header::default().size();
228        let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
229        if tx_size > max_tx_size {
230            return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
231        }
232
233        check_tx_serialization(&tx.inner)?;
234
235        if tx.gas_price() < 1 {
236            return Err(TxAcceptanceError::GasPriceTooLow(1));
237        }
238
239        if tx.inner.deploy().is_some() {
240            let vm = vm.read().await;
241            let min_deployment_gas_price = vm.min_deployment_gas_price();
242            if tx.gas_price() < min_deployment_gas_price {
243                return Err(TxAcceptanceError::GasPriceTooLow(
244                    min_deployment_gas_price,
245                ));
246            }
247
248            let gas_per_deploy_byte = vm.gas_per_deploy_byte();
249            let deploy_charge = tx
250                .inner
251                .deploy_charge(gas_per_deploy_byte, vm.min_deploy_points());
252            if tx.inner.gas_limit() < deploy_charge {
253                return Err(TxAcceptanceError::GasLimitTooLow(deploy_charge));
254            }
255        } else {
256            let vm = vm.read().await;
257            let min_gas_limit = vm.min_gas_limit();
258            if tx.inner.gas_limit() < min_gas_limit {
259                return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
260            }
261        }
262
263        // Perform basic checks on the transaction
264        let tx_to_delete = db.read().await.view(|view| {
265            // ensure transaction does not exist in the mempool
266            if view.mempool_tx_exists(tx_id)? {
267                return Err(TxAcceptanceError::AlreadyExistsInMempool);
268            }
269
270            // ensure transaction does not exist in the blockchain
271            if view.ledger_tx_exists(&tx_id)? {
272                return Err(TxAcceptanceError::AlreadyExistsInLedger);
273            }
274
275            let txs_count = view.mempool_txs_count();
276            if txs_count >= max_mempool_txn_count {
277                // Get the lowest fee transaction to delete
278                let (lowest_price, to_delete) = view
279                    .mempool_txs_ids_sorted_by_low_fee()
280                    .next()
281                    .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
282
283                if tx.gas_price() < lowest_price {
284                    // Or error if the gas price proposed is the lowest of all
285                    // the transactions in the mempool
286                    Err(TxAcceptanceError::MaxTxnCountExceeded(
287                        max_mempool_txn_count,
288                    ))
289                } else {
290                    Ok(Some(to_delete))
291                }
292            } else {
293                Ok(None)
294            }
295        })?;
296
297        // VM Preverify call
298        let preverification_data =
299            vm.read().await.preverify(tx).map_err(|e| {
300                TxAcceptanceError::VerificationFailed(format!("{e:?}"))
301            })?;
302
303        if let PreverificationResult::FutureNonce {
304            account,
305            state,
306            nonce_used,
307        } = preverification_data
308        {
309            db.read().await.view(|db| {
310                for nonce in state.nonce + 1..nonce_used {
311                    let spending_id = SpendingId::AccountNonce(account, nonce);
312                    if db
313                        .mempool_txs_by_spendable_ids(&[spending_id])
314                        .is_empty()
315                    {
316                        return Err(TxAcceptanceError::VerificationFailed(
317                            format!("Missing intermediate nonce {nonce}"),
318                        ));
319                    }
320                }
321                Ok(())
322            })?;
323        }
324
325        let mut events = vec![];
326
327        // Try to add the transaction to the mempool
328        db.read().await.update_dry_run(dry_run, |db| {
329            let spend_ids = tx.to_spend_ids();
330
331            let mut replaced = false;
332            // ensure spend_ids do not exist in the mempool
333            for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
334                if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
335                    if m_tx.inner.gas_price() < tx.inner.gas_price() {
336                        for deleted in db.delete_mempool_tx(m_tx_id, false)? {
337                            events.push(TransactionEvent::Removed(deleted));
338                            replaced = true;
339                        }
340                    } else {
341                        return Err(
342                            TxAcceptanceError::SpendIdExistsInMempool.into()
343                        );
344                    }
345                }
346            }
347
348            events.push(TransactionEvent::Included(tx));
349
350            if !replaced {
351                if let Some(to_delete) = tx_to_delete {
352                    for deleted in db.delete_mempool_tx(to_delete, true)? {
353                        events.push(TransactionEvent::Removed(deleted));
354                    }
355                }
356            }
357            // Persist transaction in mempool storage
358
359            let now = get_current_timestamp();
360
361            db.store_mempool_tx(tx, now)
362        })?;
363        Ok(events)
364    }
365
366    /// Requests full mempool data from N alive peers
367    ///
368    /// Message flow:
369    /// GetMempool -> Inv -> GetResource -> Tx
370    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
371        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
372        let max_peers = self
373            .conf
374            .mempool_download_redundancy
375            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
376
377        let net = network.read().await;
378        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
379
380        let msg = payload::GetMempool::default().into();
381        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
382            error!("could not request mempool from network: {err}");
383        }
384    }
385}
386
387fn check_tx_serialization(
388    tx: &dusk_core::transfer::Transaction,
389) -> Result<(), TxAcceptanceError> {
390    // The transaction is an argument to the transfer contract, so
391    // its serialized size has to be within the same 64Kib limit.
392    const SCRATCH_BUF_BYTES: usize = 1024;
393    const ARGBUF_LEN: usize = 64 * 1024;
394    let stripped_tx = tx.strip_off_bytecode();
395    let tx = stripped_tx.as_ref().unwrap_or(tx);
396    let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
397    let mut buffer = [0u8; ARGBUF_LEN];
398    let scratch = BufferScratch::new(&mut sbuf);
399    let ser = BufferSerializer::new(&mut buffer);
400    let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
401    if let Err(err) = ser.serialize_value(tx) {
402        match err {
403            CompositeSerializerError::SerializerError(err) => match err {
404                BufferSerializerError::Overflow { .. } => {
405                    return Err(TxAcceptanceError::TooLarge);
406                }
407            },
408            err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
409        }
410    }
411    Ok(())
412}
413
#[cfg(test)]
mod tests {
    use dusk_core::signatures::bls::{PublicKey, SecretKey};
    use rand::rngs::StdRng;
    use rand::{CryptoRng, Rng, RngCore, SeedableRng};
    use wallet_core::transaction::moonlight_deployment;

    use super::*;

    /// Builds a moonlight deployment transaction carrying the given
    /// `bytecode` and `init_args`; keys, gas values and nonces are drawn
    /// from `rng`.
    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
        rng: &mut R,
        bytecode: Vec<u8>,
        init_args: Vec<u8>,
    ) -> dusk_core::transfer::Transaction {
        const CHAIN_ID: u8 = 0xfa;
        let sk = SecretKey::random(rng);
        let pk = PublicKey::from(&SecretKey::random(rng));

        let gas_limit: u64 = rng.gen();
        let gas_price: u64 = rng.gen();
        let nonce: u64 = rng.gen();
        let deploy_nonce: u64 = rng.gen();

        moonlight_deployment(
            &sk,
            bytecode,
            &pk,
            init_args,
            gas_limit,
            gas_price,
            nonce,
            deploy_nonce,
            CHAIN_ID,
        )
        .expect("should create a transaction")
    }

    /// Largest `init_args` length accepted by `check_tx_serialization` for
    /// the transactions built in these tests: one more byte is rejected.
    /// The 2320 bytes are presumably the serialized size of the rest of the
    /// transaction (TODO confirm).
    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;

    /// Init args of exactly the maximum size pass the check.
    #[test]
    fn test_tx_serialization_check_normal() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; MAX_MOONLIGHT_ARG_SIZE],
        );
        let result = check_tx_serialization(&tx);
        assert!(result.is_ok(), "unexpected rejection: {result:?}");
    }

    /// One byte over the maximum size is reported as `TooLarge`.
    #[test]
    fn test_tx_serialization_check_tx_too_large() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
        );
        let result = check_tx_serialization(&tx);
        assert!(
            matches!(&result, Err(TxAcceptanceError::TooLarge)),
            "expected TooLarge, got {result:?}"
        );
    }
}