Skip to main content

dusk_node/mempool/
prequeue.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::anyhow;
12use node_data::events::{Event, TransactionEvent};
13use node_data::ledger::{LedgerTransaction, SpendingId};
14use node_data::message::{Message, Payload};
15use tokio::sync::mpsc::Sender;
16use tokio::sync::{Mutex as AsyncMutex, RwLock};
17use tokio::time::Instant;
18use tracing::{info, warn};
19
20use super::{MempoolSrv, TxAcceptanceError, should_replace_conflicting_tx};
21use crate::{Network, database, vm};
22
23pub(super) const RETRY_DELAY: Duration = Duration::from_millis(500);
24pub(super) const RETRY_POLL_INTERVAL: Duration = Duration::from_millis(100);
25pub(super) const MAX_RETRIES: usize = 10;
26
27pub(super) type PrequeueKey = (Vec<u8>, u64);
28
29#[derive(Clone, Copy, Debug)]
30pub(super) enum EnqueueOutcome {
31    Deferred,
32    Replaced([u8; 32]),
33    Ignored,
34}
35
36#[derive(Clone, Debug)]
37pub(super) struct PendingPrequeueTx {
38    pub(super) msg: Message,
39    pub(super) key: PrequeueKey,
40    pub(super) queued_at: Instant,
41    next_retry_at: Instant,
42    pub(super) retries_remaining: usize,
43}
44
45#[derive(Debug)]
46struct Prequeue {
47    max_size: usize,
48    max_per_account: usize,
49    pending: HashMap<PrequeueKey, PendingPrequeueTx>,
50    per_account_counts: HashMap<Vec<u8>, usize>,
51}
52
53#[derive(Clone, Debug)]
54pub struct FutureNonceRetryHandle {
55    inner: Arc<AsyncMutex<Prequeue>>,
56}
57
58impl Prequeue {
59    fn new(max_size: usize, max_per_account: usize) -> Self {
60        Self {
61            max_size,
62            max_per_account,
63            pending: HashMap::new(),
64            per_account_counts: HashMap::new(),
65        }
66    }
67
68    fn insert_pending(
69        &mut self,
70        key: PrequeueKey,
71        msg: Message,
72        queued_at: Instant,
73        next_retry_at: Instant,
74        retries_remaining: usize,
75    ) {
76        *self.per_account_counts.entry(key.0.clone()).or_default() += 1;
77        self.pending.insert(
78            key.clone(),
79            PendingPrequeueTx {
80                msg,
81                key,
82                queued_at,
83                next_retry_at,
84                retries_remaining,
85            },
86        );
87    }
88
89    fn enqueue(
90        &mut self,
91        msg: &Message,
92    ) -> Result<EnqueueOutcome, TxAcceptanceError> {
93        let tx = pending_tx(msg).ok_or_else(|| {
94            TxAcceptanceError::Generic(anyhow!(
95                "prequeue only supports Moonlight txs"
96            ))
97        })?;
98        let key = prequeue_key(tx).ok_or_else(|| {
99            TxAcceptanceError::Generic(anyhow!(
100                "prequeue only supports Moonlight txs"
101            ))
102        })?;
103        let tx_id = tx.id();
104
105        if let Some(existing) = self.pending.get_mut(&key) {
106            let existing_tx =
107                pending_tx(&existing.msg).expect("message is a tx");
108            if should_replace_queued_tx(existing_tx, tx) {
109                let replaced = existing_tx.id();
110                existing.msg = msg.clone();
111                existing.queued_at = Instant::now();
112                existing.next_retry_at = Instant::now() + RETRY_DELAY;
113                existing.retries_remaining = MAX_RETRIES;
114                info!(
115                    event = "future_nonce_retry_replaced",
116                    hash = hex::encode(tx_id),
117                    nonce = key.1
118                );
119                return Ok(EnqueueOutcome::Replaced(replaced));
120            }
121            return Ok(EnqueueOutcome::Ignored);
122        }
123
124        let current_per_account = self
125            .per_account_counts
126            .get(&key.0)
127            .copied()
128            .unwrap_or_default();
129        if current_per_account >= self.max_per_account {
130            return Err(
131                TxAcceptanceError::MaxMoonlightFutureNoncePerAccountExceeded(
132                    self.max_per_account,
133                ),
134            );
135        }
136
137        if self.pending.len() >= self.max_size {
138            warn!(
139                hash = hex::encode(tx_id),
140                "future nonce retry queue full, dropping tx"
141            );
142            return Err(TxAcceptanceError::MaxFutureNonceQueueExceeded(
143                self.max_size,
144            ));
145        }
146
147        self.insert_pending(
148            key.clone(),
149            msg.clone(),
150            Instant::now(),
151            Instant::now() + RETRY_DELAY,
152            MAX_RETRIES,
153        );
154        info!(
155            event = "future_nonce_retry_queued",
156            hash = hex::encode(tx_id),
157            nonce = key.1,
158            delay_ms = RETRY_DELAY.as_millis() as u64,
159            retries_remaining = MAX_RETRIES
160        );
161
162        Ok(EnqueueOutcome::Deferred)
163    }
164
165    fn take_due(&mut self, now: Instant) -> Vec<PendingPrequeueTx> {
166        let mut due = Vec::new();
167        self.pending.retain(|_, pending| {
168            if pending.next_retry_at <= now {
169                due.push(pending.clone());
170                false
171            } else {
172                true
173            }
174        });
175
176        for pending in &due {
177            decrement_account_count(
178                &mut self.per_account_counts,
179                &pending.key.0,
180            );
181        }
182
183        due.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));
184        due
185    }
186
187    fn take_by_key(&mut self, key: &PrequeueKey) -> Option<PendingPrequeueTx> {
188        let pending = self.pending.remove(key)?;
189        decrement_account_count(&mut self.per_account_counts, &key.0);
190        Some(pending)
191    }
192
193    fn reschedule(&mut self, pending: PendingPrequeueTx, now: Instant) {
194        let tx = pending_tx(&pending.msg).expect("message is a tx");
195        let retries_remaining = pending.retries_remaining - 1;
196        let tx_id = tx.id();
197
198        self.insert_pending(
199            pending.key,
200            pending.msg,
201            pending.queued_at,
202            now + RETRY_DELAY,
203            retries_remaining,
204        );
205        info!(
206            event = "future_nonce_retry_rescheduled",
207            hash = hex::encode(tx_id),
208            retries_remaining,
209            queue_age_ms =
210                now.duration_since(pending.queued_at).as_millis() as u64
211        );
212    }
213}
214
215impl FutureNonceRetryHandle {
216    pub fn new(max_size: usize, max_per_account: usize) -> Self {
217        Self {
218            inner: Arc::new(AsyncMutex::new(Prequeue::new(
219                max_size,
220                max_per_account,
221            ))),
222        }
223    }
224
225    pub async fn enqueue_message_report(
226        &self,
227        msg: &Message,
228    ) -> (Vec<Event>, Result<(), TxAcceptanceError>) {
229        let outcome = self.inner.lock().await.enqueue(msg);
230        let Some(tx) = pending_tx(msg) else {
231            return (vec![], outcome.map(|_| ()));
232        };
233
234        enqueue_outcome_report(tx, outcome)
235    }
236
237    pub(super) async fn enqueue_message_with_outcome(
238        &self,
239        msg: &Message,
240    ) -> Result<EnqueueOutcome, TxAcceptanceError> {
241        self.inner.lock().await.enqueue(msg)
242    }
243
244    pub(super) async fn take_due(
245        &self,
246        now: Instant,
247    ) -> Vec<PendingPrequeueTx> {
248        self.inner.lock().await.take_due(now)
249    }
250
251    pub(super) async fn take_for_account_nonce(
252        &self,
253        account: &[u8],
254        nonce: u64,
255    ) -> Option<PendingPrequeueTx> {
256        let key = (account.to_vec(), nonce);
257        self.inner.lock().await.take_by_key(&key)
258    }
259
260    pub(super) async fn reschedule_message(
261        &self,
262        pending: PendingPrequeueTx,
263        now: Instant,
264    ) {
265        self.inner.lock().await.reschedule(pending, now);
266    }
267}
268
269fn emit_tx_event(event_sender: &Sender<Event>, event: TransactionEvent<'_>) {
270    if let Err(e) = event_sender.try_send(event.into()) {
271        warn!("cannot notify transaction event {e}");
272    }
273}
274
275fn emit_prequeue_dropped(
276    event_sender: &Sender<Event>,
277    tx: &LedgerTransaction,
278    reason: &'static str,
279) {
280    emit_tx_event(event_sender, TransactionEvent::Dropped(tx.id(), reason));
281}
282
283pub(super) fn handle_enqueue_outcome(
284    event_sender: &Sender<Event>,
285    tx: &LedgerTransaction,
286    outcome: Result<EnqueueOutcome, TxAcceptanceError>,
287) -> Result<(), TxAcceptanceError> {
288    let (events, result) = enqueue_outcome_report(tx, outcome);
289    for event in events {
290        if let Err(e) = event_sender.try_send(event) {
291            warn!("cannot notify transaction event {e}");
292        }
293    }
294    result
295}
296
297fn enqueue_outcome_report(
298    tx: &LedgerTransaction,
299    outcome: Result<EnqueueOutcome, TxAcceptanceError>,
300) -> (Vec<Event>, Result<(), TxAcceptanceError>) {
301    let mut events = Vec::new();
302    let result = match outcome {
303        Ok(EnqueueOutcome::Deferred) => {
304            events.push(
305                TransactionEvent::Deferred(
306                    tx.id(),
307                    "missing_intermediate_nonce",
308                )
309                .into(),
310            );
311            Ok(())
312        }
313        Ok(EnqueueOutcome::Replaced(replaced)) => {
314            events.push(
315                TransactionEvent::Dropped(replaced, "replaced_in_prequeue")
316                    .into(),
317            );
318            events.push(
319                TransactionEvent::Deferred(
320                    tx.id(),
321                    "missing_intermediate_nonce",
322                )
323                .into(),
324            );
325            Ok(())
326        }
327        Ok(EnqueueOutcome::Ignored) => {
328            events.push(
329                TransactionEvent::Dropped(tx.id(), "superseded_by_staged_tx")
330                    .into(),
331            );
332            Ok(())
333        }
334        Err(err) => {
335            match &err {
336                TxAcceptanceError::MaxFutureNonceQueueExceeded(_) => {
337                    events.push(
338                        TransactionEvent::Dropped(tx.id(), "prequeue_full")
339                            .into(),
340                    );
341                }
342                TxAcceptanceError::MaxMoonlightFutureNoncePerAccountExceeded(
343                    _,
344                ) => {
345                    events.push(
346                        TransactionEvent::Dropped(
347                            tx.id(),
348                            "prequeue_account_limit",
349                        )
350                        .into(),
351                    );
352                }
353                _ => {}
354            }
355            Err(err)
356        }
357    };
358
359    (events, result)
360}
361
362async fn process_pending_tx<
363    N: Network,
364    DB: database::DB,
365    VM: vm::VMExecution,
366>(
367    ctx: &PrequeueProcessCtx<'_, N, DB, VM>,
368    pending: PendingPrequeueTx,
369    now: Instant,
370    source: &'static str,
371    failure_reason: &'static str,
372) -> bool {
373    let Payload::Transaction(tx) = &pending.msg.payload else {
374        return false;
375    };
376    let queue_age_ms = now.duration_since(pending.queued_at).as_millis() as u64;
377
378    match MempoolSrv::accept_tx(
379        ctx.event_sender,
380        ctx.max_mempool_txn_count,
381        ctx.db,
382        ctx.vm,
383        tx,
384    )
385    .await
386    {
387        Ok(()) => {
388            MempoolSrv::broadcast_accepted_tx(
389                ctx.network,
390                &pending.msg,
391                tx,
392                Some(source),
393                Some(queue_age_ms),
394            )
395            .await;
396            true
397        }
398        Err(TxAcceptanceError::MissingIntermediateNonce(_))
399            if pending.retries_remaining > 1 =>
400        {
401            ctx.prequeue.reschedule_message(pending, now).await;
402            false
403        }
404        Err(err) => {
405            let reason = match err {
406                TxAcceptanceError::MissingIntermediateNonce(_) => {
407                    "retry_exhausted"
408                }
409                _ => failure_reason,
410            };
411            emit_prequeue_dropped(ctx.event_sender, tx, reason);
412            warn!(
413                hash = hex::encode(tx.id()),
414                queue_age_ms, "future nonce {source} dropped: {err}"
415            );
416            false
417        }
418    }
419}
420
421struct PrequeueProcessCtx<'a, N, DB, VM> {
422    prequeue: &'a FutureNonceRetryHandle,
423    event_sender: &'a Sender<Event>,
424    max_mempool_txn_count: usize,
425    network: &'a Arc<RwLock<N>>,
426    db: &'a Arc<RwLock<DB>>,
427    vm: &'a Arc<RwLock<VM>>,
428}
429
430pub(super) async fn process_due_retries<
431    N: Network,
432    DB: database::DB,
433    VM: vm::VMExecution,
434>(
435    prequeue: &FutureNonceRetryHandle,
436    event_sender: &Sender<Event>,
437    max_mempool_txn_count: usize,
438    network: &Arc<RwLock<N>>,
439    db: &Arc<RwLock<DB>>,
440    vm: &Arc<RwLock<VM>>,
441    now: Instant,
442) {
443    let ctx = PrequeueProcessCtx {
444        prequeue,
445        event_sender,
446        max_mempool_txn_count,
447        network,
448        db,
449        vm,
450    };
451    let due = ctx.prequeue.take_due(now).await;
452    if !due.is_empty() {
453        let oldest_queue_age_ms = due
454            .iter()
455            .map(|pending| now.duration_since(pending.queued_at).as_millis())
456            .max()
457            .unwrap_or_default() as u64;
458        info!(
459            event = "future_nonce_retry_due_batch",
460            count = due.len(),
461            oldest_queue_age_ms
462        );
463    }
464
465    for pending in due {
466        let Payload::Transaction(tx) = &pending.msg.payload else {
467            continue;
468        };
469        let accepted_tx = tx.clone();
470        if process_pending_tx(&ctx, pending, now, "retry", "retry_failed").await
471        {
472            drain_unblocked_chain(
473                ctx.prequeue,
474                ctx.event_sender,
475                ctx.max_mempool_txn_count,
476                ctx.network,
477                ctx.db,
478                ctx.vm,
479                &accepted_tx,
480            )
481            .await;
482        }
483    }
484}
485
486pub(super) async fn drain_unblocked_chain<
487    N: Network,
488    DB: database::DB,
489    VM: vm::VMExecution,
490>(
491    prequeue: &FutureNonceRetryHandle,
492    event_sender: &Sender<Event>,
493    max_mempool_txn_count: usize,
494    network: &Arc<RwLock<N>>,
495    db: &Arc<RwLock<DB>>,
496    vm: &Arc<RwLock<VM>>,
497    accepted_tx: &LedgerTransaction,
498) {
499    let ctx = PrequeueProcessCtx {
500        prequeue,
501        event_sender,
502        max_mempool_txn_count,
503        network,
504        db,
505        vm,
506    };
507    let Some(key) = account_nonce_key(accepted_tx) else {
508        return;
509    };
510
511    let mut next_nonce = key.1 + 1;
512    loop {
513        let Some(pending) = ctx
514            .prequeue
515            .take_for_account_nonce(&key.0, next_nonce)
516            .await
517        else {
518            break;
519        };
520        if !process_pending_tx(
521            &ctx,
522            pending,
523            Instant::now(),
524            "contiguous_drain",
525            "contiguous_drain_failed",
526        )
527        .await
528        {
529            break;
530        }
531        next_nonce += 1;
532    }
533}
534
535fn pending_tx(msg: &Message) -> Option<&LedgerTransaction> {
536    let Payload::Transaction(tx) = &msg.payload else {
537        return None;
538    };
539    Some(tx)
540}
541
542fn prequeue_key(tx: &LedgerTransaction) -> Option<PrequeueKey> {
543    let spend_ids = tx.to_spend_ids();
544    let [SpendingId::AccountNonce(account, nonce)] = spend_ids.as_slice()
545    else {
546        return None;
547    };
548    Some((account.to_raw_bytes().to_vec(), *nonce))
549}
550
551pub(super) fn account_nonce_key(tx: &LedgerTransaction) -> Option<PrequeueKey> {
552    prequeue_key(tx)
553}
554
555fn should_replace_queued_tx(
556    existing: &LedgerTransaction,
557    incoming: &LedgerTransaction,
558) -> bool {
559    should_replace_conflicting_tx(existing, incoming)
560}
561
/// Lowers the entry count of `account` by one and forgets the account
/// altogether once no entries remain. Unknown accounts are left untouched.
fn decrement_account_count(
    per_account_counts: &mut HashMap<Vec<u8>, usize>,
    account: &[u8],
) {
    let remaining = match per_account_counts.get_mut(account) {
        Some(count) => {
            *count = count.saturating_sub(1);
            *count
        }
        None => return,
    };
    if remaining == 0 {
        per_account_counts.remove(account);
    }
}