1pub mod conf;
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use conf::{
14 DEFAULT_DOWNLOAD_REDUNDANCY, DEFAULT_EXPIRY_TIME, DEFAULT_IDLE_INTERVAL,
15};
16use node_data::events::{Event, TransactionEvent};
17use node_data::get_current_timestamp;
18use node_data::ledger::{SpendingId, Transaction};
19use node_data::message::{payload, AsyncQueue, Payload, Topics};
20use thiserror::Error;
21use tokio::sync::mpsc::Sender;
22use tokio::sync::RwLock;
23use tracing::{error, info, warn};
24
25use crate::database::{Ledger, Mempool};
26use crate::mempool::conf::Params;
27use crate::vm::PreverificationResult;
28use crate::{database, vm, LongLivedService, Message, Network};
29
30const TOPICS: &[u8] = &[Topics::Tx as u8];
31
32#[derive(Debug, Error)]
33pub enum TxAcceptanceError {
34 #[error("this transaction exists in the mempool")]
35 AlreadyExistsInMempool,
36 #[error("this transaction exists in the ledger")]
37 AlreadyExistsInLedger,
38 #[error("this transaction's spendId exists in the mempool")]
39 SpendIdExistsInMempool,
40 #[error("this transaction is invalid {0}")]
41 VerificationFailed(String),
42 #[error("gas price lower than minimum {0}")]
43 GasPriceTooLow(u64),
44 #[error("gas limit lower than minimum {0}")]
45 GasLimitTooLow(u64),
46 #[error("Maximum count of transactions exceeded {0}")]
47 MaxTxnCountExceeded(usize),
48 #[error("A generic error occurred {0}")]
49 Generic(anyhow::Error),
50}
51
52impl From<anyhow::Error> for TxAcceptanceError {
53 fn from(err: anyhow::Error) -> Self {
54 Self::Generic(err)
55 }
56}
57
58pub struct MempoolSrv {
59 inbound: AsyncQueue<Message>,
60 conf: Params,
61 event_sender: Sender<Event>,
63}
64
65impl MempoolSrv {
66 pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
67 info!("MempoolSrv::new with conf {}", conf);
68 Self {
69 inbound: AsyncQueue::bounded(
70 conf.max_queue_size,
71 "mempool_inbound",
72 ),
73 conf,
74 event_sender,
75 }
76 }
77}
78
79#[async_trait]
80impl<N: Network, DB: database::DB, VM: vm::VMExecution>
81 LongLivedService<N, DB, VM> for MempoolSrv
82{
83 async fn execute(
84 &mut self,
85 network: Arc<RwLock<N>>,
86 db: Arc<RwLock<DB>>,
87 vm: Arc<RwLock<VM>>,
88 ) -> anyhow::Result<usize> {
89 LongLivedService::<N, DB, VM>::add_routes(
90 self,
91 TOPICS,
92 self.inbound.clone(),
93 &network,
94 )
95 .await?;
96
97 self.request_mempool(&network).await;
99
100 let idle_interval =
101 self.conf.idle_interval.unwrap_or(DEFAULT_IDLE_INTERVAL);
102
103 let mempool_expiry = self
104 .conf
105 .mempool_expiry
106 .unwrap_or(DEFAULT_EXPIRY_TIME)
107 .as_secs();
108
109 let mut on_idle_event = tokio::time::interval(idle_interval);
111 loop {
112 tokio::select! {
113 biased;
114 _ = on_idle_event.tick() => {
115 info!(event = "mempool_idle", interval = ?idle_interval);
116
117 let expiration_time = get_current_timestamp()
118 .checked_sub(mempool_expiry)
119 .expect("valid duration");
120
121 db.read().await.update(|db| {
123 let expired_txs = db.mempool_expired_txs(expiration_time).unwrap_or_else(|e| {
124 error!("cannot get expired txs: {e}");
125 vec![]
126 });
127 for tx_id in expired_txs {
128 info!(event = "expired_tx", hash = hex::encode(tx_id));
129 let deleted_txs = db.delete_mempool_tx(tx_id, true).unwrap_or_else(|e| {
130 error!("cannot delete expired tx: {e}");
131 vec![]
132 });
133 for deleted_tx_id in deleted_txs{
134 let event = TransactionEvent::Removed(deleted_tx_id);
135 info!(event = "mempool_deleted", hash = hex::encode(deleted_tx_id));
136 if let Err(e) = self.event_sender.try_send(event.into()) {
137 warn!("cannot notify mempool removed transaction {e}")
138 };
139 }
140 }
141 Ok(())
142 })?;
143
144 },
145 msg = self.inbound.recv() => {
146 if let Ok(msg) = msg {
147 match &msg.payload {
148 Payload::Transaction(tx) => {
149 let accept = self.accept_tx(&db, &vm, tx);
150 if let Err(e) = accept.await {
151 error!("Tx {} not accepted: {e}", hex::encode(tx.id()));
152 continue;
153 }
154
155 let network = network.read().await;
156 if let Err(e) = network.broadcast(&msg).await {
157 warn!("Unable to broadcast accepted tx: {e}")
158 };
159 }
160 _ => error!("invalid inbound message payload"),
161 }
162 }
163 }
164 }
165 }
166 }
167
168 fn name(&self) -> &'static str {
170 "mempool"
171 }
172}
173
174impl MempoolSrv {
175 async fn accept_tx<DB: database::DB, VM: vm::VMExecution>(
176 &mut self,
177 db: &Arc<RwLock<DB>>,
178 vm: &Arc<RwLock<VM>>,
179 tx: &Transaction,
180 ) -> Result<(), TxAcceptanceError> {
181 let max_mempool_txn_count = self.conf.max_mempool_txn_count;
182
183 let events =
184 MempoolSrv::check_tx(db, vm, tx, false, max_mempool_txn_count)
185 .await?;
186
187 tracing::info!(
188 event = "transaction accepted",
189 hash = hex::encode(tx.id())
190 );
191
192 for tx_event in events {
193 let node_event = tx_event.into();
194 if let Err(e) = self.event_sender.try_send(node_event) {
195 warn!("cannot notify mempool accepted transaction {e}")
196 };
197 }
198
199 Ok(())
200 }
201
202 pub async fn check_tx<'t, DB: database::DB, VM: vm::VMExecution>(
203 db: &Arc<RwLock<DB>>,
204 vm: &Arc<RwLock<VM>>,
205 tx: &'t Transaction,
206 dry_run: bool,
207 max_mempool_txn_count: usize,
208 ) -> Result<Vec<TransactionEvent<'t>>, TxAcceptanceError> {
209 let tx_id = tx.id();
210
211 if tx.gas_price() < 1 {
212 return Err(TxAcceptanceError::GasPriceTooLow(1));
213 }
214
215 if tx.inner.deploy().is_some() {
216 let vm = vm.read().await;
217 let min_deployment_gas_price = vm.min_deployment_gas_price();
218 if tx.gas_price() < min_deployment_gas_price {
219 return Err(TxAcceptanceError::GasPriceTooLow(
220 min_deployment_gas_price,
221 ));
222 }
223
224 let gas_per_deploy_byte = vm.gas_per_deploy_byte();
225 let deploy_charge = tx
226 .inner
227 .deploy_charge(gas_per_deploy_byte, vm.min_deploy_points());
228 if tx.inner.gas_limit() < deploy_charge {
229 return Err(TxAcceptanceError::GasLimitTooLow(deploy_charge));
230 }
231 } else {
232 let vm = vm.read().await;
233 let min_gas_limit = vm.min_gas_limit();
234 if tx.inner.gas_limit() < min_gas_limit {
235 return Err(TxAcceptanceError::GasLimitTooLow(min_gas_limit));
236 }
237 }
238
239 let tx_to_delete = db.read().await.view(|view| {
241 if view.mempool_tx_exists(tx_id)? {
243 return Err(TxAcceptanceError::AlreadyExistsInMempool);
244 }
245
246 if view.ledger_tx_exists(&tx_id)? {
248 return Err(TxAcceptanceError::AlreadyExistsInLedger);
249 }
250
251 let txs_count = view.mempool_txs_count();
252 if txs_count >= max_mempool_txn_count {
253 let (lowest_price, to_delete) = view
255 .mempool_txs_ids_sorted_by_low_fee()?
256 .next()
257 .ok_or(anyhow::anyhow!("Cannot get lowest fee tx"))?;
258
259 if tx.gas_price() < lowest_price {
260 Err(TxAcceptanceError::MaxTxnCountExceeded(
263 max_mempool_txn_count,
264 ))
265 } else {
266 Ok(Some(to_delete))
267 }
268 } else {
269 Ok(None)
270 }
271 })?;
272
273 let preverification_data =
275 vm.read().await.preverify(tx).map_err(|e| {
276 TxAcceptanceError::VerificationFailed(format!("{e:?}"))
277 })?;
278
279 if let PreverificationResult::FutureNonce {
280 account,
281 state,
282 nonce_used,
283 } = preverification_data
284 {
285 db.read().await.view(|db| {
286 for nonce in state.nonce + 1..nonce_used {
287 let spending_id = SpendingId::AccountNonce(account, nonce);
288 if db
289 .mempool_txs_by_spendable_ids(&[spending_id])
290 .is_empty()
291 {
292 return Err(TxAcceptanceError::VerificationFailed(
293 format!("Missing intermediate nonce {nonce}"),
294 ));
295 }
296 }
297 Ok(())
298 })?;
299 }
300
301 let mut events = vec![];
302
303 db.read().await.update_dry_run(dry_run, |db| {
305 let spend_ids = tx.to_spend_ids();
306
307 let mut replaced = false;
308 for m_tx_id in db.mempool_txs_by_spendable_ids(&spend_ids) {
310 if let Some(m_tx) = db.mempool_tx(m_tx_id)? {
311 if m_tx.inner.gas_price() < tx.inner.gas_price() {
312 for deleted in db.delete_mempool_tx(m_tx_id, false)? {
313 events.push(TransactionEvent::Removed(deleted));
314 replaced = true;
315 }
316 } else {
317 return Err(
318 TxAcceptanceError::SpendIdExistsInMempool.into()
319 );
320 }
321 }
322 }
323
324 events.push(TransactionEvent::Included(tx));
325
326 if !replaced {
327 if let Some(to_delete) = tx_to_delete {
328 for deleted in db.delete_mempool_tx(to_delete, true)? {
329 events.push(TransactionEvent::Removed(deleted));
330 }
331 }
332 }
333 let now = get_current_timestamp();
336
337 db.store_mempool_tx(tx, now)
338 })?;
339 Ok(events)
340 }
341
342 async fn request_mempool<N: Network>(&self, network: &Arc<RwLock<N>>) {
347 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
348 let max_peers = self
349 .conf
350 .mempool_download_redundancy
351 .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY);
352
353 let net = network.read().await;
354 net.wait_for_alive_nodes(max_peers, WAIT_TIMEOUT).await;
355
356 let msg = payload::GetMempool::default().into();
357 if let Err(err) = net.send_to_alive_peers(msg, max_peers).await {
358 error!("could not request mempool from network: {err}");
359 }
360 }
361}