1pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::stake::STAKE_CONTRACT;
20use dusk_core::transfer::TRANSFER_CONTRACT;
21use dusk_core::TxPreconditionError;
22use node_data::events::{Event, TransactionEvent};
23use node_data::get_current_timestamp;
24use node_data::ledger::{Header, SpendingId, Transaction};
25use node_data::message::{payload, AsyncQueue, Payload, Topics};
26use rkyv::ser::serializers::{
27 BufferScratch, BufferSerializer, BufferSerializerError,
28 CompositeSerializer, CompositeSerializerError,
29};
30use rkyv::ser::Serializer;
31use rkyv::Infallible;
32use thiserror::Error;
33use tokio::sync::mpsc::Sender;
34use tokio::sync::RwLock;
35use tracing::{error, info, warn};
36
37use crate::database::{Ledger, Mempool};
38use crate::mempool::conf::Params;
39use crate::vm::PreverificationResult;
40use crate::{database, vm, LongLivedService, Message, Network};
41
42const TOPICS: &[u8] = &[Topics::Tx as u8];
43
44#[derive(Debug, Error)]
45pub enum TxAcceptanceError {
46 #[error("this transaction exists in the mempool")]
47 AlreadyExistsInMempool,
48 #[error("this transaction exists in the ledger")]
49 AlreadyExistsInLedger,
50 #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
51 BlobMissingSidecar([u8; 32]),
52 #[error("No blobs provided")]
53 BlobEmpty,
54 #[error("Transaction has too many blobs: {0}")]
55 BlobTooMany(usize),
56 #[error("Invalid blob: {0}")]
57 BlobInvalid(String),
58 #[error("this transaction's spendId exists in the mempool")]
59 SpendIdExistsInMempool,
60 #[error("this transaction is invalid {0}")]
61 VerificationFailed(String),
62 #[error("gas price lower than minimum {0}")]
63 GasPriceTooLow(u64),
64 #[error("gas limit lower than minimum {0}")]
65 GasLimitTooLow(u64),
66 #[error("Maximum count of transactions exceeded {0}")]
67 MaxTxnCountExceeded(usize),
68 #[error("this transaction is too large to be serialized")]
69 TooLarge,
70 #[error("Maximum transaction size exceeded {0}")]
71 MaxSizeExceeded(usize),
72 #[error("A generic error occurred {0}")]
73 Generic(anyhow::Error),
74}
75
76impl From<anyhow::Error> for TxAcceptanceError {
77 fn from(err: anyhow::Error) -> Self {
78 Self::Generic(err)
79 }
80}
81
82impl From<BlobError> for TxAcceptanceError {
83 fn from(err: BlobError) -> Self {
84 match err {
85 BlobError::MissingSidecar(id) => {
86 TxAcceptanceError::BlobMissingSidecar(id)
87 }
88 BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
89 BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
90 BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
91 }
92 }
93}
94
95impl From<TxPreconditionError> for TxAcceptanceError {
96 fn from(err: TxPreconditionError) -> Self {
97 match err {
98 TxPreconditionError::BlobLowLimit(min) => {
99 TxAcceptanceError::GasLimitTooLow(min)
100 }
101 TxPreconditionError::DeployLowLimit(min) => {
102 TxAcceptanceError::GasLimitTooLow(min)
103 }
104 TxPreconditionError::DeployLowPrice(min) => {
105 TxAcceptanceError::GasPriceTooLow(min)
106 }
107 TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
108 TxPreconditionError::BlobTooMany(n) => {
109 TxAcceptanceError::BlobTooMany(n)
110 }
111 }
112 }
113}
114
115pub struct MempoolSrv {
116 inbound: AsyncQueue<Message>,
117 conf: Params,
118 event_sender: Sender<Event>,
120}
121
122impl MempoolSrv {
123 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
124 info!("MempoolSrv::new with conf {}", conf);
125 Self {
126 inbound: AsyncQueue::bounded(
127 conf.max_queue_size,
128 "mempool_inbound",
129 ),
130 conf,
131 event_sender,
132 }
133 }
134}
135
136#[async_trait]
137impl<N: Network, DB: database::DB, VM: vm::VMExecution>
138 LongLivedService<N, DB, VM> for MempoolSrv
139{
140 async fn execute(
141 &mut self,
142 network: Arc<RwLock<N>>,
143 db: Arc<RwLock<DB>>,
144 vm: Arc<RwLock<VM>>,
145 ) -> anyhow::Result<usize> {
146 LongLivedService::<N, DB, VM>::add_routes(
147 self,
148 TOPICS,
149 self.inbound.clone(),
150 &network,
151 )
152 .await?;
153
154 self.request_mempool(&network).await;
156
157 let idle_interval =
158 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
159
160 let mempool_expiry = self
161 .conf
162 .mempool_expiry
163 .unwrap_or(DEFAULT_EXPIRY_TIME)
164 .as_secs();
165
166 let mut on_idle_event = tokio::time::interval(idle_interval);
168 loop {
169 tokio::select! {
170 biased;
171 _ = on_idle_event.tick() => {
172 info!(event = "mempool_idle", interval = ?idle_interval);
173
174 let expiration_time = get_current_timestamp()
175 .checked_sub(mempool_expiry)
176 .expect("valid duration");
177
178 db.read().await.update(|db| {
180 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
181 error!("cannot get expired txs: {e}");
182 vec![]
183 });
184 for tx_id in expired_txs {
185 info!(event = "expired_tx", hash = hex::encode(tx_id));
186 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
187 error!("cannot delete expired tx: {e}");
188 vec![]
189 });
190 for deleted_tx_id in deleted_txs{
191 let event = TransactionEvent::Removed(deleted_tx_id);
192 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
193 if let Err(e) = self.event_sender.try_send(event.into()) {
194 warn!("cannot notify mempool removed transaction {e}")
195 };
196 }
197 }
198 Ok(())
199 })?;
200
201 },
202 msg = self.inbound.recv() => {
203 if let Ok(msg) = msg {
204 match &msg.payload {
205 Payload::Transaction(tx) => {
206 let accept = self.accept_tx(&db, &vm, tx);
207 if let Err(e) = accept.await {
208 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
209 continue;
210 }
211
212 let network = network.read().await;
213 if let Err(e) = network.broadcast(&msg).await {
214 warn!("Unable to broadcast accepted tx: {e}")
215 };
216 }
217 _ => error!("invalid inbound message payload"),
218 }
219 }
220 }
221 }
222 }
223 }
224
225 fn name(&self) -> &'static str {
227 "mempool"
228 }
229}
230
231impl MempoolSrv {
232 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
233 &mut self,
234 db: &Arc<RwLock<DB>>,
235 vm: &Arc<RwLock<VM>>,
236 tx: &Transaction,
237 ) -> Result<(), TxAcceptanceError> {
238 let max_mempool_txn_count = self.conf.max_mempool_txn_count;
239
240 let events =
241 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
242 .await?;
243
244 tracing::info!(
245 event = "transaction accepted",
246 hash = hex::encode(tx.id())
247 );
248
249 for tx_event in events {
250 let node_event = tx_event.into();
251 if let Err(e) = self.event_sender.try_send(node_event) {
252 warn!("cannot notify mempool accepted transaction {e}")
253 };
254 }
255
256 Ok(())
257 }
258
259 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
260 db: &Arc<RwLock<DB>>,
261 vm: &Arc<RwLock<VM>>,
262 tx: &'t Transaction,
263 dry_run: bool,
264 max_mempool_txn_count: usize,
265 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
266 let tx_id = tx.id();
267 let tx_size = tx.size();
268
269 let min_header_size = Header::default().size();
273 let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
274 if tx_size > max_tx_size {
275 return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
276 }
277
278 check_tx_serialization(&tx.inner)?;
279
280 if tx.gas_price() < 1 {
281 return Err(TxAcceptanceError::GasPriceTooLow(1));
282 }
283
284 let tip_height = db
285 .read()
286 .await
287 .view(|db| db.latest_block())
288 .map_err(|e| {
289 anyhow!("Cannot get tip block height from the database: {e}")
290 })?
291 .header
292 .height;
293
294 {
295 let vm = vm.read().await;
297
298 let disable_wasm_32 = vm.wasm32_disabled(tip_height);
299 let disable_wasm_64 = vm.wasm64_disabled(tip_height);
300 let disable_3rd_party = vm.third_party_disabled(tip_height);
301
302 if let Some(_contract_deploy) = tx.inner.deploy() {
303 match (disable_wasm_32, disable_wasm_64) {
304 (true, true) => {
305 Err(TxAcceptanceError::Generic(anyhow::anyhow!(
306 "contract deployment is not enabled in the VM"
307 )))
308 }
309 _ => Ok(()),
312 }?
313 }
314
315 if disable_3rd_party {
316 if let Some(call) = tx.inner.call() {
317 if call.contract != TRANSFER_CONTRACT
318 && call.contract != STAKE_CONTRACT
319 {
320 Err(TxAcceptanceError::Generic(anyhow::anyhow!(
321 "3rd party contracts are not enabled in the VM"
322 )))?;
323 }
324 }
325 }
326
327 if tx.inner.deploy().is_some() {
329 let min_deployment_gas_price = vm.min_deployment_gas_price();
330 let gas_per_deploy_byte = vm.gas_per_deploy_byte();
331 let min_deploy_points = vm.min_deploy_points();
332 tx.inner.deploy_check(
333 gas_per_deploy_byte,
334 min_deployment_gas_price,
335 min_deploy_points,
336 )?;
337 }
338
339 if tx.inner.blob().is_some() {
341 if !vm.blob_active(tip_height) {
342 return Err(TxAcceptanceError::Generic(anyhow::anyhow!(
343 "blobs are not enabled in the VM"
344 )));
345 }
346
347 let gas_per_blob = vm.gas_per_blob();
348 tx.inner.blob_check(gas_per_blob)?;
349 dusk_consensus::validate_blob_sidecars(tx)?;
350 }
351
352 let min_gas_limit = vm.min_gas_limit();
354 if tx.inner.gas_limit() < min_gas_limit {
355 return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
356 }
357 }
358
359 let tx_to_delete = db.read().await.view(|view| {
361 if view.mempool_tx_exists(tx_id)? {
363 return Err(TxAcceptanceError::AlreadyExistsInMempool);
364 }
365
366 if view.ledger_tx_exists(&tx_id)? {
368 return Err(TxAcceptanceError::AlreadyExistsInLedger);
369 }
370
371 let txs_count = view.mempool_txs_count();
372 if txs_count >= max_mempool_txn_count {
373 let (lowest_price, to_delete) = view
375 .mempool_txs_ids_sorted_by_low_fee()
376 .next()
377 .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
378
379 if tx.gas_price() < lowest_price {
380 Err(TxAcceptanceError::MaxTxnCountExceeded(
383 max_mempool_txn_count,
384 ))
385 } else {
386 Ok(Some(to_delete))
387 }
388 } else {
389 Ok(None)
390 }
391 })?;
392
393 let preverification_data =
395 vm.read().await.preverify(tx).map_err(|e| {
396 TxAcceptanceError::VerificationFailed(format!("{e}"))
397 })?;
398
399 if let PreverificationResult::FutureNonce {
400 account,
401 state,
402 nonce_used,
403 } = preverification_data
404 {
405 db.read().await.view(|db| {
406 for nonce in state.nonce + 1..nonce_used {
407 let spending_id = SpendingId::AccountNonce(account, nonce);
408 if db
409 .mempool_txs_by_spendable_ids(&[spending_id])
410 .is_empty()
411 {
412 return Err(TxAcceptanceError::VerificationFailed(
413 format!("Missing intermediate nonce {nonce}"),
414 ));
415 }
416 }
417 Ok(())
418 })?;
419 }
420
421 let mut events = vec![];
422
423 db.read().await.update_dry_run(dry_run, |db| {
425 let spend_ids = tx.to_spend_ids();
426
427 let mut replaced = false;
428 for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
430 if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
431 if m_tx.inner.gas_price() < tx.inner.gas_price()
439 || (m_tx.inner.gas_price() == tx.inner.gas_price()
440 && m_tx.inner.gas_limit() < tx.inner.gas_limit())
441 {
442 for deleted in db.delete_mempool_tx(m_tx_id, false)? {
443 events.push(TransactionEvent::Removed(deleted));
444 replaced = true;
445 }
446 } else {
447 return Err(
448 TxAcceptanceError::SpendIdExistsInMempool.into()
449 );
450 }
451 }
452 }
453
454 events.push(TransactionEvent::Included(tx));
455
456 if !replaced {
457 if let Some(to_delete) = tx_to_delete {
458 for deleted in db.delete_mempool_tx(to_delete, true)? {
459 events.push(TransactionEvent::Removed(deleted));
460 }
461 }
462 }
463 let now = get_current_timestamp();
466
467 db.store_mempool_tx(tx, now)
468 })?;
469 Ok(events)
470 }
471
472 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
477 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
478 let max_peers = self
479 .conf
480 .mempool_download_redundancy
481 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
482
483 let net = network.read().await;
484 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
485
486 let msg = payload::GetMempool::default().into();
487 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
488 error!("could not request mempool from network: {err}");
489 }
490 }
491}
492
493fn check_tx_serialization(
494 tx: &dusk_core::transfer::Transaction,
495) -> Result<(), TxAcceptanceError> {
496 const SCRATCH_BUF_BYTES: usize = 1024;
499 const ARGBUF_LEN: usize = 64 * 1024;
500 let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
501 let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
502 let mut buffer = [0u8; ARGBUF_LEN];
503 let scratch = BufferScratch::new(&mut sbuf);
504 let ser = BufferSerializer::new(&mut buffer);
505 let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
506 if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
507 match err {
508 CompositeSerializerError::SerializerError(err) => match err {
509 BufferSerializerError::Overflow { .. } => {
510 return Err(TxAcceptanceError::TooLarge);
511 }
512 },
513 err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
514 }
515 }
516 Ok(())
517}
518
519#[cfg(test)]
520mod tests {
521 use dusk_core::signatures::bls::{PublicKey, SecretKey};
522 use rand::rngs::StdRng;
523 use rand::Rng;
524 use rand::{CryptoRng, RngCore, SeedableRng};
525 use wallet_core::transaction::moonlight_deployment;
526
527 use super::*;
528
529 fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
530 rng: &mut R,
531 bytecode: Vec<u8>,
532 init_args: Vec<u8>,
533 ) -> dusk_core::transfer::Transaction {
534 const CHAIN_ID: u8 = 0xfa;
535 let sk = SecretKey::random(rng);
536 let pk = PublicKey::from(&SecretKey::random(rng));
537
538 let gas_limit: u64 = rng.gen();
539 let gas_price: u64 = rng.gen();
540 let nonce: u64 = rng.gen();
541 let deploy_nonce: u64 = rng.gen();
542
543 moonlight_deployment(
544 &sk,
545 bytecode,
546 &pk,
547 init_args,
548 gas_limit,
549 gas_price,
550 nonce,
551 deploy_nonce,
552 CHAIN_ID,
553 )
554 .expect("should create a transaction")
555 }
556
557 const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
558
559 #[test]
560 fn test_tx_serialization_check_normal() {
561 let mut rng = StdRng::seed_from_u64(42);
562 let tx = new_moonlight_deploy_tx(
563 &mut rng,
564 vec![0; 64 * 1024],
565 vec![0; MAX_MOONLIGHT_ARG_SIZE],
566 );
567 let result = check_tx_serialization(&tx);
568 assert!(matches!(result, Ok(())));
569 }
570
571 #[test]
572 fn test_tx_serialization_check_tx_too_large() {
573 let mut rng = StdRng::seed_from_u64(42);
574 let tx = new_moonlight_deploy_tx(
575 &mut rng,
576 vec![0; 64 * 1024],
577 vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
578 );
579 let result = check_tx_serialization(&tx);
580 assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
581 }
582}