1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use anyhow::anyhow;
12use node_data::events::{Event, TransactionEvent};
13use node_data::ledger::{LedgerTransaction, SpendingId};
14use node_data::message::{Message, Payload};
15use tokio::sync::mpsc::Sender;
16use tokio::sync::{Mutex as AsyncMutex, RwLock};
17use tokio::time::Instant;
18use tracing::{info, warn};
19
20use super::{MempoolSrv, TxAcceptanceError, should_replace_conflicting_tx};
21use crate::{Network, database, vm};
22
/// Delay added to the current instant when scheduling the next retry of a
/// queued transaction.
pub(super) const RETRY_DELAY: Duration = Duration::from_millis(500);
// NOTE(review): not referenced in this file; presumably the cadence at which
// the caller polls for due retries — confirm at the use site.
pub(super) const RETRY_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Retry budget given to a transaction when it is first queued or when it
/// replaces a queued transaction.
pub(super) const MAX_RETRIES: usize = 10;

/// Prequeue lookup key: the account's raw bytes paired with the tx nonce.
pub(super) type PrequeueKey = (Vec<u8>, u64);
28
/// Result of successfully offering a transaction to the prequeue.
#[derive(Clone, Copy, Debug)]
pub(super) enum EnqueueOutcome {
    /// The transaction was stored under a key that was not queued before.
    Deferred,
    /// The transaction took the place of a queued one with the same key; the
    /// payload is the id of the transaction that was replaced.
    Replaced([u8; 32]),
    /// A transaction with the same key is already queued and was kept; the
    /// incoming transaction was not stored.
    Ignored,
}
35
/// A message parked in the prequeue together with its retry bookkeeping.
#[derive(Clone, Debug)]
pub(super) struct PendingPrequeueTx {
    /// The queued message; its payload is expected to be a transaction.
    pub(super) msg: Message,
    /// Key under which the entry is stored in the queue.
    pub(super) key: PrequeueKey,
    /// Instant at which the entry was queued (or last replaced); carried over
    /// unchanged when the entry is rescheduled.
    pub(super) queued_at: Instant,
    /// Earliest instant at which the entry counts as due for a retry.
    next_retry_at: Instant,
    /// Remaining retry budget; starts at `MAX_RETRIES` and is decremented on
    /// each reschedule.
    pub(super) retries_remaining: usize,
}
44
/// Bounded store of pending transactions keyed by `PrequeueKey`.
#[derive(Debug)]
struct Prequeue {
    /// Maximum number of entries allowed in `pending` when enqueueing.
    max_size: usize,
    /// Maximum number of entries a single account may have when enqueueing.
    max_per_account: usize,
    /// Queued entries, one per key.
    pending: HashMap<PrequeueKey, PendingPrequeueTx>,
    /// Number of entries in `pending` per account (first key component).
    per_account_counts: HashMap<Vec<u8>, usize>,
}
52
/// Cloneable handle to the prequeue; clones share the same underlying queue
/// through the `Arc`, and access is serialized by the async mutex.
#[derive(Clone, Debug)]
pub struct FutureNonceRetryHandle {
    /// Shared, mutex-guarded queue state.
    inner: Arc<AsyncMutex<Prequeue>>,
}
57
58impl Prequeue {
59 fn new(max_size: usize, max_per_account: usize) -> Self {
60 Self {
61 max_size,
62 max_per_account,
63 pending: HashMap::new(),
64 per_account_counts: HashMap::new(),
65 }
66 }
67
68 fn insert_pending(
69 &mut self,
70 key: PrequeueKey,
71 msg: Message,
72 queued_at: Instant,
73 next_retry_at: Instant,
74 retries_remaining: usize,
75 ) {
76 *self.per_account_counts.entry(key.0.clone()).or_default() += 1;
77 self.pending.insert(
78 key.clone(),
79 PendingPrequeueTx {
80 msg,
81 key,
82 queued_at,
83 next_retry_at,
84 retries_remaining,
85 },
86 );
87 }
88
89 fn enqueue(
90 &mut self,
91 msg: &Message,
92 ) -> Result<EnqueueOutcome, TxAcceptanceError> {
93 let tx = pending_tx(msg).ok_or_else(|| {
94 TxAcceptanceError::Generic(anyhow!(
95 "prequeue only supports Moonlight txs"
96 ))
97 })?;
98 let key = prequeue_key(tx).ok_or_else(|| {
99 TxAcceptanceError::Generic(anyhow!(
100 "prequeue only supports Moonlight txs"
101 ))
102 })?;
103 let tx_id = tx.id();
104
105 if let Some(existing) = self.pending.get_mut(&key) {
106 let existing_tx =
107 pending_tx(&existing.msg).expect("message is a tx");
108 if should_replace_queued_tx(existing_tx, tx) {
109 let replaced = existing_tx.id();
110 existing.msg = msg.clone();
111 existing.queued_at = Instant::now();
112 existing.next_retry_at = Instant::now() + RETRY_DELAY;
113 existing.retries_remaining = MAX_RETRIES;
114 info!(
115 event = "future_nonce_retry_replaced",
116 hash = hex::encode(tx_id),
117 nonce = key.1
118 );
119 return Ok(EnqueueOutcome::Replaced(replaced));
120 }
121 return Ok(EnqueueOutcome::Ignored);
122 }
123
124 let current_per_account = self
125 .per_account_counts
126 .get(&key.0)
127 .copied()
128 .unwrap_or_default();
129 if current_per_account >= self.max_per_account {
130 return Err(
131 TxAcceptanceError::MaxMoonlightFutureNoncePerAccountExceeded(
132 self.max_per_account,
133 ),
134 );
135 }
136
137 if self.pending.len() >= self.max_size {
138 warn!(
139 hash = hex::encode(tx_id),
140 "future nonce retry queue full, dropping tx"
141 );
142 return Err(TxAcceptanceError::MaxFutureNonceQueueExceeded(
143 self.max_size,
144 ));
145 }
146
147 self.insert_pending(
148 key.clone(),
149 msg.clone(),
150 Instant::now(),
151 Instant::now() + RETRY_DELAY,
152 MAX_RETRIES,
153 );
154 info!(
155 event = "future_nonce_retry_queued",
156 hash = hex::encode(tx_id),
157 nonce = key.1,
158 delay_ms = RETRY_DELAY.as_millis() as u64,
159 retries_remaining = MAX_RETRIES
160 );
161
162 Ok(EnqueueOutcome::Deferred)
163 }
164
165 fn take_due(&mut self, now: Instant) -> Vec<PendingPrequeueTx> {
166 let mut due = Vec::new();
167 self.pending.retain(|_, pending| {
168 if pending.next_retry_at <= now {
169 due.push(pending.clone());
170 false
171 } else {
172 true
173 }
174 });
175
176 for pending in &due {
177 decrement_account_count(
178 &mut self.per_account_counts,
179 &pending.key.0,
180 );
181 }
182
183 due.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));
184 due
185 }
186
187 fn take_by_key(&mut self, key: &PrequeueKey) -> Option<PendingPrequeueTx> {
188 let pending = self.pending.remove(key)?;
189 decrement_account_count(&mut self.per_account_counts, &key.0);
190 Some(pending)
191 }
192
193 fn reschedule(&mut self, pending: PendingPrequeueTx, now: Instant) {
194 let tx = pending_tx(&pending.msg).expect("message is a tx");
195 let retries_remaining = pending.retries_remaining - 1;
196 let tx_id = tx.id();
197
198 self.insert_pending(
199 pending.key,
200 pending.msg,
201 pending.queued_at,
202 now + RETRY_DELAY,
203 retries_remaining,
204 );
205 info!(
206 event = "future_nonce_retry_rescheduled",
207 hash = hex::encode(tx_id),
208 retries_remaining,
209 queue_age_ms =
210 now.duration_since(pending.queued_at).as_millis() as u64
211 );
212 }
213}
214
215impl FutureNonceRetryHandle {
216 pub fn new(max_size: usize, max_per_account: usize) -> Self {
217 Self {
218 inner: Arc::new(AsyncMutex::new(Prequeue::new(
219 max_size,
220 max_per_account,
221 ))),
222 }
223 }
224
225 pub async fn enqueue_message_report(
226 &self,
227 msg: &Message,
228 ) -> (Vec<Event>, Result<(), TxAcceptanceError>) {
229 let outcome = self.inner.lock().await.enqueue(msg);
230 let Some(tx) = pending_tx(msg) else {
231 return (vec![], outcome.map(|_| ()));
232 };
233
234 enqueue_outcome_report(tx, outcome)
235 }
236
237 pub(super) async fn enqueue_message_with_outcome(
238 &self,
239 msg: &Message,
240 ) -> Result<EnqueueOutcome, TxAcceptanceError> {
241 self.inner.lock().await.enqueue(msg)
242 }
243
244 pub(super) async fn take_due(
245 &self,
246 now: Instant,
247 ) -> Vec<PendingPrequeueTx> {
248 self.inner.lock().await.take_due(now)
249 }
250
251 pub(super) async fn take_for_account_nonce(
252 &self,
253 account: &[u8],
254 nonce: u64,
255 ) -> Option<PendingPrequeueTx> {
256 let key = (account.to_vec(), nonce);
257 self.inner.lock().await.take_by_key(&key)
258 }
259
260 pub(super) async fn reschedule_message(
261 &self,
262 pending: PendingPrequeueTx,
263 now: Instant,
264 ) {
265 self.inner.lock().await.reschedule(pending, now);
266 }
267}
268
269fn emit_tx_event(event_sender: &Sender<Event>, event: TransactionEvent<'_>) {
270 if let Err(e) = event_sender.try_send(event.into()) {
271 warn!("cannot notify transaction event {e}");
272 }
273}
274
275fn emit_prequeue_dropped(
276 event_sender: &Sender<Event>,
277 tx: &LedgerTransaction,
278 reason: &'static str,
279) {
280 emit_tx_event(event_sender, TransactionEvent::Dropped(tx.id(), reason));
281}
282
283pub(super) fn handle_enqueue_outcome(
284 event_sender: &Sender<Event>,
285 tx: &LedgerTransaction,
286 outcome: Result<EnqueueOutcome, TxAcceptanceError>,
287) -> Result<(), TxAcceptanceError> {
288 let (events, result) = enqueue_outcome_report(tx, outcome);
289 for event in events {
290 if let Err(e) = event_sender.try_send(event) {
291 warn!("cannot notify transaction event {e}");
292 }
293 }
294 result
295}
296
297fn enqueue_outcome_report(
298 tx: &LedgerTransaction,
299 outcome: Result<EnqueueOutcome, TxAcceptanceError>,
300) -> (Vec<Event>, Result<(), TxAcceptanceError>) {
301 let mut events = Vec::new();
302 let result = match outcome {
303 Ok(EnqueueOutcome::Deferred) => {
304 events.push(
305 TransactionEvent::Deferred(
306 tx.id(),
307 "missing_intermediate_nonce",
308 )
309 .into(),
310 );
311 Ok(())
312 }
313 Ok(EnqueueOutcome::Replaced(replaced)) => {
314 events.push(
315 TransactionEvent::Dropped(replaced, "replaced_in_prequeue")
316 .into(),
317 );
318 events.push(
319 TransactionEvent::Deferred(
320 tx.id(),
321 "missing_intermediate_nonce",
322 )
323 .into(),
324 );
325 Ok(())
326 }
327 Ok(EnqueueOutcome::Ignored) => {
328 events.push(
329 TransactionEvent::Dropped(tx.id(), "superseded_by_staged_tx")
330 .into(),
331 );
332 Ok(())
333 }
334 Err(err) => {
335 match &err {
336 TxAcceptanceError::MaxFutureNonceQueueExceeded(_) => {
337 events.push(
338 TransactionEvent::Dropped(tx.id(), "prequeue_full")
339 .into(),
340 );
341 }
342 TxAcceptanceError::MaxMoonlightFutureNoncePerAccountExceeded(
343 _,
344 ) => {
345 events.push(
346 TransactionEvent::Dropped(
347 tx.id(),
348 "prequeue_account_limit",
349 )
350 .into(),
351 );
352 }
353 _ => {}
354 }
355 Err(err)
356 }
357 };
358
359 (events, result)
360}
361
362async fn process_pending_tx<
363 N: Network,
364 DB: database::DB,
365 VM: vm::VMExecution,
366>(
367 ctx: &PrequeueProcessCtx<'_, N, DB, VM>,
368 pending: PendingPrequeueTx,
369 now: Instant,
370 source: &'static str,
371 failure_reason: &'static str,
372) -> bool {
373 let Payload::Transaction(tx) = &pending.msg.payload else {
374 return false;
375 };
376 let queue_age_ms = now.duration_since(pending.queued_at).as_millis() as u64;
377
378 match MempoolSrv::accept_tx(
379 ctx.event_sender,
380 ctx.max_mempool_txn_count,
381 ctx.db,
382 ctx.vm,
383 tx,
384 )
385 .await
386 {
387 Ok(()) => {
388 MempoolSrv::broadcast_accepted_tx(
389 ctx.network,
390 &pending.msg,
391 tx,
392 Some(source),
393 Some(queue_age_ms),
394 )
395 .await;
396 true
397 }
398 Err(TxAcceptanceError::MissingIntermediateNonce(_))
399 if pending.retries_remaining > 1 =>
400 {
401 ctx.prequeue.reschedule_message(pending, now).await;
402 false
403 }
404 Err(err) => {
405 let reason = match err {
406 TxAcceptanceError::MissingIntermediateNonce(_) => {
407 "retry_exhausted"
408 }
409 _ => failure_reason,
410 };
411 emit_prequeue_dropped(ctx.event_sender, tx, reason);
412 warn!(
413 hash = hex::encode(tx.id()),
414 queue_age_ms, "future nonce {source} dropped: {err}"
415 );
416 false
417 }
418 }
419}
420
/// Borrowed bundle of everything `process_pending_tx` needs, so the retry and
/// drain paths can pass it as a single argument.
struct PrequeueProcessCtx<'a, N, DB, VM> {
    /// Queue used to reschedule entries that are still blocked.
    prequeue: &'a FutureNonceRetryHandle,
    /// Channel on which transaction events are published.
    event_sender: &'a Sender<Event>,
    /// Forwarded unchanged to `MempoolSrv::accept_tx`.
    max_mempool_txn_count: usize,
    /// Network handle used when broadcasting an accepted transaction.
    network: &'a Arc<RwLock<N>>,
    /// Database handle forwarded to `MempoolSrv::accept_tx`.
    db: &'a Arc<RwLock<DB>>,
    /// VM handle forwarded to `MempoolSrv::accept_tx`.
    vm: &'a Arc<RwLock<VM>>,
}
429
430pub(super) async fn process_due_retries<
431 N: Network,
432 DB: database::DB,
433 VM: vm::VMExecution,
434>(
435 prequeue: &FutureNonceRetryHandle,
436 event_sender: &Sender<Event>,
437 max_mempool_txn_count: usize,
438 network: &Arc<RwLock<N>>,
439 db: &Arc<RwLock<DB>>,
440 vm: &Arc<RwLock<VM>>,
441 now: Instant,
442) {
443 let ctx = PrequeueProcessCtx {
444 prequeue,
445 event_sender,
446 max_mempool_txn_count,
447 network,
448 db,
449 vm,
450 };
451 let due = ctx.prequeue.take_due(now).await;
452 if !due.is_empty() {
453 let oldest_queue_age_ms = due
454 .iter()
455 .map(|pending| now.duration_since(pending.queued_at).as_millis())
456 .max()
457 .unwrap_or_default() as u64;
458 info!(
459 event = "future_nonce_retry_due_batch",
460 count = due.len(),
461 oldest_queue_age_ms
462 );
463 }
464
465 for pending in due {
466 let Payload::Transaction(tx) = &pending.msg.payload else {
467 continue;
468 };
469 let accepted_tx = tx.clone();
470 if process_pending_tx(&ctx, pending, now, "retry", "retry_failed").await
471 {
472 drain_unblocked_chain(
473 ctx.prequeue,
474 ctx.event_sender,
475 ctx.max_mempool_txn_count,
476 ctx.network,
477 ctx.db,
478 ctx.vm,
479 &accepted_tx,
480 )
481 .await;
482 }
483 }
484}
485
486pub(super) async fn drain_unblocked_chain<
487 N: Network,
488 DB: database::DB,
489 VM: vm::VMExecution,
490>(
491 prequeue: &FutureNonceRetryHandle,
492 event_sender: &Sender<Event>,
493 max_mempool_txn_count: usize,
494 network: &Arc<RwLock<N>>,
495 db: &Arc<RwLock<DB>>,
496 vm: &Arc<RwLock<VM>>,
497 accepted_tx: &LedgerTransaction,
498) {
499 let ctx = PrequeueProcessCtx {
500 prequeue,
501 event_sender,
502 max_mempool_txn_count,
503 network,
504 db,
505 vm,
506 };
507 let Some(key) = account_nonce_key(accepted_tx) else {
508 return;
509 };
510
511 let mut next_nonce = key.1 + 1;
512 loop {
513 let Some(pending) = ctx
514 .prequeue
515 .take_for_account_nonce(&key.0, next_nonce)
516 .await
517 else {
518 break;
519 };
520 if !process_pending_tx(
521 &ctx,
522 pending,
523 Instant::now(),
524 "contiguous_drain",
525 "contiguous_drain_failed",
526 )
527 .await
528 {
529 break;
530 }
531 next_nonce += 1;
532 }
533}
534
535fn pending_tx(msg: &Message) -> Option<&LedgerTransaction> {
536 let Payload::Transaction(tx) = &msg.payload else {
537 return None;
538 };
539 Some(tx)
540}
541
542fn prequeue_key(tx: &LedgerTransaction) -> Option<PrequeueKey> {
543 let spend_ids = tx.to_spend_ids();
544 let [SpendingId::AccountNonce(account, nonce)] = spend_ids.as_slice()
545 else {
546 return None;
547 };
548 Some((account.to_raw_bytes().to_vec(), *nonce))
549}
550
/// Module-visible alias for `prequeue_key`: returns the (account bytes,
/// nonce) key of `tx`, or `None` when `prequeue_key` yields none.
pub(super) fn account_nonce_key(tx: &LedgerTransaction) -> Option<PrequeueKey> {
    prequeue_key(tx)
}
554
/// Decides whether `incoming` should take the place of the queued `existing`
/// transaction; delegates to `should_replace_conflicting_tx`.
fn should_replace_queued_tx(
    existing: &LedgerTransaction,
    incoming: &LedgerTransaction,
) -> bool {
    should_replace_conflicting_tx(existing, incoming)
}
561
/// Lowers the counter stored for `account` by one and removes the entry once
/// it reaches zero. Accounts without a counter are left untouched.
fn decrement_account_count(
    per_account_counts: &mut HashMap<Vec<u8>, usize>,
    account: &[u8],
) {
    let remaining = match per_account_counts.get_mut(account) {
        None => return,
        Some(count) => {
            *count = count.saturating_sub(1);
            *count
        }
    };
    if remaining == 0 {
        per_account_counts.remove(account);
    }
}