1pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::TxPreconditionError;
20use dusk_core::stake::STAKE_CONTRACT;
21use dusk_core::transfer::TRANSFER_CONTRACT;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{Header, SpendingId, Transaction};
25use node_data::message::{AsyncQueue, Payload, Topics, payload};
26use rkyv::Infallible;
27use rkyv::ser::Serializer;
28use rkyv::ser::serializers::{
29 BufferScratch, BufferSerializer, BufferSerializerError,
30 CompositeSerializer, CompositeSerializerError,
31};
32use thiserror::Error;
33use tokio::sync::RwLock;
34use tokio::sync::mpsc::Sender;
35use tracing::{error, info, warn};
36
37use crate::database::{Ledger, Mempool};
38use crate::mempool::conf::Params;
39use crate::vm::PreverificationResult;
40use crate::{LongLivedService, Message, Network, database, vm};
41
42const TOPICS: &[u8] = &[Topics::Tx as u8];
43
44#[derive(Debug, Error)]
45pub enum TxAcceptanceError {
46 #[error("this transaction exists in the mempool")]
47 AlreadyExistsInMempool,
48 #[error("this transaction exists in the ledger")]
49 AlreadyExistsInLedger,
50 #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
51 BlobMissingSidecar([u8; 32]),
52 #[error("No blobs provided")]
53 BlobEmpty,
54 #[error("Transaction has too many blobs: {0}")]
55 BlobTooMany(usize),
56 #[error("Invalid blob: {0}")]
57 BlobInvalid(String),
58 #[error("this transaction's spendId exists in the mempool")]
59 SpendIdExistsInMempool,
60 #[error("this transaction is invalid {0}")]
61 VerificationFailed(String),
62 #[error("gas price lower than minimum {0}")]
63 GasPriceTooLow(u64),
64 #[error("gas limit lower than minimum {0}")]
65 GasLimitTooLow(u64),
66 #[error("Maximum count of transactions exceeded {0}")]
67 MaxTxnCountExceeded(usize),
68 #[error("this transaction is too large to be serialized")]
69 TooLarge,
70 #[error("Maximum transaction size exceeded {0}")]
71 MaxSizeExceeded(usize),
72 #[error("A generic error occurred {0}")]
73 Generic(anyhow::Error),
74}
75
76impl From<anyhow::Error> for TxAcceptanceError {
77 fn from(err: anyhow::Error) -> Self {
78 Self::Generic(err)
79 }
80}
81
82impl From<BlobError> for TxAcceptanceError {
83 fn from(err: BlobError) -> Self {
84 match err {
85 BlobError::MissingSidecar(id) => {
86 TxAcceptanceError::BlobMissingSidecar(id)
87 }
88 BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
89 BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
90 BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
91 }
92 }
93}
94
95impl From<TxPreconditionError> for TxAcceptanceError {
96 fn from(err: TxPreconditionError) -> Self {
97 match err {
98 TxPreconditionError::BlobLowLimit(min) => {
99 TxAcceptanceError::GasLimitTooLow(min)
100 }
101 TxPreconditionError::DeployLowLimit(min) => {
102 TxAcceptanceError::GasLimitTooLow(min)
103 }
104 TxPreconditionError::DeployLowPrice(min) => {
105 TxAcceptanceError::GasPriceTooLow(min)
106 }
107 TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
108 TxPreconditionError::BlobTooMany(n) => {
109 TxAcceptanceError::BlobTooMany(n)
110 }
111 TxPreconditionError::PhoenixFeeOverflow => {
112 TxAcceptanceError::VerificationFailed(
113 "phoenix fee overflow".into(),
114 )
115 }
116 TxPreconditionError::PhoenixFeeTampered => {
117 TxAcceptanceError::VerificationFailed(
118 "phoenix fee tampered".into(),
119 )
120 }
121 TxPreconditionError::PhoenixFeeRefundMismatch => {
122 TxAcceptanceError::VerificationFailed(
123 "phoenix fee refund stealth address mismatch".into(),
124 )
125 }
126 }
127 }
128}
129
130pub struct MempoolSrv {
131 inbound: AsyncQueue<Message>,
132 conf: Params,
133 event_sender: Sender<Event>,
135}
136
137impl MempoolSrv {
138 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
139 info!("MempoolSrv::new with conf {}", conf);
140 Self {
141 inbound: AsyncQueue::bounded(
142 conf.max_queue_size,
143 "mempool_inbound",
144 ),
145 conf,
146 event_sender,
147 }
148 }
149}
150
151#[async_trait]
152impl<N: Network, DB: database::DB, VM: vm::VMExecution>
153 LongLivedService<N, DB, VM> for MempoolSrv
154{
155 async fn execute(
156 &mut self,
157 network: Arc<RwLock<N>>,
158 db: Arc<RwLock<DB>>,
159 vm: Arc<RwLock<VM>>,
160 ) -> anyhow::Result<usize> {
161 LongLivedService::<N, DB, VM>::add_routes(
162 self,
163 TOPICS,
164 self.inbound.clone(),
165 &network,
166 )
167 .await?;
168
169 self.request_mempool(&network).await;
171
172 let idle_interval =
173 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
174
175 let mempool_expiry = self
176 .conf
177 .mempool_expiry
178 .unwrap_or(DEFAULT_EXPIRY_TIME)
179 .as_secs();
180
181 let mut on_idle_event = tokio::time::interval(idle_interval);
183 loop {
184 tokio::select! {
185 biased;
186 _ = on_idle_event.tick() => {
187 info!(event = "mempool_idle", interval = ?idle_interval);
188
189 let expiration_time = get_current_timestamp()
190 .checked_sub(mempool_expiry)
191 .expect("valid duration");
192
193 db.read().await.update(|db| {
195 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
196 error!("cannot get expired txs: {e}");
197 vec![]
198 });
199 for tx_id in expired_txs {
200 info!(event = "expired_tx", hash = hex::encode(tx_id));
201 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
202 error!("cannot delete expired tx: {e}");
203 vec![]
204 });
205 for deleted_tx_id in deleted_txs{
206 let event = TransactionEvent::Removed(deleted_tx_id);
207 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
208 if let Err(e) = self.event_sender.try_send(event.into()) {
209 warn!("cannot notify mempool removed transaction {e}")
210 };
211 }
212 }
213 Ok(())
214 })?;
215
216 },
217 msg = self.inbound.recv() => {
218 if let Ok(msg) = msg {
219 match &msg.payload {
220 Payload::Transaction(tx) => {
221 let accept = self.accept_tx(&db, &vm, tx);
222 if let Err(e) = accept.await {
223 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
224 continue;
225 }
226
227 let network = network.read().await;
228 if let Err(e) = network.broadcast(&msg).await {
229 warn!("Unable to broadcast accepted tx: {e}")
230 };
231 }
232 _ => error!("invalid inbound message payload"),
233 }
234 }
235 }
236 }
237 }
238 }
239
240 fn name(&self) -> &'static str {
242 "mempool"
243 }
244}
245
246impl MempoolSrv {
247 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
248 &mut self,
249 db: &Arc<RwLock<DB>>,
250 vm: &Arc<RwLock<VM>>,
251 tx: &Transaction,
252 ) -> Result<(), TxAcceptanceError> {
253 let max_mempool_txn_count = self.conf.max_mempool_txn_count;
254
255 let events =
256 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
257 .await?;
258
259 tracing::info!(
260 event = "transaction accepted",
261 hash = hex::encode(tx.id())
262 );
263
264 for tx_event in events {
265 let node_event = tx_event.into();
266 if let Err(e) = self.event_sender.try_send(node_event) {
267 warn!("cannot notify mempool accepted transaction {e}")
268 };
269 }
270
271 Ok(())
272 }
273
274 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
275 db: &Arc<RwLock<DB>>,
276 vm: &Arc<RwLock<VM>>,
277 tx: &'t Transaction,
278 dry_run: bool,
279 max_mempool_txn_count: usize,
280 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
281 let tx_id = tx.id();
282 let tx_size = tx.size();
283
284 let min_header_size = Header::default().size();
288 let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
289 if tx_size > max_tx_size {
290 return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
291 }
292
293 check_tx_serialization(&tx.inner)?;
294
295 if tx.gas_price() < 1 {
296 return Err(TxAcceptanceError::GasPriceTooLow(1));
297 }
298
299 let tip_height = db
300 .read()
301 .await
302 .view(|db| db.latest_block())
303 .map_err(|e| {
304 anyhow!("Cannot get tip block height from the database: {e}")
305 })?
306 .header
307 .height;
308
309 {
310 let vm = vm.read().await;
312
313 let disable_wasm_32 = vm.wasm32_disabled(tip_height);
314 let disable_wasm_64 = vm.wasm64_disabled(tip_height);
315 let disable_3rd_party = vm.third_party_disabled(tip_height);
316
317 if let Some(_contract_deploy) = tx.inner.deploy() {
318 match (disable_wasm_32, disable_wasm_64) {
319 (true, true) => {
320 Err(TxAcceptanceError::Generic(anyhow::anyhow!(
321 "contract deployment is not enabled in the VM"
322 )))
323 }
324 _ => Ok(()),
327 }?
328 }
329
330 if disable_3rd_party {
331 if let Some(call) = tx.inner.call() {
332 if call.contract != TRANSFER_CONTRACT
333 && call.contract != STAKE_CONTRACT
334 {
335 Err(TxAcceptanceError::Generic(anyhow::anyhow!(
336 "3rd party contracts are not enabled in the VM"
337 )))?;
338 }
339 }
340 }
341
342 tx.inner.phoenix_fee_check()?;
343
344 if vm.phoenix_refund_check_active(tip_height) {
345 tx.inner.phoenix_refund_check()?;
346 }
347
348 if tx.inner.deploy().is_some() {
350 let min_deployment_gas_price = vm.min_deployment_gas_price();
351 let gas_per_deploy_byte = vm.gas_per_deploy_byte();
352 let min_deploy_points = vm.min_deploy_points();
353 tx.inner.deploy_check(
354 gas_per_deploy_byte,
355 min_deployment_gas_price,
356 min_deploy_points,
357 )?;
358 }
359
360 if tx.inner.blob().is_some() {
362 if !vm.blob_active(tip_height) {
363 return Err(TxAcceptanceError::Generic(anyhow::anyhow!(
364 "blobs are not enabled in the VM"
365 )));
366 }
367
368 let gas_per_blob = vm.gas_per_blob();
369 tx.inner.blob_check(gas_per_blob)?;
370 dusk_consensus::validate_blob_sidecars(tx)?;
371 }
372
373 let min_gas_limit = vm.min_gas_limit();
375 if tx.inner.gas_limit() < min_gas_limit {
376 return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
377 }
378 }
379
380 let tx_to_delete = db.read().await.view(|view| {
382 if view.mempool_tx_exists(tx_id)? {
384 return Err(TxAcceptanceError::AlreadyExistsInMempool);
385 }
386
387 if view.ledger_tx_exists(&tx_id)? {
389 return Err(TxAcceptanceError::AlreadyExistsInLedger);
390 }
391
392 let txs_count = view.mempool_txs_count();
393 if txs_count >= max_mempool_txn_count {
394 let (lowest_price, to_delete) = view
396 .mempool_txs_ids_sorted_by_low_fee()
397 .next()
398 .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
399
400 if tx.gas_price() < lowest_price {
401 Err(TxAcceptanceError::MaxTxnCountExceeded(
404 max_mempool_txn_count,
405 ))
406 } else {
407 Ok(Some(to_delete))
408 }
409 } else {
410 Ok(None)
411 }
412 })?;
413
414 let preverification_data =
416 vm.read().await.preverify(tx, tip_height).map_err(|e| {
417 TxAcceptanceError::VerificationFailed(format!("{e}"))
418 })?;
419
420 if let PreverificationResult::FutureNonce {
421 account,
422 state,
423 nonce_used,
424 } = preverification_data
425 {
426 db.read().await.view(|db| {
427 for nonce in state.nonce + 1..nonce_used {
428 let spending_id = SpendingId::AccountNonce(account, nonce);
429 if db
430 .mempool_txs_by_spendable_ids(&[spending_id])
431 .is_empty()
432 {
433 return Err(TxAcceptanceError::VerificationFailed(
434 format!("Missing intermediate nonce {nonce}"),
435 ));
436 }
437 }
438 Ok(())
439 })?;
440 }
441
442 let mut events = vec![];
443
444 db.read().await.update_dry_run(dry_run, |db| {
446 let spend_ids = tx.to_spend_ids();
447
448 let mut replaced = false;
449 for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
451 if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
452 if m_tx.inner.gas_price() < tx.inner.gas_price()
460 || (m_tx.inner.gas_price() == tx.inner.gas_price()
461 && m_tx.inner.gas_limit() < tx.inner.gas_limit())
462 {
463 for deleted in db.delete_mempool_tx(m_tx_id, false)? {
464 events.push(TransactionEvent::Removed(deleted));
465 replaced = true;
466 }
467 } else {
468 return Err(
469 TxAcceptanceError::SpendIdExistsInMempool.into()
470 );
471 }
472 }
473 }
474
475 events.push(TransactionEvent::Included(tx));
476
477 if !replaced {
478 if let Some(to_delete) = tx_to_delete {
479 for deleted in db.delete_mempool_tx(to_delete, true)? {
480 events.push(TransactionEvent::Removed(deleted));
481 }
482 }
483 }
484 let now = get_current_timestamp();
487
488 db.store_mempool_tx(tx, now)
489 })?;
490 Ok(events)
491 }
492
493 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
498 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
499 let max_peers = self
500 .conf
501 .mempool_download_redundancy
502 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
503
504 let net = network.read().await;
505 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
506
507 let msg = payload::GetMempool::default().into();
508 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
509 error!("could not request mempool from network: {err}");
510 }
511 }
512}
513
514fn check_tx_serialization(
515 tx: &dusk_core::transfer::Transaction,
516) -> Result<(), TxAcceptanceError> {
517 const SCRATCH_BUF_BYTES: usize = 1024;
520 const ARGBUF_LEN: usize = 64 * 1024;
521 let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
522 let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
523 let mut buffer = [0u8; ARGBUF_LEN];
524 let scratch = BufferScratch::new(&mut sbuf);
525 let ser = BufferSerializer::new(&mut buffer);
526 let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
527 if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
528 match err {
529 CompositeSerializerError::SerializerError(err) => match err {
530 BufferSerializerError::Overflow { .. } => {
531 return Err(TxAcceptanceError::TooLarge);
532 }
533 },
534 err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
535 }
536 }
537 Ok(())
538}
539
#[cfg(test)]
mod tests {
    use dusk_core::signatures::bls::{PublicKey, SecretKey};
    use rand::Rng;
    use rand::rngs::StdRng;
    use rand::{CryptoRng, RngCore, SeedableRng};
    use wallet_core::transaction::moonlight_deployment;

    use super::*;

    /// Largest init-args length for which the deployment transactions built
    /// by these tests still pass the serialization check.
    const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;

    /// Builds a Moonlight deployment transaction with random keys, gas
    /// values and nonces drawn from `rng`.
    fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
        rng: &mut R,
        bytecode: Vec<u8>,
        init_args: Vec<u8>,
    ) -> dusk_core::transfer::Transaction {
        const CHAIN_ID: u8 = 0xfa;

        let secret_key = SecretKey::random(rng);
        let public_key = PublicKey::from(&SecretKey::random(rng));

        // Drawn in index order: gas limit, gas price, nonce, deploy nonce.
        let [gas_limit, gas_price, nonce, deploy_nonce]: [u64; 4] =
            std::array::from_fn(|_| rng.r#gen());

        moonlight_deployment(
            &secret_key,
            bytecode,
            &public_key,
            init_args,
            gas_limit,
            gas_price,
            nonce,
            deploy_nonce,
            CHAIN_ID,
        )
        .expect("should create a transaction")
    }

    /// Runs the serialization check on a deployment transaction with 64 KiB
    /// of bytecode and `init_args_len` bytes of init arguments.
    fn check_deploy_with_init_args(
        init_args_len: usize,
    ) -> Result<(), TxAcceptanceError> {
        let mut rng = StdRng::seed_from_u64(42);
        let tx = new_moonlight_deploy_tx(
            &mut rng,
            vec![0; 64 * 1024],
            vec![0; init_args_len],
        );
        check_tx_serialization(&tx)
    }

    #[test]
    fn test_tx_serialization_check_normal() {
        let result = check_deploy_with_init_args(MAX_MOONLIGHT_ARG_SIZE);
        assert!(result.is_ok());
    }

    #[test]
    fn test_tx_serialization_check_tx_too_large() {
        let result = check_deploy_with_init_args(MAX_MOONLIGHT_ARG_SIZE + 1);
        assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
    }
}