1pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use anyhow::anyhow;
13use async_trait::async_trait;
14use conf::{
15 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
16};
17use dusk_consensus::config::MAX_BLOCK_SIZE;
18use node_data::events::{Event, TransactionEvent};
19use node_data::get_current_timestamp;
20use node_data::ledger::{Header, SpendingId, Transaction};
21use node_data::message::{payload, AsyncQueue, Payload, Topics};
22use rkyv::ser::serializers::{
23 BufferScratch, BufferSerializer, BufferSerializerError,
24 CompositeSerializer, CompositeSerializerError,
25};
26use rkyv::ser::Serializer;
27use rkyv::Infallible;
28use thiserror::Error;
29use tokio::sync::mpsc::Sender;
30use tokio::sync::RwLock;
31use tracing::{error, info, warn};
32
33use crate::database::{Ledger, Mempool};
34use crate::mempool::conf::Params;
35use crate::vm::PreverificationResult;
36use crate::{database, vm, LongLivedService, Message, Network};
37
38const TOPICS: &[u8] = &[Topics::Tx as u8];
39
40#[derive(Debug, Error)]
41pub enum TxAcceptanceError {
42 #[error("this transaction exists in the mempool")]
43 AlreadyExistsInMempool,
44 #[error("this transaction exists in the ledger")]
45 AlreadyExistsInLedger,
46 #[error("this transaction's spendId exists in the mempool")]
47 SpendIdExistsInMempool,
48 #[error("this transaction is invalid {0}")]
49 VerificationFailed(String),
50 #[error("gas price lower than minimum {0}")]
51 GasPriceTooLow(u64),
52 #[error("gas limit lower than minimum {0}")]
53 GasLimitTooLow(u64),
54 #[error("Maximum count of transactions exceeded {0}")]
55 MaxTxnCountExceeded(usize),
56 #[error("this transaction is too large to be serialized")]
57 TooLarge,
58 #[error("Maximum transaction size exceeded {0}")]
59 MaxSizeExceeded(usize),
60 #[error("A generic error occurred {0}")]
61 Generic(anyhow::Error),
62}
63
64impl From<anyhow::Error> for TxAcceptanceError {
65 fn from(err: anyhow::Error) -> Self {
66 Self::Generic(err)
67 }
68}
69
70pub struct MempoolSrv {
71 inbound: AsyncQueue<Message>,
72 conf: Params,
73 event_sender: Sender<Event>,
75}
76
77impl MempoolSrv {
78 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
79 info!("MempoolSrv::new with conf {}", conf);
80 Self {
81 inbound: AsyncQueue::bounded(
82 conf.max_queue_size,
83 "mempool_inbound",
84 ),
85 conf,
86 event_sender,
87 }
88 }
89}
90
91#[async_trait]
92impl<N: Network, DB: database::DB, VM: vm::VMExecution>
93 LongLivedService<N, DB, VM> for MempoolSrv
94{
95 async fn execute(
96 &mut self,
97 network: Arc<RwLock<N>>,
98 db: Arc<RwLock<DB>>,
99 vm: Arc<RwLock<VM>>,
100 ) -> anyhow::Result<usize> {
101 LongLivedService::<N, DB, VM>::add_routes(
102 self,
103 TOPICS,
104 self.inbound.clone(),
105 &network,
106 )
107 .await?;
108
109 self.request_mempool(&network).await;
111
112 let idle_interval =
113 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
114
115 let mempool_expiry = self
116 .conf
117 .mempool_expiry
118 .unwrap_or(DEFAULT_EXPIRY_TIME)
119 .as_secs();
120
121 let mut on_idle_event = tokio::time::interval(idle_interval);
123 loop {
124 tokio::select! {
125 biased;
126 _ = on_idle_event.tick() => {
127 info!(event = "mempool_idle", interval = ?idle_interval);
128
129 let expiration_time = get_current_timestamp()
130 .checked_sub(mempool_expiry)
131 .expect("valid duration");
132
133 db.read().await.update(|db| {
135 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
136 error!("cannot get expired txs: {e}");
137 vec![]
138 });
139 for tx_id in expired_txs {
140 info!(event = "expired_tx", hash = hex::encode(tx_id));
141 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
142 error!("cannot delete expired tx: {e}");
143 vec![]
144 });
145 for deleted_tx_id in deleted_txs{
146 let event = TransactionEvent::Removed(deleted_tx_id);
147 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
148 if let Err(e) = self.event_sender.try_send(event.into()) {
149 warn!("cannot notify mempool removed transaction {e}")
150 };
151 }
152 }
153 Ok(())
154 })?;
155
156 },
157 msg = self.inbound.recv() => {
158 if let Ok(msg) = msg {
159 match &msg.payload {
160 Payload::Transaction(tx) => {
161 let accept = self.accept_tx(&db, &vm, tx);
162 if let Err(e) = accept.await {
163 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
164 continue;
165 }
166
167 let network = network.read().await;
168 if let Err(e) = network.broadcast(&msg).await {
169 warn!("Unable to broadcast accepted tx: {e}")
170 };
171 }
172 _ => error!("invalid inbound message payload"),
173 }
174 }
175 }
176 }
177 }
178 }
179
180 fn name(&self) -> &'static str {
182 "mempool"
183 }
184}
185
186impl MempoolSrv {
187 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
188 &mut self,
189 db: &Arc<RwLock<DB>>,
190 vm: &Arc<RwLock<VM>>,
191 tx: &Transaction,
192 ) -> Result<(), TxAcceptanceError> {
193 let max_mempool_txn_count = self.conf.max_mempool_txn_count;
194
195 let events =
196 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
197 .await?;
198
199 tracing::info!(
200 event = "transaction accepted",
201 hash = hex::encode(tx.id())
202 );
203
204 for tx_event in events {
205 let node_event = tx_event.into();
206 if let Err(e) = self.event_sender.try_send(node_event) {
207 warn!("cannot notify mempool accepted transaction {e}")
208 };
209 }
210
211 Ok(())
212 }
213
214 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
215 db: &Arc<RwLock<DB>>,
216 vm: &Arc<RwLock<VM>>,
217 tx: &'t Transaction,
218 dry_run: bool,
219 max_mempool_txn_count: usize,
220 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
221 let tx_id = tx.id();
222 let tx_size = tx.size();
223
224 let min_header_size = Header::default().size();
228 let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
229 if tx_size > max_tx_size {
230 return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
231 }
232
233 check_tx_serialization(&tx.inner)?;
234
235 if tx.gas_price() < 1 {
236 return Err(TxAcceptanceError::GasPriceTooLow(1));
237 }
238
239 if tx.inner.deploy().is_some() {
240 let vm = vm.read().await;
241 let min_deployment_gas_price = vm.min_deployment_gas_price();
242 if tx.gas_price() < min_deployment_gas_price {
243 return Err(TxAcceptanceError::GasPriceTooLow(
244 min_deployment_gas_price,
245 ));
246 }
247
248 let gas_per_deploy_byte = vm.gas_per_deploy_byte();
249 let deploy_charge = tx
250 .inner
251 .deploy_charge(gas_per_deploy_byte, vm.min_deploy_points());
252 if tx.inner.gas_limit() < deploy_charge {
253 return Err(TxAcceptanceError::GasLimitTooLow(deploy_charge));
254 }
255 } else {
256 let vm = vm.read().await;
257 let min_gas_limit = vm.min_gas_limit();
258 if tx.inner.gas_limit() < min_gas_limit {
259 return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
260 }
261 }
262
263 let tx_to_delete = db.read().await.view(|view| {
265 if view.mempool_tx_exists(tx_id)? {
267 return Err(TxAcceptanceError::AlreadyExistsInMempool);
268 }
269
270 if view.ledger_tx_exists(&tx_id)? {
272 return Err(TxAcceptanceError::AlreadyExistsInLedger);
273 }
274
275 let txs_count = view.mempool_txs_count();
276 if txs_count >= max_mempool_txn_count {
277 let (lowest_price, to_delete) = view
279 .mempool_txs_ids_sorted_by_low_fee()
280 .next()
281 .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
282
283 if tx.gas_price() < lowest_price {
284 Err(TxAcceptanceError::MaxTxnCountExceeded(
287 max_mempool_txn_count,
288 ))
289 } else {
290 Ok(Some(to_delete))
291 }
292 } else {
293 Ok(None)
294 }
295 })?;
296
297 let preverification_data =
299 vm.read().await.preverify(tx).map_err(|e| {
300 TxAcceptanceError::VerificationFailed(format!("{e:?}"))
301 })?;
302
303 if let PreverificationResult::FutureNonce {
304 account,
305 state,
306 nonce_used,
307 } = preverification_data
308 {
309 db.read().await.view(|db| {
310 for nonce in state.nonce + 1..nonce_used {
311 let spending_id = SpendingId::AccountNonce(account, nonce);
312 if db
313 .mempool_txs_by_spendable_ids(&[spending_id])
314 .is_empty()
315 {
316 return Err(TxAcceptanceError::VerificationFailed(
317 format!("Missing intermediate nonce {nonce}"),
318 ));
319 }
320 }
321 Ok(())
322 })?;
323 }
324
325 let mut events = vec![];
326
327 db.read().await.update_dry_run(dry_run, |db| {
329 let spend_ids = tx.to_spend_ids();
330
331 let mut replaced = false;
332 for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
334 if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
335 if m_tx.inner.gas_price() < tx.inner.gas_price() {
336 for deleted in db.delete_mempool_tx(m_tx_id, false)? {
337 events.push(TransactionEvent::Removed(deleted));
338 replaced = true;
339 }
340 } else {
341 return Err(
342 TxAcceptanceError::SpendIdExistsInMempool.into()
343 );
344 }
345 }
346 }
347
348 events.push(TransactionEvent::Included(tx));
349
350 if !replaced {
351 if let Some(to_delete) = tx_to_delete {
352 for deleted in db.delete_mempool_tx(to_delete, true)? {
353 events.push(TransactionEvent::Removed(deleted));
354 }
355 }
356 }
357 let now = get_current_timestamp();
360
361 db.store_mempool_tx(tx, now)
362 })?;
363 Ok(events)
364 }
365
366 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
371 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
372 let max_peers = self
373 .conf
374 .mempool_download_redundancy
375 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
376
377 let net = network.read().await;
378 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
379
380 let msg = payload::GetMempool::default().into();
381 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
382 error!("could not request mempool from network: {err}");
383 }
384 }
385}
386
387fn check_tx_serialization(
388 tx: &dusk_core::transfer::Transaction,
389) -> Result<(), TxAcceptanceError> {
390 const SCRATCH_BUF_BYTES: usize = 1024;
393 const ARGBUF_LEN: usize = 64 * 1024;
394 let stripped_tx = tx.strip_off_bytecode();
395 let tx = stripped_tx.as_ref().unwrap_or(tx);
396 let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
397 let mut buffer = [0u8; ARGBUF_LEN];
398 let scratch = BufferScratch::new(&mut sbuf);
399 let ser = BufferSerializer::new(&mut buffer);
400 let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
401 if let Err(err) = ser.serialize_value(tx) {
402 match err {
403 CompositeSerializerError::SerializerError(err) => match err {
404 BufferSerializerError::Overflow { .. } => {
405 return Err(TxAcceptanceError::TooLarge);
406 }
407 },
408 err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
409 }
410 }
411 Ok(())
412}
413
414#[cfg(test)]
415mod tests {
416 use dusk_core::signatures::bls::{PublicKey, SecretKey};
417 use rand::rngs::StdRng;
418 use rand::Rng;
419 use rand::{CryptoRng, RngCore, SeedableRng};
420 use wallet_core::transaction::moonlight_deployment;
421
422 use super::*;
423
424 fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
425 rng: &mut R,
426 bytecode: Vec<u8>,
427 init_args: Vec<u8>,
428 ) -> dusk_core::transfer::Transaction {
429 const CHAIN_ID: u8 = 0xfa;
430 let sk = SecretKey::random(rng);
431 let pk = PublicKey::from(&SecretKey::random(rng));
432
433 let gas_limit: u64 = rng.gen();
434 let gas_price: u64 = rng.gen();
435 let nonce: u64 = rng.gen();
436 let deploy_nonce: u64 = rng.gen();
437
438 moonlight_deployment(
439 &sk,
440 bytecode,
441 &pk,
442 init_args,
443 gas_limit,
444 gas_price,
445 nonce,
446 deploy_nonce,
447 CHAIN_ID,
448 )
449 .expect("should create a transaction")
450 }
451
452 const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
453
454 #[test]
455 fn test_tx_serialization_check_normal() {
456 let mut rng = StdRng::seed_from_u64(42);
457 let tx = new_moonlight_deploy_tx(
458 &mut rng,
459 vec![0; 64 * 1024],
460 vec![0; MAX_MOONLIGHT_ARG_SIZE],
461 );
462 let result = check_tx_serialization(&tx);
463 assert!(matches!(result, Ok(())));
464 }
465
466 #[test]
467 fn test_tx_serialization_check_tx_too_large() {
468 let mut rng = StdRng::seed_from_u64(42);
469 let tx = new_moonlight_deploy_tx(
470 &mut rng,
471 vec![0; 64 * 1024],
472 vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
473 );
474 let result = check_tx_serialization(&tx);
475 assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
476 }
477}