1mod admission;
8pub mod conf;
9mod prequeue;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::anyhow;
15use async_trait::async_trait;
16use conf::{
17 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
18};
19use dusk_consensus::errors::BlobError;
20use dusk_core::TxPreconditionError;
21use dusk_core::transfer::TransactionFormat;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{CanonicalTransaction, LedgerTransaction};
25use node_data::message::{AsyncQueue, Payload, Topics, payload};
26pub use prequeue::FutureNonceRetryHandle;
27use prequeue::{
28 RETRY_POLL_INTERVAL, drain_unblocked_chain, handle_enqueue_outcome,
29 process_due_retries,
30};
31use rkyv::Infallible;
32use rkyv::ser::Serializer;
33use rkyv::ser::serializers::{
34 BufferScratch, BufferSerializer, BufferSerializerError,
35 CompositeSerializer, CompositeSerializerError,
36};
37use thiserror::Error;
38use tokio::sync::RwLock;
39use tokio::sync::mpsc::Sender;
40use tokio::time::Instant;
41use tracing::{error, info, warn};
42
43use self::admission::{TxAdmission, apply_mempool_admission};
44use crate::database::{Ledger, Mempool};
45use crate::mempool::conf::Params;
46use crate::{LongLivedService, Message, Network, database, vm};
47
/// Topic ids passed to `add_routes` in `execute`, so that messages on these
/// topics land in the service's inbound queue. Only `Topics::Tx` is listed.
const TOPICS: &[u8] = &[Topics::Tx as u8];
49
50pub(super) fn should_replace_conflicting_tx(
51 existing: &LedgerTransaction,
52 incoming: &LedgerTransaction,
53) -> bool {
54 incoming.gas_price() > existing.gas_price()
55}
56
/// Reasons a transaction can be refused admission to the mempool.
///
/// The `Display` text of each variant comes from its `#[error]` attribute.
#[derive(Debug, Error)]
pub enum TxAcceptanceError {
    /// The transaction is already present in the mempool.
    #[error("this transaction exists in the mempool")]
    AlreadyExistsInMempool,
    /// The transaction is already present in the ledger.
    #[error("this transaction exists in the ledger")]
    AlreadyExistsInLedger,
    /// A blob referenced by the transaction has no sidecar; carries the
    /// 32-byte blob id (hex-encoded in the message).
    #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
    BlobMissingSidecar([u8; 32]),
    /// A blob transaction carried no blobs.
    #[error("No blobs provided")]
    BlobEmpty,
    /// The transaction carries too many blobs; the payload is the count
    /// reported by the originating check.
    #[error("Transaction has too many blobs: {0}")]
    BlobTooMany(usize),
    /// A blob failed validation; the payload is the reason text.
    #[error("Invalid blob: {0}")]
    BlobInvalid(String),
    /// Another mempool transaction already uses this transaction's spend id.
    #[error("this transaction's spendId exists in the mempool")]
    SpendIdExistsInMempool,
    /// Verification failed; the payload describes why.
    #[error("this transaction is invalid {0}")]
    VerificationFailed(String),
    /// Gas price is below the required minimum (payload: the minimum).
    #[error("gas price lower than minimum {0}")]
    GasPriceTooLow(u64),
    /// Gas limit is below the required minimum (payload: the minimum).
    #[error("gas limit lower than minimum {0}")]
    GasLimitTooLow(u64),
    /// The transaction's wire format is not accepted for live ingress.
    #[error(
        "transaction format {actual:?} is not supported for live ingress; minimum supported format is {minimum:?}"
    )]
    UnsupportedIngressFormat {
        /// Format the incoming transaction was encoded with.
        actual: TransactionFormat,
        /// Oldest format that is still accepted.
        minimum: TransactionFormat,
    },
    /// The mempool transaction-count limit was exceeded.
    // NOTE(review): the payload is presumably the configured limit — confirm
    // against the admission code.
    #[error("Maximum count of transactions exceeded {0}")]
    MaxTxnCountExceeded(usize),
    /// An intermediate nonce is missing. `handle_tx_message` reacts to this
    /// variant by parking the transaction in the future-nonce retry queue
    /// instead of rejecting it outright.
    #[error("Missing intermediate nonce {0}")]
    MissingIntermediateNonce(u64),
    /// The future-nonce retry queue is full.
    #[error("Maximum future nonce retry queue size exceeded {0}")]
    MaxFutureNonceQueueExceeded(usize),
    /// Too many future-nonce Moonlight transactions are queued for a single
    /// account.
    #[error(
        "Maximum queued future Moonlight transactions per account exceeded {0}"
    )]
    MaxMoonlightFutureNoncePerAccountExceeded(usize),
    /// The transaction does not fit into the fixed serialization buffer used
    /// by `check_tx_serialization`.
    #[error("this transaction is too large to be serialized")]
    TooLarge,
    /// The transaction exceeds the maximum allowed size.
    #[error("Maximum transaction size exceeded {0}")]
    MaxSizeExceeded(usize),
    /// Any other failure, wrapped as an `anyhow::Error`.
    #[error("A generic error occurred {0}")]
    Generic(anyhow::Error),
}
103
104impl From<anyhow::Error> for TxAcceptanceError {
105 fn from(err: anyhow::Error) -> Self {
106 Self::Generic(err)
107 }
108}
109
110impl From<BlobError> for TxAcceptanceError {
111 fn from(err: BlobError) -> Self {
112 match err {
113 BlobError::MissingSidecar(id) => {
114 TxAcceptanceError::BlobMissingSidecar(id)
115 }
116 BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
117 BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
118 BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
119 }
120 }
121}
122
123impl From<TxPreconditionError> for TxAcceptanceError {
124 fn from(err: TxPreconditionError) -> Self {
125 match err {
126 TxPreconditionError::BlobLowLimit(min) => {
127 TxAcceptanceError::GasLimitTooLow(min)
128 }
129 TxPreconditionError::DeployChargeOverflow => {
130 TxAcceptanceError::VerificationFailed(
131 "deploy charge overflow".into(),
132 )
133 }
134 TxPreconditionError::BlobChargeOverflow => {
135 TxAcceptanceError::VerificationFailed(
136 "blob charge overflow".into(),
137 )
138 }
139 TxPreconditionError::DeployLowLimit(min) => {
140 TxAcceptanceError::GasLimitTooLow(min)
141 }
142 TxPreconditionError::DeployLowPrice(min) => {
143 TxAcceptanceError::GasPriceTooLow(min)
144 }
145 TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
146 TxPreconditionError::BlobTooMany(n) => {
147 TxAcceptanceError::BlobTooMany(n)
148 }
149 TxPreconditionError::PhoenixFeeOverflow => {
150 TxAcceptanceError::VerificationFailed(
151 "phoenix fee overflow".into(),
152 )
153 }
154 TxPreconditionError::PhoenixFeeTampered => {
155 TxAcceptanceError::VerificationFailed(
156 "phoenix fee tampered".into(),
157 )
158 }
159 TxPreconditionError::PhoenixFeeRefundMismatch => {
160 TxAcceptanceError::VerificationFailed(
161 "phoenix fee refund stealth address mismatch".into(),
162 )
163 }
164 }
165 }
166}
167
168fn check_supported_ingress_tx_format(
169 tx: &CanonicalTransaction,
170) -> Result<(), TxAcceptanceError> {
171 if tx.format() == TransactionFormat::PreAegis {
174 return Err(TxAcceptanceError::UnsupportedIngressFormat {
175 actual: tx.format(),
176 minimum: TransactionFormat::Aegis,
177 });
178 }
179
180 Ok(())
181}
182
183fn normalize_ingress_tx(
184 tx: &LedgerTransaction,
185 block_height: u64,
186) -> Result<LedgerTransaction, TxAcceptanceError> {
187 check_supported_ingress_tx_format(tx.canonical())?;
188 Ok(tx.reformat_for_ingress(block_height))
189}
190
/// Long-lived mempool service: receives transaction messages from the
/// network, admits them to the mempool and periodically evicts expired ones.
pub struct MempoolSrv {
    /// Bounded queue of inbound network messages (created with capacity
    /// `conf.max_queue_size`).
    inbound: AsyncQueue<Message>,
    /// Mempool configuration parameters.
    conf: Params,
    /// Channel on which transaction events are published via `try_send`;
    /// send failures are only logged.
    event_sender: Sender<Event>,
    /// Shared handle to the queue holding transactions that were deferred
    /// because of a missing intermediate nonce.
    future_nonce_retry_queue: FutureNonceRetryHandle,
}
198
199impl MempoolSrv {
200 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
201 let queue = FutureNonceRetryHandle::new(
202 conf.max_queue_size,
203 conf.max_moonlight_future_nonce_per_account,
204 );
205 Self::with_future_nonce_retry_queue(conf, event_sender, queue)
206 }
207
208 pub fn with_future_nonce_retry_queue(
209 conf: Params,
210 event_sender: Sender<Event>,
211 future_nonce_retry_queue: FutureNonceRetryHandle,
212 ) -> Self {
213 info!("MempoolSrv::new with conf {}", conf);
214 Self {
215 inbound: AsyncQueue::bounded(
216 conf.max_queue_size,
217 "mempool_inbound",
218 ),
219 conf,
220 event_sender,
221 future_nonce_retry_queue,
222 }
223 }
224}
225
226#[async_trait]
227impl<N: Network, DB: database::DB, VM: vm::VMExecution>
228 LongLivedService<N, DB, VM> for MempoolSrv
229{
230 async fn execute(
231 &mut self,
232 network: Arc<RwLock<N>>,
233 db: Arc<RwLock<DB>>,
234 vm: Arc<RwLock<VM>>,
235 ) -> anyhow::Result<usize> {
236 LongLivedService::<N, DB, VM>::add_routes(
237 self,
238 TOPICS,
239 self.inbound.clone(),
240 &network,
241 )
242 .await?;
243
244 self.request_mempool(&network).await;
246
247 let idle_interval =
248 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
249
250 let mempool_expiry = self
251 .conf
252 .mempool_expiry
253 .unwrap_or(DEFAULT_EXPIRY_TIME)
254 .as_secs();
255
256 let retry_queue = self.future_nonce_retry_queue.clone();
257 let retry_event_sender = self.event_sender.clone();
258 let retry_max_mempool_txn_count = self.conf.max_mempool_txn_count;
259 let retry_network = network.clone();
260 let retry_db = db.clone();
261 let retry_vm = vm.clone();
262 tokio::spawn(async move {
263 MempoolSrv::run_retry_worker(
264 retry_queue,
265 retry_event_sender,
266 retry_max_mempool_txn_count,
267 retry_network,
268 retry_db,
269 retry_vm,
270 )
271 .await;
272 });
273
274 let mut on_idle_event = tokio::time::interval(idle_interval);
276 loop {
277 tokio::select! {
278 biased;
279 _ = on_idle_event.tick() => {
280 info!(event = "mempool_idle", interval = ?idle_interval);
281
282 let expiration_time = get_current_timestamp()
283 .checked_sub(mempool_expiry)
284 .expect("valid duration");
285
286 db.read().await.update(|db| {
288 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
289 error!("cannot get expired txs: {e}");
290 vec![]
291 });
292 for tx_id in expired_txs {
293 info!(event = "expired_tx", hash = hex::encode(tx_id));
294 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
295 error!("cannot delete expired tx: {e}");
296 vec![]
297 });
298 for deleted_tx_id in deleted_txs{
299 let event = TransactionEvent::Removed(deleted_tx_id);
300 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
301 if let Err(e) = self.event_sender.try_send(event.into()) {
302 warn!("cannot notify mempool removed transaction {e}")
303 };
304 }
305 }
306 Ok(())
307 })?;
308
309 },
310 msg = self.inbound.recv() => {
311 if let Ok(msg) = msg {
312 match &msg.payload {
313 Payload::Transaction(tx) => {
314 if let Err(e) = self
315 .handle_tx_message(
316 &network,
317 &db,
318 &vm,
319 &msg,
320 )
321 .await
322 {
323 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
324 };
325 }
326 _ => error!("invalid inbound message payload"),
327 }
328 }
329 }
330 }
331 }
332 }
333
334 fn name(&self) -> &'static str {
336 "mempool"
337 }
338}
339
340impl MempoolSrv {
341 async fn run_retry_worker<
342 N: Network,
343 DB: database::DB,
344 VM: vm::VMExecution,
345 >(
346 future_nonce_retry_queue: FutureNonceRetryHandle,
347 event_sender: Sender<Event>,
348 max_mempool_txn_count: usize,
349 network: Arc<RwLock<N>>,
350 db: Arc<RwLock<DB>>,
351 vm: Arc<RwLock<VM>>,
352 ) {
353 let mut on_retry_event = tokio::time::interval(RETRY_POLL_INTERVAL);
354
355 loop {
356 on_retry_event.tick().await;
357 process_due_retries(
358 &future_nonce_retry_queue,
359 &event_sender,
360 max_mempool_txn_count,
361 &network,
362 &db,
363 &vm,
364 Instant::now(),
365 )
366 .await;
367 }
368 }
369
370 async fn broadcast_tx<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
371 let network = network.read().await;
372 if let Err(e) = network.broadcast(msg).await {
373 warn!("Unable to broadcast accepted tx: {e}");
374 };
375 }
376
377 async fn broadcast_accepted_tx<N: Network>(
378 network: &Arc<RwLock<N>>,
379 msg: &Message,
380 tx: &LedgerTransaction,
381 source: Option<&str>,
382 queue_age_ms: Option<u64>,
383 ) {
384 if let Some(source) = source {
385 info!(
386 event = "future_nonce_retry_accepted",
387 hash = hex::encode(tx.id()),
388 source,
389 queue_age_ms
390 );
391 }
392 Self::broadcast_tx(network, msg).await;
393 }
394
395 async fn handle_tx_message<
396 N: Network,
397 DB: database::DB,
398 VM: vm::VMExecution,
399 >(
400 &mut self,
401 network: &Arc<RwLock<N>>,
402 db: &Arc<RwLock<DB>>,
403 vm: &Arc<RwLock<VM>>,
404 msg: &Message,
405 ) -> Result<(), TxAcceptanceError> {
406 let Payload::Transaction(tx) = &msg.payload else {
407 return Err(TxAcceptanceError::Generic(anyhow!(
408 "invalid inbound message payload"
409 )));
410 };
411
412 let next_block_height = db
413 .read()
414 .await
415 .view(|db| db.latest_block())
416 .map_err(|e| {
417 TxAcceptanceError::Generic(anyhow!(
418 "Cannot get tip block height from the database: {e}"
419 ))
420 })?
421 .header
422 .height
423 .saturating_add(1);
424 let tx = normalize_ingress_tx(tx, next_block_height)?;
425 let msg = {
426 let mut normalized = msg.clone();
427 normalized.payload = tx.clone().into();
428 normalized
429 };
430
431 match Self::accept_tx(
432 &self.event_sender,
433 self.conf.max_mempool_txn_count,
434 db,
435 vm,
436 &tx,
437 )
438 .await
439 {
440 Ok(()) => {
441 Self::broadcast_accepted_tx(network, &msg, &tx, None, None)
442 .await;
443 drain_unblocked_chain(
444 &self.future_nonce_retry_queue,
445 &self.event_sender,
446 self.conf.max_mempool_txn_count,
447 network,
448 db,
449 vm,
450 &tx,
451 )
452 .await;
453 Ok(())
454 }
455 Err(TxAcceptanceError::MissingIntermediateNonce(_)) => {
456 handle_enqueue_outcome(
457 &self.event_sender,
458 &tx,
459 self.future_nonce_retry_queue
460 .enqueue_message_with_outcome(&msg)
461 .await,
462 )
463 }
464 Err(err) => Err(err),
465 }
466 }
467
468 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
469 event_sender: &Sender<Event>,
470 max_mempool_txn_count: usize,
471 db: &Arc<RwLock<DB>>,
472 vm: &Arc<RwLock<VM>>,
473 tx: &LedgerTransaction,
474 ) -> Result<(), TxAcceptanceError> {
475 let events =
476 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
477 .await?;
478
479 tracing::info!(
480 event = "transaction accepted",
481 hash = hex::encode(tx.id())
482 );
483
484 for tx_event in events {
485 let node_event = tx_event.into();
486 if let Err(e) = event_sender.try_send(node_event) {
487 warn!("cannot notify mempool accepted transaction {e}")
488 };
489 }
490
491 Ok(())
492 }
493
494 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
495 db: &Arc<RwLock<DB>>,
496 vm: &Arc<RwLock<VM>>,
497 tx: &'t LedgerTransaction,
498 dry_run: bool,
499 max_mempool_txn_count: usize,
500 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
501 let admission = TxAdmission::new(db, vm, max_mempool_txn_count)
502 .check(tx.canonical())
503 .await?;
504
505 let mut events = vec![];
506 db.read().await.update_dry_run(dry_run, |db| {
507 events = apply_mempool_admission(
508 db,
509 tx,
510 &admission.facts,
511 admission.tx_to_delete,
512 get_current_timestamp(),
513 )?;
514 Ok(())
515 })?;
516
517 Ok(events)
518 }
519
520 pub async fn check_canonical_tx_at_tip<
521 DB: database::DB,
522 VM: vm::VMExecution,
523 >(
524 db: &Arc<RwLock<DB>>,
525 vm: &Arc<RwLock<VM>>,
526 tx: &CanonicalTransaction,
527 tip_height: u64,
528 max_mempool_txn_count: usize,
529 ) -> Result<LedgerTransaction, TxAcceptanceError> {
530 let _ = TxAdmission::new(db, vm, max_mempool_txn_count)
531 .check_with_tip(tx, tip_height)
532 .await?;
533 Ok(tx.clone().into())
534 }
535
536 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
541 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
542 let max_peers = self
543 .conf
544 .mempool_download_redundancy
545 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
546
547 let net = network.read().await;
548 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
549
550 let msg = payload::GetMempool::default().into();
551 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
552 error!("could not request mempool from network: {err}");
553 }
554 }
555}
556
557fn check_tx_serialization(
558 tx: &dusk_core::transfer::Transaction,
559) -> Result<(), TxAcceptanceError> {
560 const SCRATCH_BUF_BYTES: usize = 1024;
563 const ARGBUF_LEN: usize = 64 * 1024;
564 let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
565 let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
566 let mut buffer = [0u8; ARGBUF_LEN];
567 let scratch = BufferScratch::new(&mut sbuf);
568 let ser = BufferSerializer::new(&mut buffer);
569 let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
570 if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
571 match err {
572 CompositeSerializerError::SerializerError(err) => match err {
573 BufferSerializerError::Overflow { .. } => {
574 return Err(TxAcceptanceError::TooLarge);
575 }
576 },
577 err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
578 }
579 }
580 Ok(())
581}
582
/// Unit tests for the serialization size check and the ingress format
/// validation/normalization helpers.
#[cfg(test)]
mod tests {
    use dusk_core::signatures::bls::{PublicKey, SecretKey};
    use rand::rngs::StdRng;
    use rand::{CryptoRng, Rng, RngCore, SeedableRng};
    use wallet_core::transaction::moonlight_deployment;

    use super::*;

    /// Builds a Moonlight deployment transaction with the given bytecode and
    /// init args. Keys, gas values and nonces are drawn from `rng`, so the
    /// result is deterministic for a seeded RNG.
    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
        rng: &mut R,
        bytecode: Vec<u8>,
        init_args: Vec<u8>,
    ) -> dusk_core::transfer::Transaction {
        const CHAIN_ID: u8 = 0xfa;
        let sk = SecretKey::random(rng);
        let pk = PublicKey::from(&SecretKey::random(rng));

        let gas_limit: u64 = rng.r#gen();
        let gas_price: u64 = rng.r#gen();
        let nonce: u64 = rng.r#gen();
        let deploy_nonce: u64 = rng.r#gen();

        moonlight_deployment(
            &sk,
            bytecode,
            &pk,
            init_args,
            gas_limit,
            gas_price,
            nonce,
            deploy_nonce,
            CHAIN_ID,
        )
        .expect("should create a transaction")
    }

    /// Largest init-args length used as the passing case below; one byte
    /// more is expected to fail with `TooLarge`.
    // NOTE(review): 2320 is presumably the serialized overhead of the rest
    // of the transaction within the 64 KiB buffer — confirm.
    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;

    /// Init args at the maximum size pass the check, even with 64 KiB of
    /// bytecode attached.
    #[test]
    fn test_tx_serialization_check_normal() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; MAX_MOONLIGHT_ARG_SIZE],
        );
        let result = check_tx_serialization(&tx);
        assert!(matches!(result, Ok(())));
    }

    /// One byte over the maximum init-args size is rejected as `TooLarge`.
    #[test]
    fn test_tx_serialization_check_tx_too_large() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
        );
        let result = check_tx_serialization(&tx);
        assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
    }

    /// A transaction canonicalized as `PreAegis` is refused, and the error
    /// reports `Aegis` as the minimum format.
    #[test]
    fn test_supported_ingress_format_check_rejects_pre_aegis() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = CanonicalTransaction::canonicalize(
            new_moonlight_deploy_tx(&mut rng, vec![0; 32], vec![0; 32]),
            TransactionFormat::PreAegis,
        );

        let result = check_supported_ingress_tx_format(&tx);

        assert!(matches!(
            result,
            Err(TxAcceptanceError::UnsupportedIngressFormat {
                actual: TransactionFormat::PreAegis,
                minimum: TransactionFormat::Aegis,
            })
        ));
    }

    /// A transaction in the ingress format for height 1 is accepted.
    // NOTE(review): `ingress_tx_format_at(1)` is presumably `Aegis`, in line
    // with the boreas-to-aegis normalization test at height 1 — confirm.
    #[test]
    fn test_supported_ingress_format_check_accepts_aegis() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = CanonicalTransaction::canonicalize(
            new_moonlight_deploy_tx(&mut rng, vec![0; 32], vec![0; 32]),
            node_data::hard_fork::ingress_tx_format_at(1),
        );

        let result = check_supported_ingress_tx_format(&tx);

        assert!(matches!(result, Ok(())));
    }

    /// Normalizing an `Aegis` transaction at height `u64::MAX` yields a
    /// `Boreas` transaction with the same id.
    #[test]
    fn test_normalize_ingress_tx_reformats_aegis_to_boreas() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = LedgerTransaction::from_protocol_with_format(
            new_moonlight_deploy_tx(&mut rng, vec![0; 32], vec![0; 32]),
            TransactionFormat::Aegis,
        );

        let normalized = normalize_ingress_tx(&tx, u64::MAX)
            .expect("aegis ingress should normalize to boreas");

        assert_eq!(normalized.format(), TransactionFormat::Boreas);
        assert_eq!(normalized.id(), tx.id());
    }

    /// Normalizing a `Boreas` transaction at height 1 yields an `Aegis`
    /// transaction with the same id.
    #[test]
    fn test_normalize_ingress_tx_reformats_boreas_to_aegis() {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = LedgerTransaction::from_protocol_with_format(
            new_moonlight_deploy_tx(&mut rng, vec![0; 32], vec![0; 32]),
            TransactionFormat::Boreas,
        );

        let normalized = normalize_ingress_tx(&tx, 1)
            .expect("boreas ingress should normalize to aegis");

        assert_eq!(normalized.format(), TransactionFormat::Aegis);
        assert_eq!(normalized.id(), tx.id());
    }
}