1pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use dusk_consensus::errors::BlobError;
19use dusk_core::TxPreconditionError;
20use node_data::events::{Event, TransactionEvent};
21use node_data::get_current_timestamp;
22use node_data::ledger::{Header, SpendingId, Transaction};
23use node_data::message::{payload, AsyncQueue, Payload, Topics};
24use rkyv::ser::serializers::{
25 BufferScratch, BufferSerializer, BufferSerializerError,
26 CompositeSerializer, CompositeSerializerError,
27};
28use rkyv::ser::Serializer;
29use rkyv::Infallible;
30use thiserror::Error;
31use tokio::sync::mpsc::Sender;
32use tokio::sync::RwLock;
33use tracing::{error, info, warn};
34
35use crate::database::{Ledger, Mempool};
36use crate::mempool::conf::Params;
37use crate::vm::PreverificationResult;
38use crate::{database, vm, LongLivedService, Message, Network};
39
40const TOPICS: &[u8] = &[Topics::Tx as u8];
41
42#[derive(Debug, Error)]
43pub enum TxAcceptanceError {
44 #[error("this transaction exists in the mempool")]
45 AlreadyExistsInMempool,
46 #[error("this transaction exists in the ledger")]
47 AlreadyExistsInLedger,
48 #[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
49 BlobMissingSidecar([u8; 32]),
50 #[error("No blobs provided")]
51 BlobEmpty,
52 #[error("Transaction has too many blobs: {0}")]
53 BlobTooMany(usize),
54 #[error("Invalid blob: {0}")]
55 BlobInvalid(String),
56 #[error("this transaction's spendId exists in the mempool")]
57 SpendIdExistsInMempool,
58 #[error("this transaction is invalid {0}")]
59 VerificationFailed(String),
60 #[error("gas price lower than minimum {0}")]
61 GasPriceTooLow(u64),
62 #[error("gas limit lower than minimum {0}")]
63 GasLimitTooLow(u64),
64 #[error("Maximum count of transactions exceeded {0}")]
65 MaxTxnCountExceeded(usize),
66 #[error("this transaction is too large to be serialized")]
67 TooLarge,
68 #[error("Maximum transaction size exceeded {0}")]
69 MaxSizeExceeded(usize),
70 #[error("A generic error occurred {0}")]
71 Generic(anyhow::Error),
72}
73
74impl From<anyhow::Error> for TxAcceptanceError {
75 fn from(err: anyhow::Error) -> Self {
76 Self::Generic(err)
77 }
78}
79
80impl From<BlobError> for TxAcceptanceError {
81 fn from(err: BlobError) -> Self {
82 match err {
83 BlobError::MissingSidecar(id) => {
84 TxAcceptanceError::BlobMissingSidecar(id)
85 }
86 BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
87 BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
88 BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
89 }
90 }
91}
92
93impl From<TxPreconditionError> for TxAcceptanceError {
94 fn from(err: TxPreconditionError) -> Self {
95 match err {
96 TxPreconditionError::BlobLowLimit(min) => {
97 TxAcceptanceError::GasLimitTooLow(min)
98 }
99 TxPreconditionError::DeployLowLimit(min) => {
100 TxAcceptanceError::GasLimitTooLow(min)
101 }
102 TxPreconditionError::DeployLowPrice(min) => {
103 TxAcceptanceError::GasPriceTooLow(min)
104 }
105 TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
106 TxPreconditionError::BlobTooMany(n) => {
107 TxAcceptanceError::BlobTooMany(n)
108 }
109 }
110 }
111}
112
113pub struct MempoolSrv {
114 inbound: AsyncQueue<Message>,
115 conf: Params,
116 event_sender: Sender<Event>,
118}
119
120impl MempoolSrv {
121 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
122 info!("MempoolSrv::new with conf {}", conf);
123 Self {
124 inbound: AsyncQueue::bounded(
125 conf.max_queue_size,
126 "mempool_inbound",
127 ),
128 conf,
129 event_sender,
130 }
131 }
132}
133
134#[async_trait]
135impl<N: Network, DB: database::DB, VM: vm::VMExecution>
136 LongLivedService<N, DB, VM> for MempoolSrv
137{
138 async fn execute(
139 &mut self,
140 network: Arc<RwLock<N>>,
141 db: Arc<RwLock<DB>>,
142 vm: Arc<RwLock<VM>>,
143 ) -> anyhow::Result<usize> {
144 LongLivedService::<N, DB, VM>::add_routes(
145 self,
146 TOPICS,
147 self.inbound.clone(),
148 &network,
149 )
150 .await?;
151
152 self.request_mempool(&network).await;
154
155 let idle_interval =
156 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
157
158 let mempool_expiry = self
159 .conf
160 .mempool_expiry
161 .unwrap_or(DEFAULT_EXPIRY_TIME)
162 .as_secs();
163
164 let mut on_idle_event = tokio::time::interval(idle_interval);
166 loop {
167 tokio::select! {
168 biased;
169 _ = on_idle_event.tick() => {
170 info!(event = "mempool_idle", interval = ?idle_interval);
171
172 let expiration_time = get_current_timestamp()
173 .checked_sub(mempool_expiry)
174 .expect("valid duration");
175
176 db.read().await.update(|db| {
178 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
179 error!("cannot get expired txs: {e}");
180 vec![]
181 });
182 for tx_id in expired_txs {
183 info!(event = "expired_tx", hash = hex::encode(tx_id));
184 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
185 error!("cannot delete expired tx: {e}");
186 vec![]
187 });
188 for deleted_tx_id in deleted_txs{
189 let event = TransactionEvent::Removed(deleted_tx_id);
190 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
191 if let Err(e) = self.event_sender.try_send(event.into()) {
192 warn!("cannot notify mempool removed transaction {e}")
193 };
194 }
195 }
196 Ok(())
197 })?;
198
199 },
200 msg = self.inbound.recv() => {
201 if let Ok(msg) = msg {
202 match &msg.payload {
203 Payload::Transaction(tx) => {
204 let accept = self.accept_tx(&db, &vm, tx);
205 if let Err(e) = accept.await {
206 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
207 continue;
208 }
209
210 let network = network.read().await;
211 if let Err(e) = network.broadcast(&msg).await {
212 warn!("Unable to broadcast accepted tx: {e}")
213 };
214 }
215 _ => error!("invalid inbound message payload"),
216 }
217 }
218 }
219 }
220 }
221 }
222
223 fn name(&self) -> &'static str {
225 "mempool"
226 }
227}
228
229impl MempoolSrv {
230 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
231 &mut self,
232 db: &Arc<RwLock<DB>>,
233 vm: &Arc<RwLock<VM>>,
234 tx: &Transaction,
235 ) -> Result<(), TxAcceptanceError> {
236 let max_mempool_txn_count = self.conf.max_mempool_txn_count;
237
238 let events =
239 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
240 .await?;
241
242 tracing::info!(
243 event = "transaction accepted",
244 hash = hex::encode(tx.id())
245 );
246
247 for tx_event in events {
248 let node_event = tx_event.into();
249 if let Err(e) = self.event_sender.try_send(node_event) {
250 warn!("cannot notify mempool accepted transaction {e}")
251 };
252 }
253
254 Ok(())
255 }
256
257 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
258 db: &Arc<RwLock<DB>>,
259 vm: &Arc<RwLock<VM>>,
260 tx: &'t Transaction,
261 dry_run: bool,
262 max_mempool_txn_count: usize,
263 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
264 let tx_id = tx.id();
265 let tx_size = tx.size();
266
267 let min_header_size = Header::default().size();
271 let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
272 if tx_size > max_tx_size {
273 return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
274 }
275
276 check_tx_serialization(&tx.inner)?;
277
278 if tx.gas_price() < 1 {
279 return Err(TxAcceptanceError::GasPriceTooLow(1));
280 }
281
282 {
283 let vm = vm.read().await;
285
286 if tx.inner.deploy().is_some() {
288 let min_deployment_gas_price = vm.min_deployment_gas_price();
289 let gas_per_deploy_byte = vm.gas_per_deploy_byte();
290 let min_deploy_points = vm.min_deploy_points();
291 tx.inner.deploy_check(
292 gas_per_deploy_byte,
293 min_deployment_gas_price,
294 min_deploy_points,
295 )?;
296 }
297
298 if tx.inner.blob().is_some() {
300 db.read()
301 .await
302 .view(|db| {
303 db.block_label_by_height(vm.blob_activation_height())
304 })
305 .map_err(|e| {
306 anyhow!("Cannot get blob activation height: {e}")
307 })?
308 .ok_or(anyhow!(
309 "Blobs acceptance will start at block height: {}",
310 vm.blob_activation_height()
311 ))?;
312
313 let gas_per_blob = vm.gas_per_blob();
314 tx.inner.blob_check(gas_per_blob)?;
315 dusk_consensus::validate_blob_sidecars(tx)?;
316 }
317
318 let min_gas_limit = vm.min_gas_limit();
320 if tx.inner.gas_limit() < min_gas_limit {
321 return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
322 }
323 }
324
325 let tx_to_delete = db.read().await.view(|view| {
327 if view.mempool_tx_exists(tx_id)? {
329 return Err(TxAcceptanceError::AlreadyExistsInMempool);
330 }
331
332 if view.ledger_tx_exists(&tx_id)? {
334 return Err(TxAcceptanceError::AlreadyExistsInLedger);
335 }
336
337 let txs_count = view.mempool_txs_count();
338 if txs_count >= max_mempool_txn_count {
339 let (lowest_price, to_delete) = view
341 .mempool_txs_ids_sorted_by_low_fee()
342 .next()
343 .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
344
345 if tx.gas_price() < lowest_price {
346 Err(TxAcceptanceError::MaxTxnCountExceeded(
349 max_mempool_txn_count,
350 ))
351 } else {
352 Ok(Some(to_delete))
353 }
354 } else {
355 Ok(None)
356 }
357 })?;
358
359 let preverification_data =
361 vm.read().await.preverify(tx).map_err(|e| {
362 TxAcceptanceError::VerificationFailed(format!("{e}"))
363 })?;
364
365 if let PreverificationResult::FutureNonce {
366 account,
367 state,
368 nonce_used,
369 } = preverification_data
370 {
371 db.read().await.view(|db| {
372 for nonce in state.nonce + 1..nonce_used {
373 let spending_id = SpendingId::AccountNonce(account, nonce);
374 if db
375 .mempool_txs_by_spendable_ids(&[spending_id])
376 .is_empty()
377 {
378 return Err(TxAcceptanceError::VerificationFailed(
379 format!("Missing intermediate nonce {nonce}"),
380 ));
381 }
382 }
383 Ok(())
384 })?;
385 }
386
387 let mut events = vec![];
388
389 db.read().await.update_dry_run(dry_run, |db| {
391 let spend_ids = tx.to_spend_ids();
392
393 let mut replaced = false;
394 for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
396 if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
397 if m_tx.inner.gas_price() < tx.inner.gas_price()
405 || (m_tx.inner.gas_price() == tx.inner.gas_price()
406 && m_tx.inner.gas_limit() < tx.inner.gas_limit())
407 {
408 for deleted in db.delete_mempool_tx(m_tx_id, false)? {
409 events.push(TransactionEvent::Removed(deleted));
410 replaced = true;
411 }
412 } else {
413 return Err(
414 TxAcceptanceError::SpendIdExistsInMempool.into()
415 );
416 }
417 }
418 }
419
420 events.push(TransactionEvent::Included(tx));
421
422 if !replaced {
423 if let Some(to_delete) = tx_to_delete {
424 for deleted in db.delete_mempool_tx(to_delete, true)? {
425 events.push(TransactionEvent::Removed(deleted));
426 }
427 }
428 }
429 let now = get_current_timestamp();
432
433 db.store_mempool_tx(tx, now)
434 })?;
435 Ok(events)
436 }
437
438 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
443 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
444 let max_peers = self
445 .conf
446 .mempool_download_redundancy
447 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
448
449 let net = network.read().await;
450 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
451
452 let msg = payload::GetMempool::default().into();
453 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
454 error!("could not request mempool from network: {err}");
455 }
456 }
457}
458
459fn check_tx_serialization(
460 tx: &dusk_core::transfer::Transaction,
461) -> Result<(), TxAcceptanceError> {
462 const SCRATCH_BUF_BYTES: usize = 1024;
465 const ARGBUF_LEN: usize = 64 * 1024;
466 let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
467 let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
468 let mut buffer = [0u8; ARGBUF_LEN];
469 let scratch = BufferScratch::new(&mut sbuf);
470 let ser = BufferSerializer::new(&mut buffer);
471 let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
472 if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
473 match err {
474 CompositeSerializerError::SerializerError(err) => match err {
475 BufferSerializerError::Overflow { .. } => {
476 return Err(TxAcceptanceError::TooLarge);
477 }
478 },
479 err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
480 }
481 }
482 Ok(())
483}
484
485#[cfg(test)]
486mod tests {
487 use dusk_core::signatures::bls::{PublicKey, SecretKey};
488 use rand::rngs::StdRng;
489 use rand::Rng;
490 use rand::{CryptoRng, RngCore, SeedableRng};
491 use wallet_core::transaction::moonlight_deployment;
492
493 use super::*;
494
495 fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
496 rng: &mut R,
497 bytecode: Vec<u8>,
498 init_args: Vec<u8>,
499 ) -> dusk_core::transfer::Transaction {
500 const CHAIN_ID: u8 = 0xfa;
501 let sk = SecretKey::random(rng);
502 let pk = PublicKey::from(&SecretKey::random(rng));
503
504 let gas_limit: u64 = rng.gen();
505 let gas_price: u64 = rng.gen();
506 let nonce: u64 = rng.gen();
507 let deploy_nonce: u64 = rng.gen();
508
509 moonlight_deployment(
510 &sk,
511 bytecode,
512 &pk,
513 init_args,
514 gas_limit,
515 gas_price,
516 nonce,
517 deploy_nonce,
518 CHAIN_ID,
519 )
520 .expect("should create a transaction")
521 }
522
523 const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
524
525 #[test]
526 fn test_tx_serialization_check_normal() {
527 let mut rng = StdRng::seed_from_u64(42);
528 let tx = new_moonlight_deploy_tx(
529 &mut rng,
530 vec![0; 64 * 1024],
531 vec![0; MAX_MOONLIGHT_ARG_SIZE],
532 );
533 let result = check_tx_serialization(&tx);
534 assert!(matches!(result, Ok(())));
535 }
536
537 #[test]
538 fn test_tx_serialization_check_tx_too_large() {
539 let mut rng = StdRng::seed_from_u64(42);
540 let tx = new_moonlight_deploy_tx(
541 &mut rng,
542 vec![0; 64 * 1024],
543 vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
544 );
545 let result = check_tx_serialization(&tx);
546 assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
547 }
548}