pub mod conf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use async_trait::async_trait;
use conf::{
DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
};
use dusk_consensus::config::MAX_BLOCK_SIZE;
use dusk_consensus::errors::BlobError;
use dusk_core::stake::STAKE_CONTRACT;
use dusk_core::transfer::TRANSFER_CONTRACT;
use dusk_core::TxPreconditionError;
use node_data::events::{Event, TransactionEvent};
use node_data::get_current_timestamp;
use node_data::ledger::{Header, SpendingId, Transaction};
use node_data::message::{payload, AsyncQueue, Payload, Topics};
use rkyv::ser::serializers::{
BufferScratch, BufferSerializer, BufferSerializerError,
CompositeSerializer, CompositeSerializerError,
};
use rkyv::ser::Serializer;
use rkyv::Infallible;
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::vm::PreverificationResult;
use crate::{database, vm, LongLivedService, Message, Network};
const TOPICS: &[u8] = &[Topics::Tx as u8];
#[derive(Debug, Error)]
pub enum TxAcceptanceError {
#[error("this transaction exists in the mempool")]
AlreadyExistsInMempool,
#[error("this transaction exists in the ledger")]
AlreadyExistsInLedger,
#[error("Transaction blob id {} is missing sidecar", hex::encode(.0))]
BlobMissingSidecar([u8; 32]),
#[error("No blobs provided")]
BlobEmpty,
#[error("Transaction has too many blobs: {0}")]
BlobTooMany(usize),
#[error("Invalid blob: {0}")]
BlobInvalid(String),
#[error("this transaction's spendId exists in the mempool")]
SpendIdExistsInMempool,
#[error("this transaction is invalid {0}")]
VerificationFailed(String),
#[error("gas price lower than minimum {0}")]
GasPriceTooLow(u64),
#[error("gas limit lower than minimum {0}")]
GasLimitTooLow(u64),
#[error("Maximum count of transactions exceeded {0}")]
MaxTxnCountExceeded(usize),
#[error("this transaction is too large to be serialized")]
TooLarge,
#[error("Maximum transaction size exceeded {0}")]
MaxSizeExceeded(usize),
#[error("A generic error occurred {0}")]
Generic(anyhow::Error),
}
impl From<anyhow::Error> for TxAcceptanceError {
fn from(err: anyhow::Error) -> Self {
Self::Generic(err)
}
}
impl From<BlobError> for TxAcceptanceError {
fn from(err: BlobError) -> Self {
match err {
BlobError::MissingSidecar(id) => {
TxAcceptanceError::BlobMissingSidecar(id)
}
BlobError::BlobEmpty => TxAcceptanceError::BlobEmpty,
BlobError::BlobTooMany(n) => TxAcceptanceError::BlobTooMany(n),
BlobError::BlobInvalid(msg) => TxAcceptanceError::BlobInvalid(msg),
}
}
}
impl From<TxPreconditionError> for TxAcceptanceError {
fn from(err: TxPreconditionError) -> Self {
match err {
TxPreconditionError::BlobLowLimit(min) => {
TxAcceptanceError::GasLimitTooLow(min)
}
TxPreconditionError::DeployLowLimit(min) => {
TxAcceptanceError::GasLimitTooLow(min)
}
TxPreconditionError::DeployLowPrice(min) => {
TxAcceptanceError::GasPriceTooLow(min)
}
TxPreconditionError::BlobEmpty => TxAcceptanceError::BlobEmpty,
TxPreconditionError::BlobTooMany(n) => {
TxAcceptanceError::BlobTooMany(n)
}
}
}
}
pub struct MempoolSrv {
inbound: AsyncQueue<Message>,
conf: Params,
event_sender: Sender<Event>,
}
impl MempoolSrv {
pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
info!("MempoolSrv::new with conf {}", conf);
Self {
inbound: AsyncQueue::bounded(
conf.max_queue_size,
"mempool_inbound",
),
conf,
event_sender,
}
}
}
#[async_trait]
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for MempoolSrv
{
async fn execute(
&mut self,
network: Arc<RwLock<N>>,
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;
self.request_mempool(&network).await;
let idle_interval =
self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
let mempool_expiry = self
.conf
.mempool_expiry
.unwrap_or(DEFAULT_EXPIRY_TIME)
.as_secs();
let mut on_idle_event = tokio::time::interval(idle_interval);
loop {
tokio::select! {
biased;
_ = on_idle_event.tick() => {
info!(event = "mempool_idle", interval = ?idle_interval);
let expiration_time = get_current_timestamp()
.checked_sub(mempool_expiry)
.expect("valid duration");
db.read().await.update(|db| {
let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
error!("cannot get expired txs: {e}");
vec![]
});
for tx_id in expired_txs {
info!(event = "expired_tx", hash = hex::encode(tx_id));
let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
error!("cannot delete expired tx: {e}");
vec![]
});
for deleted_tx_id in deleted_txs{
let event = TransactionEvent::Removed(deleted_tx_id);
info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
if let Err(e) = self.event_sender.try_send(event.into()) {
warn!("cannot notify mempool removed transaction {e}")
};
}
}
Ok(())
})?;
},
msg = self.inbound.recv() => {
if let Ok(msg) = msg {
match &msg.payload {
Payload::Transaction(tx) => {
let accept = self.accept_tx(&db, &vm, tx);
if let Err(e) = accept.await {
error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
continue;
}
let network = network.read().await;
if let Err(e) = network.broadcast(&msg).await {
warn!("Unable to broadcast accepted tx: {e}")
};
}
_ => error!("invalid inbound message payload"),
}
}
}
}
}
}
fn name(&self) -> &'static str {
"mempool"
}
}
impl MempoolSrv {
async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
&mut self,
db: &Arc<RwLock<DB>>,
vm: &Arc<RwLock<VM>>,
tx: &Transaction,
) -> Result<(), TxAcceptanceError> {
let max_mempool_txn_count = self.conf.max_mempool_txn_count;
let events =
MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
.await?;
tracing::info!(
event = "transaction accepted",
hash = hex::encode(tx.id())
);
for tx_event in events {
let node_event = tx_event.into();
if let Err(e) = self.event_sender.try_send(node_event) {
warn!("cannot notify mempool accepted transaction {e}")
};
}
Ok(())
}
pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
db: &Arc<RwLock<DB>>,
vm: &Arc<RwLock<VM>>,
tx: &'t Transaction,
dry_run: bool,
max_mempool_txn_count: usize,
) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
let tx_id = tx.id();
let tx_size = tx.size();
let min_header_size = Header::default().size();
let max_tx_size = MAX_BLOCK_SIZE - min_header_size;
if tx_size > max_tx_size {
return Err(TxAcceptanceError::MaxSizeExceeded(tx_size));
}
check_tx_serialization(&tx.inner)?;
if tx.gas_price() < 1 {
return Err(TxAcceptanceError::GasPriceTooLow(1));
}
let tip_height = db
.read()
.await
.view(|db| db.latest_block())
.map_err(|e| {
anyhow!("Cannot get tip block height from the database: {e}")
})?
.header
.height;
{
let vm = vm.read().await;
let disable_wasm_32 = vm.wasm32_disabled(tip_height);
let disable_wasm_64 = vm.wasm64_disabled(tip_height);
let disable_3rd_party = vm.third_party_disabled(tip_height);
if let Some(_contract_deploy) = tx.inner.deploy() {
match (disable_wasm_32, disable_wasm_64) {
(true, true) => {
Err(TxAcceptanceError::Generic(anyhow::anyhow!(
"contract deployment is not enabled in the VM"
)))
}
_ => Ok(()),
}?
}
if disable_3rd_party {
if let Some(call) = tx.inner.call() {
if call.contract != TRANSFER_CONTRACT
&& call.contract != STAKE_CONTRACT
{
Err(TxAcceptanceError::Generic(anyhow::anyhow!(
"3rd party contracts are not enabled in the VM"
)))?;
}
}
}
if tx.inner.deploy().is_some() {
let min_deployment_gas_price = vm.min_deployment_gas_price();
let gas_per_deploy_byte = vm.gas_per_deploy_byte();
let min_deploy_points = vm.min_deploy_points();
tx.inner.deploy_check(
gas_per_deploy_byte,
min_deployment_gas_price,
min_deploy_points,
)?;
}
if tx.inner.blob().is_some() {
if !vm.blob_active(tip_height) {
return Err(TxAcceptanceError::Generic(anyhow::anyhow!(
"blobs are not enabled in the VM"
)));
}
let gas_per_blob = vm.gas_per_blob();
tx.inner.blob_check(gas_per_blob)?;
dusk_consensus::validate_blob_sidecars(tx)?;
}
let min_gas_limit = vm.min_gas_limit();
if tx.inner.gas_limit() < min_gas_limit {
return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
}
}
let tx_to_delete = db.read().await.view(|view| {
if view.mempool_tx_exists(tx_id)? {
return Err(TxAcceptanceError::AlreadyExistsInMempool);
}
if view.ledger_tx_exists(&tx_id)? {
return Err(TxAcceptanceError::AlreadyExistsInLedger);
}
let txs_count = view.mempool_txs_count();
if txs_count >= max_mempool_txn_count {
let (lowest_price, to_delete) = view
.mempool_txs_ids_sorted_by_low_fee()
.next()
.ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
if tx.gas_price() < lowest_price {
Err(TxAcceptanceError::MaxTxnCountExceeded(
max_mempool_txn_count,
))
} else {
Ok(Some(to_delete))
}
} else {
Ok(None)
}
})?;
let preverification_data =
vm.read().await.preverify(tx).map_err(|e| {
TxAcceptanceError::VerificationFailed(format!("{e}"))
})?;
if let PreverificationResult::FutureNonce {
account,
state,
nonce_used,
} = preverification_data
{
db.read().await.view(|db| {
for nonce in state.nonce + 1..nonce_used {
let spending_id = SpendingId::AccountNonce(account, nonce);
if db
.mempool_txs_by_spendable_ids(&[spending_id])
.is_empty()
{
return Err(TxAcceptanceError::VerificationFailed(
format!("Missing intermediate nonce {nonce}"),
));
}
}
Ok(())
})?;
}
let mut events = vec![];
db.read().await.update_dry_run(dry_run, |db| {
let spend_ids = tx.to_spend_ids();
let mut replaced = false;
for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
if m_tx.inner.gas_price() < tx.inner.gas_price()
|| (m_tx.inner.gas_price() == tx.inner.gas_price()
&& m_tx.inner.gas_limit() < tx.inner.gas_limit())
{
for deleted in db.delete_mempool_tx(m_tx_id, false)? {
events.push(TransactionEvent::Removed(deleted));
replaced = true;
}
} else {
return Err(
TxAcceptanceError::SpendIdExistsInMempool.into()
);
}
}
}
events.push(TransactionEvent::Included(tx));
if !replaced {
if let Some(to_delete) = tx_to_delete {
for deleted in db.delete_mempool_tx(to_delete, true)? {
events.push(TransactionEvent::Removed(deleted));
}
}
}
let now = get_current_timestamp();
db.store_mempool_tx(tx, now)
})?;
Ok(events)
}
async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
let max_peers = self
.conf
.mempool_download_redundancy
.unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
let net = network.read().await;
net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
let msg = payload::GetMempool::default().into();
if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
error!("could not request mempool from network: {err}");
}
}
}
fn check_tx_serialization(
tx: &dusk_core::transfer::Transaction,
) -> Result<(), TxAcceptanceError> {
const SCRATCH_BUF_BYTES: usize = 1024;
const ARGBUF_LEN: usize = 64 * 1024;
let stripped_tx = tx.strip_off_bytecode().or(tx.blob_to_memo());
let mut sbuf = [0u8; SCRATCH_BUF_BYTES];
let mut buffer = [0u8; ARGBUF_LEN];
let scratch = BufferScratch::new(&mut sbuf);
let ser = BufferSerializer::new(&mut buffer);
let mut ser = CompositeSerializer::new(ser, scratch, Infallible);
if let Err(err) = ser.serialize_value(stripped_tx.as_ref().unwrap_or(tx)) {
match err {
CompositeSerializerError::SerializerError(err) => match err {
BufferSerializerError::Overflow { .. } => {
return Err(TxAcceptanceError::TooLarge);
}
},
err => return Err(TxAcceptanceError::Generic(anyhow!("{err}"))),
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use dusk_core::signatures::bls::{PublicKey, SecretKey};
use rand::rngs::StdRng;
use rand::Rng;
use rand::{CryptoRng, RngCore, SeedableRng};
use wallet_core::transaction::moonlight_deployment;
use super::*;
fn new_moonlight_deploy_tx<R: RngCore + CryptoRng>(
rng: &mut R,
bytecode: Vec<u8>,
init_args: Vec<u8>,
) -> dusk_core::transfer::Transaction {
const CHAIN_ID: u8 = 0xfa;
let sk = SecretKey::random(rng);
let pk = PublicKey::from(&SecretKey::random(rng));
let gas_limit: u64 = rng.gen();
let gas_price: u64 = rng.gen();
let nonce: u64 = rng.gen();
let deploy_nonce: u64 = rng.gen();
moonlight_deployment(
&sk,
bytecode,
&pk,
init_args,
gas_limit,
gas_price,
nonce,
deploy_nonce,
CHAIN_ID,
)
.expect("should create a transaction")
}
const MAX_MOONLIGHT_ARG_SIZE: usize = 64 * 1024 - 2320;
#[test]
fn test_tx_serialization_check_normal() {
let mut rng = StdRng::seed_from_u64(42);
let tx = new_moonlight_deploy_tx(
&mut rng,
vec![0; 64 * 1024],
vec![0; MAX_MOONLIGHT_ARG_SIZE],
);
let result = check_tx_serialization(&tx);
assert!(matches!(result, Ok(())));
}
#[test]
fn test_tx_serialization_check_tx_too_large() {
let mut rng = StdRng::seed_from_u64(42);
let tx = new_moonlight_deploy_tx(
&mut rng,
vec![0; 64 * 1024],
vec![0; MAX_MOONLIGHT_ARG_SIZE + 1],
);
let result = check_tx_serialization(&tx);
assert!(matches!(result, Err(TxAcceptanceError::TooLarge)));
}
}