dusk_node/
mempool.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use conf::{
14    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
15};
16use node_data::events::{Event, TransactionEvent};
17use node_data::get_current_timestamp;
18use node_data::ledger::{SpendingId, Transaction};
19use node_data::message::{payload, AsyncQueue, Payload, Topics};
20use thiserror::Error;
21use tokio::sync::mpsc::Sender;
22use tokio::sync::RwLock;
23use tracing::{error, info, warn};
24
25use crate::database::{Ledger, Mempool};
26use crate::mempool::conf::Params;
27use crate::vm::PreverificationResult;
28use crate::{database, vm, LongLivedService, Message, Network};
29
30const TOPICS: &[u8] = &[Topics::Tx as u8];
31
32#[derive(Debug, Error)]
33pub enum TxAcceptanceError {
34    #[error("this transaction exists in the mempool")]
35    AlreadyExistsInMempool,
36    #[error("this transaction exists in the ledger")]
37    AlreadyExistsInLedger,
38    #[error("this transaction's spendId exists in the mempool")]
39    SpendIdExistsInMempool,
40    #[error("this transaction is invalid {0}")]
41    VerificationFailed(String),
42    #[error("gas price lower than minimum {0}")]
43    GasPriceTooLow(u64),
44    #[error("gas limit lower than minimum {0}")]
45    GasLimitTooLow(u64),
46    #[error("Maximum count of transactions exceeded {0}")]
47    MaxTxnCountExceeded(usize),
48    #[error("A generic error occurred {0}")]
49    Generic(anyhow::Error),
50}
51
52impl From<anyhow::Error> for TxAcceptanceError {
53    fn from(err: anyhow::Error) -> Self {
54        Self::Generic(err)
55    }
56}
57
58pub struct MempoolSrv {
59    inbound: AsyncQueue<Message>,
60    conf: Params,
61    /// Sender channel for sending out RUES events
62    event_sender: Sender<Event>,
63}
64
65impl MempoolSrv {
66    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
67        info!("MempoolSrv::new with conf {}", conf);
68        Self {
69            inbound: AsyncQueue::bounded(
70                conf.max_queue_size,
71                "mempool_inbound",
72            ),
73            conf,
74            event_sender,
75        }
76    }
77}
78
79#[async_trait]
80impl<N: Network, DB: database::DB, VM: vm::VMExecution>
81    LongLivedService<N, DB, VM> for MempoolSrv
82{
83    async fn execute(
84        &mut self,
85        network: Arc<RwLock<N>>,
86        db: Arc<RwLock<DB>>,
87        vm: Arc<RwLock<VM>>,
88    ) -> anyhow::Result<usize> {
89        LongLivedService::<N, DB, VM>::add_routes(
90            self,
91            TOPICS,
92            self.inbound.clone(),
93            &network,
94        )
95        .await?;
96
97        // Request mempool update from N alive peers
98        self.request_mempool(&network).await;
99
100        let idle_interval =
101            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
102
103        let mempool_expiry = self
104            .conf
105            .mempool_expiry
106            .unwrap_or(DEFAULT_EXPIRY_TIME)
107            .as_secs();
108
109        // Mempool service loop
110        let mut on_idle_event = tokio::time::interval(idle_interval);
111        loop {
112            tokio::select! {
113                biased;
114                _ = on_idle_event.tick() => {
115                    info!(event = "mempool_idle", interval = ?idle_interval);
116
117                    let expiration_time = get_current_timestamp()
118                        .checked_sub(mempool_expiry)
119                        .expect("valid duration");
120
121                    // Remove expired transactions from the mempool
122                    db.read().await.update(|db| {
123                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
124                            error!("cannot get expired txs: {e}");
125                            vec![]
126                        });
127                        for tx_id in expired_txs {
128                            info!(event = "expired_tx", hash = hex::encode(tx_id));
129                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
130                                error!("cannot delete expired tx: {e}");
131                                vec![]
132                            });
133                            for deleted_tx_id in deleted_txs{
134                                let event = TransactionEvent::Removed(deleted_tx_id);
135                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
136                                if let Err(e) = self.event_sender.try_send(event.into()) {
137                                    warn!("cannot notify mempool removed transaction {e}")
138                                };
139                            }
140                        }
141                        Ok(())
142                    })?;
143
144                },
145                msg = self.inbound.recv() => {
146                    if let Ok(msg) = msg {
147                        match &msg.payload {
148                            Payload::Transaction(tx) => {
149                                let accept = self.accept_tx(&db, &vm, tx);
150                                if let Err(e) = accept.await {
151                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
152                                    continue;
153                                }
154
155                                let network = network.read().await;
156                                if let Err(e) = network.broadcast(&msg).await {
157                                    warn!("Unable to broadcast accepted tx: {e}")
158                                };
159                            }
160                            _ => error!("invalid inbound message payload"),
161                        }
162                    }
163                }
164            }
165        }
166    }
167
168    /// Returns service name.
169    fn name(&self) -> &'static str {
170        "mempool"
171    }
172}
173
174impl MempoolSrv {
175    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
176        &mut self,
177        db: &Arc<RwLock<DB>>,
178        vm: &Arc<RwLock<VM>>,
179        tx: &Transaction,
180    ) -> Result<(), TxAcceptanceError> {
181        let max_mempool_txn_count = self.conf.max_mempool_txn_count;
182
183        let events =
184            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
185                .await?;
186
187        tracing::info!(
188            event = "transaction accepted",
189            hash = hex::encode(tx.id())
190        );
191
192        for tx_event in events {
193            let node_event = tx_event.into();
194            if let Err(e) = self.event_sender.try_send(node_event) {
195                warn!("cannot notify mempool accepted transaction {e}")
196            };
197        }
198
199        Ok(())
200    }
201
202    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
203        db: &Arc<RwLock<DB>>,
204        vm: &Arc<RwLock<VM>>,
205        tx: &'t Transaction,
206        dry_run: bool,
207        max_mempool_txn_count: usize,
208    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
209        let tx_id = tx.id();
210
211        if tx.gas_price() < 1 {
212            return Err(TxAcceptanceError::GasPriceTooLow(1));
213        }
214
215        if tx.inner.deploy().is_some() {
216            let vm = vm.read().await;
217            let min_deployment_gas_price = vm.min_deployment_gas_price();
218            if tx.gas_price() < min_deployment_gas_price {
219                return Err(TxAcceptanceError::GasPriceTooLow(
220                    min_deployment_gas_price,
221                ));
222            }
223
224            let gas_per_deploy_byte = vm.gas_per_deploy_byte();
225            let deploy_charge = tx
226                .inner
227                .deploy_charge(gas_per_deploy_byte, vm.min_deploy_points());
228            if tx.inner.gas_limit() < deploy_charge {
229                return Err(TxAcceptanceError::GasLimitTooLow(deploy_charge));
230            }
231        } else {
232            let vm = vm.read().await;
233            let min_gas_limit = vm.min_gas_limit();
234            if tx.inner.gas_limit() < min_gas_limit {
235                return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
236            }
237        }
238
239        // Perform basic checks on the transaction
240        let tx_to_delete = db.read().await.view(|view| {
241            // ensure transaction does not exist in the mempool
242            if view.mempool_tx_exists(tx_id)? {
243                return Err(TxAcceptanceError::AlreadyExistsInMempool);
244            }
245
246            // ensure transaction does not exist in the blockchain
247            if view.ledger_tx_exists(&tx_id)? {
248                return Err(TxAcceptanceError::AlreadyExistsInLedger);
249            }
250
251            let txs_count = view.mempool_txs_count();
252            if txs_count >= max_mempool_txn_count {
253                // Get the lowest fee transaction to delete
254                let (lowest_price, to_delete) = view
255                    .mempool_txs_ids_sorted_by_low_fee()?
256                    .next()
257                    .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
258
259                if tx.gas_price() < lowest_price {
260                    // Or error if the gas price proposed is the lowest of all
261                    // the transactions in the mempool
262                    Err(TxAcceptanceError::MaxTxnCountExceeded(
263                        max_mempool_txn_count,
264                    ))
265                } else {
266                    Ok(Some(to_delete))
267                }
268            } else {
269                Ok(None)
270            }
271        })?;
272
273        // VM Preverify call
274        let preverification_data =
275            vm.read().await.preverify(tx).map_err(|e| {
276                TxAcceptanceError::VerificationFailed(format!("{e:?}"))
277            })?;
278
279        if let PreverificationResult::FutureNonce {
280            account,
281            state,
282            nonce_used,
283        } = preverification_data
284        {
285            db.read().await.view(|db| {
286                for nonce in state.nonce + 1..nonce_used {
287                    let spending_id = SpendingId::AccountNonce(account, nonce);
288                    if db
289                        .mempool_txs_by_spendable_ids(&[spending_id])
290                        .is_empty()
291                    {
292                        return Err(TxAcceptanceError::VerificationFailed(
293                            format!("Missing intermediate nonce {nonce}"),
294                        ));
295                    }
296                }
297                Ok(())
298            })?;
299        }
300
301        let mut events = vec![];
302
303        // Try to add the transaction to the mempool
304        db.read().await.update_dry_run(dry_run, |db| {
305            let spend_ids = tx.to_spend_ids();
306
307            let mut replaced = false;
308            // ensure spend_ids do not exist in the mempool
309            for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
310                if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
311                    if m_tx.inner.gas_price() < tx.inner.gas_price() {
312                        for deleted in db.delete_mempool_tx(m_tx_id, false)? {
313                            events.push(TransactionEvent::Removed(deleted));
314                            replaced = true;
315                        }
316                    } else {
317                        return Err(
318                            TxAcceptanceError::SpendIdExistsInMempool.into()
319                        );
320                    }
321                }
322            }
323
324            events.push(TransactionEvent::Included(tx));
325
326            if !replaced {
327                if let Some(to_delete) = tx_to_delete {
328                    for deleted in db.delete_mempool_tx(to_delete, true)? {
329                        events.push(TransactionEvent::Removed(deleted));
330                    }
331                }
332            }
333            // Persist transaction in mempool storage
334
335            let now = get_current_timestamp();
336
337            db.store_mempool_tx(tx, now)
338        })?;
339        Ok(events)
340    }
341
342    /// Requests full mempool data from N alive peers
343    ///
344    /// Message flow:
345    /// GetMempool -> Inv -> GetResource -> Tx
346    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
347        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
348        let max_peers = self
349            .conf
350            .mempool_download_redundancy
351            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
352
353        let net = network.read().await;
354        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
355
356        let msg = payload::GetMempool::default().into();
357        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
358            error!("could not request mempool from network: {err}");
359        }
360    }
361}