Skip to main content

dusk_node/
mempool.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7mod admission;
8pub mod conf;
9mod prequeue;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::anyhow;
15use async_trait::async_trait;
16use conf::{
17    DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
18};
19use dusk_consensus::errors::BlobError;
20use dusk_core::TxPreconditionError;
21use dusk_core::transfer::TransactionFormat;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{CanonicalTransaction, LedgerTransaction};
25use node_data::message::{AsyncQueue, Payload, Topics, payload};
26pub use prequeue::FutureNonceRetryHandle;
27use prequeue::{
28    RETRY_POLL_INTERVAL, drain_unblocked_chain, handle_enqueue_outcome,
29    process_due_retries,
30};
31use rkyv::Infallible;
32use rkyv::ser::Serializer;
33use rkyv::ser::serializers::{
34    BufferScratch, BufferSerializer, BufferSerializerError,
35    CompositeSerializer, CompositeSerializerError,
36};
37use thiserror::Error;
38use tokio::sync::RwLock;
39use tokio::sync::mpsc::Sender;
40use tokio::time::Instant;
41use tracing::{error, info, warn};
42
43use self::admission::{TxAdmission, apply_mempool_admission};
44use crate::database::{Ledger, Mempool};
45use crate::mempool::conf::Params;
46use crate::{LongLivedService, Message, Network, database, vm};
47
/// Message topics handled by the mempool service. `execute` passes this list
/// to `add_routes` together with the service's inbound queue.
48const TOPICS: &[u8] = &[Topics::Tx as u8];
49
50pub(super) fn should_replace_conflicting_tx(
51    existing: &LedgerTransaction,
52    incoming: &LedgerTransaction,
53) -> bool {
54    incoming.gas_price() > existing.gas_price()
55}
56
/// Reasons a transaction can be refused by the mempool.
///
/// Most variants are returned to the caller as-is. The exception is
/// `MissingIntermediateNonce`: `MempoolSrv::handle_tx_message` intercepts it,
/// hands the message to the future-nonce retry queue and returns whatever
/// `handle_enqueue_outcome` reports for that enqueue attempt.
57#[derive(Debug, Error)]
58pub enum TxAcceptanceError {
59    #[error("this transaction exists in the mempool")]
60    AlreadyExistsInMempool,
61    #[error("this transaction exists in the ledger")]
62    AlreadyExistsInLedger,
    /// Carries the 32-byte blob id, rendered as hex in the message.
63    #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
64    BlobMissingSidecar([u8; 32]),
65    #[error("No blobs provided")]
66    BlobEmpty,
67    #[error("Transaction has too many blobs: {0}")]
68    BlobTooMany(usize),
69    #[error("Invalid blob: {0}")]
70    BlobInvalid(String),
71    #[error("this transaction's spendId exists in the mempool")]
72    SpendIdExistsInMempool,
73    #[error("this transaction is invalid {0}")]
74    VerificationFailed(String),
    /// Carries the minimum gas price that was not met.
75    #[error("gas price lower than minimum {0}")]
76    GasPriceTooLow(u64),
    /// Carries the minimum gas limit that was not met.
77    #[error("gas limit lower than minimum {0}")]
78    GasLimitTooLow(u64),
    /// Returned by `check_supported_ingress_tx_format` for `PreAegis`
    /// transactions.
79    #[error(
80        "transaction format {actual:?} is not supported for live ingress; minimum supported format is {minimum:?}"
81    )]
82    UnsupportedIngressFormat {
83        actual: TransactionFormat,
84        minimum: TransactionFormat,
85    },
86    #[error("Maximum count of transactions exceeded {0}")]
87    MaxTxnCountExceeded(usize),
    /// Intercepted by `handle_tx_message`, which enqueues the message on the
    /// future-nonce retry queue instead of propagating this error.
88    #[error("Missing intermediate nonce {0}")]
89    MissingIntermediateNonce(u64),
90    #[error("Maximum future nonce retry queue size exceeded {0}")]
91    MaxFutureNonceQueueExceeded(usize),
92    #[error(
93        "Maximum queued future Moonlight transactions per account exceeded {0}"
94    )]
95    MaxMoonlightFutureNoncePerAccountExceeded(usize),
    /// Returned by `check_tx_serialization` when the serialization buffer
    /// overflows.
96    #[error("this transaction is too large to be serialized")]
97    TooLarge,
98    #[error("Maximum transaction size exceeded {0}")]
99    MaxSizeExceeded(usize),
    /// Catch-all for failures without a dedicated variant; this is what the
    /// `From<anyhow::Error>` conversion produces.
100    #[error("A generic error occurred {0}")]
101    Generic(anyhow::Error),
102}
103
104impl From<anyhow::Error> for TxAcceptanceError {
105    fn from(err: anyhow::Error) -> Self {
106        Self::Generic(err)
107    }
108}
109
110impl From<BlobError> for TxAcceptanceError {
111    fn from(err: BlobError) -> Self {
112        match err {
113            BlobError::MissingSidecar(id) => {
114                TxAcceptanceError::BlobMissingSidecar(id)
115            }
116            BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
117            BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
118            BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
119        }
120    }
121}
122
123impl From<TxPreconditionError> for TxAcceptanceError {
124    fn from(err: TxPreconditionError) -> Self {
125        match err {
126            TxPreconditionError::BlobLowLimit(min) => {
127                TxAcceptanceError::GasLimitTooLow(min)
128            }
129            TxPreconditionError::DeployChargeOverflow => {
130                TxAcceptanceError::VerificationFailed(
131                    "deploy charge overflow".into(),
132                )
133            }
134            TxPreconditionError::BlobChargeOverflow => {
135                TxAcceptanceError::VerificationFailed(
136                    "blob charge overflow".into(),
137                )
138            }
139            TxPreconditionError::DeployLowLimit(min) => {
140                TxAcceptanceError::GasLimitTooLow(min)
141            }
142            TxPreconditionError::DeployLowPrice(min) => {
143                TxAcceptanceError::GasPriceTooLow(min)
144            }
145            TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
146            TxPreconditionError::BlobTooMany(n) => {
147                TxAcceptanceError::BlobTooMany(n)
148            }
149            TxPreconditionError::PhoenixFeeOverflow => {
150                TxAcceptanceError::VerificationFailed(
151                    "phoenix fee overflow".into(),
152                )
153            }
154            TxPreconditionError::PhoenixFeeTampered => {
155                TxAcceptanceError::VerificationFailed(
156                    "phoenix fee tampered".into(),
157                )
158            }
159            TxPreconditionError::PhoenixFeeRefundMismatch => {
160                TxAcceptanceError::VerificationFailed(
161                    "phoenix fee refund stealth address mismatch".into(),
162                )
163            }
164        }
165    }
166}
167
168fn check_supported_ingress_tx_format(
169    tx: &CanonicalTransaction,
170) -> Result<(), TxAcceptanceError> {
171    // Live network admission accepts both Aegis and Boreas envelopes. Only the
172    // historical PreAegis encoding remains replay-only.
173    if tx.format() == TransactionFormat::PreAegis {
174        return Err(TxAcceptanceError::UnsupportedIngressFormat {
175            actual: tx.format(),
176            minimum: TransactionFormat::Aegis,
177        });
178    }
179
180    Ok(())
181}
182
183fn normalize_ingress_tx(
184    tx: &LedgerTransaction,
185    block_height: u64,
186) -> Result<LedgerTransaction, TxAcceptanceError> {
187    check_supported_ingress_tx_format(tx.canonical())?;
188    Ok(tx.reformat_for_ingress(block_height))
189}
190
/// Mempool service: receives transaction messages, runs them through
/// admission, broadcasts accepted ones and periodically removes expired
/// entries (see the `LongLivedService::execute` implementation).
191pub struct MempoolSrv {
    /// Bounded queue (capacity `conf.max_queue_size`) registered with the
    /// network for `TOPICS`; the service loop receives from it.
192    inbound: AsyncQueue<Message>,
    /// Service parameters (queue sizes, expiry, idle interval, limits).
193    conf: Params,
194    /// Sender channel for sending out RUES events
195    event_sender: Sender<Event>,
    /// Queue for transactions rejected with `MissingIntermediateNonce`; a
    /// background worker spawned by `execute` processes due retries from it.
196    future_nonce_retry_queue: FutureNonceRetryHandle,
197}
198
199impl MempoolSrv {
200    pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
201        let queue = FutureNonceRetryHandle::new(
202            conf.max_queue_size,
203            conf.max_moonlight_future_nonce_per_account,
204        );
205        Self::with_future_nonce_retry_queue(conf, event_sender, queue)
206    }
207
208    pub fn with_future_nonce_retry_queue(
209        conf: Params,
210        event_sender: Sender<Event>,
211        future_nonce_retry_queue: FutureNonceRetryHandle,
212    ) -> Self {
213        info!("MempoolSrv::new with conf {}", conf);
214        Self {
215            inbound: AsyncQueue::bounded(
216                conf.max_queue_size,
217                "mempool_inbound",
218            ),
219            conf,
220            event_sender,
221            future_nonce_retry_queue,
222        }
223    }
224}
225
226#[async_trait]
227impl<N: Network, DB: database::DB, VM: vm::VMExecution>
228    LongLivedService<N, DB, VM> for MempoolSrv
229{
230    async fn execute(
231        &mut self,
232        network: Arc<RwLock<N>>,
233        db: Arc<RwLock<DB>>,
234        vm: Arc<RwLock<VM>>,
235    ) -> anyhow::Result<usize> {
236        LongLivedService::<N, DB, VM>::add_routes(
237            self,
238            TOPICS,
239            self.inbound.clone(),
240            &network,
241        )
242        .await?;
243
244        // Request mempool update from N alive peers
245        self.request_mempool(&network).await;
246
247        let idle_interval =
248            self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
249
250        let mempool_expiry = self
251            .conf
252            .mempool_expiry
253            .unwrap_or(DEFAULT_EXPIRY_TIME)
254            .as_secs();
255
256        let retry_queue = self.future_nonce_retry_queue.clone();
257        let retry_event_sender = self.event_sender.clone();
258        let retry_max_mempool_txn_count = self.conf.max_mempool_txn_count;
259        let retry_network = network.clone();
260        let retry_db = db.clone();
261        let retry_vm = vm.clone();
262        tokio::spawn(async move {
263            MempoolSrv::run_retry_worker(
264                retry_queue,
265                retry_event_sender,
266                retry_max_mempool_txn_count,
267                retry_network,
268                retry_db,
269                retry_vm,
270            )
271            .await;
272        });
273
274        // Mempool service loop
275        let mut on_idle_event = tokio::time::interval(idle_interval);
276        loop {
277            tokio::select! {
278                biased;
279                _ = on_idle_event.tick() => {
280                    info!(event = "mempool_idle", interval = ?idle_interval);
281
282                    let expiration_time = get_current_timestamp()
283                        .checked_sub(mempool_expiry)
284                        .expect("valid duration");
285
286                    // Remove expired transactions from the mempool
287                    db.read().await.update(|db| {
288                        let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
289                            error!("cannot get expired txs: {e}");
290                            vec![]
291                        });
292                        for tx_id in expired_txs {
293                            info!(event = "expired_tx", hash = hex::encode(tx_id));
294                            let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
295                                error!("cannot delete expired tx: {e}");
296                                vec![]
297                            });
298                            for deleted_tx_id in deleted_txs{
299                                let event = TransactionEvent::Removed(deleted_tx_id);
300                                info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
301                                if let Err(e) = self.event_sender.try_send(event.into()) {
302                                    warn!("cannot notify mempool removed transaction {e}")
303                                };
304                            }
305                        }
306                        Ok(())
307                    })?;
308
309                },
310                msg = self.inbound.recv() => {
311                    if let Ok(msg) = msg {
312                        match &msg.payload {
313                            Payload::Transaction(tx) => {
314                                if let Err(e) = self
315                                    .handle_tx_message(
316                                        &network,
317                                        &db,
318                                        &vm,
319                                        &msg,
320                                    )
321                                    .await
322                                {
323                                    error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
324                                };
325                            }
326                            _ => error!("invalid inbound message payload"),
327                        }
328                    }
329                }
330            }
331        }
332    }
333
334    /// Returns service name.
335    fn name(&self) -> &'static str {
336        "mempool"
337    }
338}
339
340impl MempoolSrv {
341    async fn run_retry_worker<
342        N: Network,
343        DB: database::DB,
344        VM: vm::VMExecution,
345    >(
346        future_nonce_retry_queue: FutureNonceRetryHandle,
347        event_sender: Sender<Event>,
348        max_mempool_txn_count: usize,
349        network: Arc<RwLock<N>>,
350        db: Arc<RwLock<DB>>,
351        vm: Arc<RwLock<VM>>,
352    ) {
353        let mut on_retry_event = tokio::time::interval(RETRY_POLL_INTERVAL);
354
355        loop {
356            on_retry_event.tick().await;
357            process_due_retries(
358                &future_nonce_retry_queue,
359                &event_sender,
360                max_mempool_txn_count,
361                &network,
362                &db,
363                &vm,
364                Instant::now(),
365            )
366            .await;
367        }
368    }
369
370    async fn broadcast_tx<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
371        let network = network.read().await;
372        if let Err(e) = network.broadcast(msg).await {
373            warn!("Unable to broadcast accepted tx: {e}");
374        };
375    }
376
377    async fn broadcast_accepted_tx<N: Network>(
378        network: &Arc<RwLock<N>>,
379        msg: &Message,
380        tx: &LedgerTransaction,
381        source: Option<&str>,
382        queue_age_ms: Option<u64>,
383    ) {
384        if let Some(source) = source {
385            info!(
386                event = "future_nonce_retry_accepted",
387                hash = hex::encode(tx.id()),
388                source,
389                queue_age_ms
390            );
391        }
392        Self::broadcast_tx(network, msg).await;
393    }
394
395    async fn handle_tx_message<
396        N: Network,
397        DB: database::DB,
398        VM: vm::VMExecution,
399    >(
400        &mut self,
401        network: &Arc<RwLock<N>>,
402        db: &Arc<RwLock<DB>>,
403        vm: &Arc<RwLock<VM>>,
404        msg: &Message,
405    ) -> Result<(), TxAcceptanceError> {
406        let Payload::Transaction(tx) = &msg.payload else {
407            return Err(TxAcceptanceError::Generic(anyhow!(
408                "invalid inbound message payload"
409            )));
410        };
411
412        let next_block_height = db
413            .read()
414            .await
415            .view(|db| db.latest_block())
416            .map_err(|e| {
417                TxAcceptanceError::Generic(anyhow!(
418                    "Cannot get tip block height from the database: {e}"
419                ))
420            })?
421            .header
422            .height
423            .saturating_add(1);
424        let tx = normalize_ingress_tx(tx, next_block_height)?;
425        let msg = {
426            let mut normalized = msg.clone();
427            normalized.payload = tx.clone().into();
428            normalized
429        };
430
431        match Self::accept_tx(
432            &self.event_sender,
433            self.conf.max_mempool_txn_count,
434            db,
435            vm,
436            &tx,
437        )
438        .await
439        {
440            Ok(()) => {
441                Self::broadcast_accepted_tx(network, &msg, &tx, None, None)
442                    .await;
443                drain_unblocked_chain(
444                    &self.future_nonce_retry_queue,
445                    &self.event_sender,
446                    self.conf.max_mempool_txn_count,
447                    network,
448                    db,
449                    vm,
450                    &tx,
451                )
452                .await;
453                Ok(())
454            }
455            Err(TxAcceptanceError::MissingIntermediateNonce(_)) => {
456                handle_enqueue_outcome(
457                    &self.event_sender,
458                    &tx,
459                    self.future_nonce_retry_queue
460                        .enqueue_message_with_outcome(&msg)
461                        .await,
462                )
463            }
464            Err(err) => Err(err),
465        }
466    }
467
468    async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
469        event_sender: &Sender<Event>,
470        max_mempool_txn_count: usize,
471        db: &Arc<RwLock<DB>>,
472        vm: &Arc<RwLock<VM>>,
473        tx: &LedgerTransaction,
474    ) -> Result<(), TxAcceptanceError> {
475        let events =
476            MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
477                .await?;
478
479        tracing::info!(
480            event = "transaction accepted",
481            hash = hex::encode(tx.id())
482        );
483
484        for tx_event in events {
485            let node_event = tx_event.into();
486            if let Err(e) = event_sender.try_send(node_event) {
487                warn!("cannot notify mempool accepted transaction {e}")
488            };
489        }
490
491        Ok(())
492    }
493
494    pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
495        db: &Arc<RwLock<DB>>,
496        vm: &Arc<RwLock<VM>>,
497        tx: &'t LedgerTransaction,
498        dry_run: bool,
499        max_mempool_txn_count: usize,
500    ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
501        let admission = TxAdmission::new(db, vm, max_mempool_txn_count)
502            .check(tx.canonical())
503            .await?;
504
505        let mut events = vec![];
506        db.read().await.update_dry_run(dry_run, |db| {
507            events = apply_mempool_admission(
508                db,
509                tx,
510                &admission.facts,
511                admission.tx_to_delete,
512                get_current_timestamp(),
513            )?;
514            Ok(())
515        })?;
516
517        Ok(events)
518    }
519
520    pub async fn check_canonical_tx_at_tip<
521        DB: database::DB,
522        VM: vm::VMExecution,
523    >(
524        db: &Arc<RwLock<DB>>,
525        vm: &Arc<RwLock<VM>>,
526        tx: &CanonicalTransaction,
527        tip_height: u64,
528        max_mempool_txn_count: usize,
529    ) -> Result<LedgerTransaction, TxAcceptanceError> {
530        let _ = TxAdmission::new(db, vm, max_mempool_txn_count)
531            .check_with_tip(tx, tip_height)
532            .await?;
533        Ok(tx.clone().into())
534    }
535
536    /// Requests full mempool data from N alive peers
537    ///
538    /// Message flow:
539    /// GetMempool -> Inv -> GetResource -> Tx
540    async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
541        const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
542        let max_peers = self
543            .conf
544            .mempool_download_redundancy
545            .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
546
547        let net = network.read().await;
548        net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
549
550        let msg = payload::GetMempool::default().into();
551        if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
552            error!("could not request mempool from network: {err}");
553        }
554    }
555}
556
557fn check_tx_serialization(
558    tx: &dusk_core::transfer::Transaction,
559) -> Result<(), TxAcceptanceError> {
560    // The transaction is an argument to the transfer contract, so
561    // its serialized size has to be within the same 64Kib limit.
562    const SCRATCH_BUF_BYTES: usize = 1024;
563    const ARGBUF_LEN: usize = 64 * 1024;
564    let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
565    let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
566    let mut buffer = [0u8; ARGBUF_LEN];
567    let scratch = BufferScratch::new(&mut sbuf);
568    let ser = BufferSerializer::new(&mut buffer);
569    let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
570    if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
571        match err {
572            CompositeSerializerError::SerializerError(err) => match err {
573                BufferSerializerError::Overflow { .. } => {
574                    return Err(TxAcceptanceError::TooLarge);
575                }
576            },
577            err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
578        }
579    }
580    Ok(())
581}
582
#[cfg(test)]
mod tests {
    use dusk_core::signatures::bls::{PublicKey, SecretKey};
    use rand::rngs::StdRng;
    use rand::{CryptoRng, Rng, RngCore, SeedableRng};
    use wallet_core::transaction::moonlight_deployment;

    use super::*;

    /// Seed shared by every test so the generated transactions are
    /// reproducible.
    const RNG_SEED: u64 = 42;

    /// Largest init-args size for which the deployment built by
    /// `new_moonlight_deploy_tx` still passes `check_tx_serialization`; one
    /// more byte is expected to be rejected as `TooLarge`.
    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;

    /// Builds a Moonlight deployment transaction with random keys, gas
    /// settings and nonces drawn from `rng`.
    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
        rng: &mut R,
        bytecode: Vec<u8>,
        init_args: Vec<u8>,
    ) -> dusk_core::transfer::Transaction {
        const CHAIN_ID: u8 = 0xfa;

        // The draw order below determines the generated values; keep it.
        let sender_sk = SecretKey::random(rng);
        let owner_pk = PublicKey::from(&SecretKey::random(rng));

        let gas_limit: u64 = rng.r#gen();
        let gas_price: u64 = rng.r#gen();
        let nonce: u64 = rng.r#gen();
        let deploy_nonce: u64 = rng.r#gen();

        moonlight_deployment(
            &sender_sk,
            bytecode,
            &owner_pk,
            init_args,
            gas_limit,
            gas_price,
            nonce,
            deploy_nonce,
            CHAIN_ID,
        )
        .expect("should create a transaction")
    }

    /// Deployment with 64 KiB of bytecode and `init_args_len` bytes of
    /// init args, used by the serialization size tests.
    fn deploy_tx_with_init_args(
        init_args_len: usize,
    ) -> dusk_core::transfer::Transaction {
        let mut rng = StdRng::seed_from_u64(RNG_SEED);
        new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; init_args_len],
        )
    }

    /// Small deployment (32-byte bytecode and init args) for format tests.
    fn small_deploy_tx() -> dusk_core::transfer::Transaction {
        let mut rng = StdRng::seed_from_u64(RNG_SEED);
        new_moonlight_deploy_tx(&mut rng, vec![0; 32], vec![0; 32])
    }

    #[test]
    fn test_tx_serialization_check_normal() {
        let tx = deploy_tx_with_init_args(MAX_MOONLIGHT_ARG_SIZE);

        let outcome = check_tx_serialization(&tx);

        assert!(matches!(outcome, Ok(())));
    }

    #[test]
    fn test_tx_serialization_check_tx_too_large() {
        let tx = deploy_tx_with_init_args(MAX_MOONLIGHT_ARG_SIZE + 1);

        let outcome = check_tx_serialization(&tx);

        assert!(matches!(outcome, Err(TxAcceptanceError::TooLarge)));
    }

    #[test]
    fn test_supported_ingress_format_check_rejects_pre_aegis() {
        let tx = CanonicalTransaction::canonicalize(
            small_deploy_tx(),
            TransactionFormat::PreAegis,
        );

        let outcome = check_supported_ingress_tx_format(&tx);

        assert!(matches!(
            outcome,
            Err(TxAcceptanceError::UnsupportedIngressFormat {
                actual: TransactionFormat::PreAegis,
                minimum: TransactionFormat::Aegis,
            })
        ));
    }

    #[test]
    fn test_supported_ingress_format_check_accepts_aegis() {
        let tx = CanonicalTransaction::canonicalize(
            small_deploy_tx(),
            node_data::hard_fork::ingress_tx_format_at(1),
        );

        let outcome = check_supported_ingress_tx_format(&tx);

        assert!(matches!(outcome, Ok(())));
    }

    #[test]
    fn test_normalize_ingress_tx_reformats_aegis_to_boreas() {
        let tx = LedgerTransaction::from_protocol_with_format(
            small_deploy_tx(),
            TransactionFormat::Aegis,
        );

        let normalized = normalize_ingress_tx(&tx, u64::MAX)
            .expect("aegis ingress should normalize to boreas");

        // The envelope changes, the transaction identity does not.
        assert_eq!(normalized.format(), TransactionFormat::Boreas);
        assert_eq!(normalized.id(), tx.id());
    }

    #[test]
    fn test_normalize_ingress_tx_reformats_boreas_to_aegis() {
        let tx = LedgerTransaction::from_protocol_with_format(
            small_deploy_tx(),
            TransactionFormat::Boreas,
        );

        let normalized = normalize_ingress_tx(&tx, 1)
            .expect("boreas ingress should normalize to aegis");

        // The envelope changes, the transaction identity does not.
        assert_eq!(normalized.format(), TransactionFormat::Aegis);
        assert_eq!(normalized.id(), tx.id());
    }
}