1use ahash::{HashMap, HashMapExt};
10
11use crate::message::{MessageRead, SignedMessage};
12use crate::message_pool::errors::Error;
13use crate::message_pool::metrics;
14use crate::message_pool::msgpool::{RBF_DENOM, RBF_NUM, TrustPolicy};
15use crate::shim::econ::TokenAmount;
16
17pub(in crate::message_pool) const MAX_NONCE_GAP: u64 = 4;
19
20#[derive(Clone, Copy, Debug)]
22pub struct MsgSetLimits {
23 pub trusted: u64,
25 pub untrusted: u64,
27}
28
29impl MsgSetLimits {
30 pub fn new(trusted: u64, untrusted: u64) -> Self {
31 Self { trusted, untrusted }
32 }
33}
34
35#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub enum StrictnessPolicy {
38 Strict,
39 Relaxed,
40}
41
42#[derive(Clone, Default, Debug)]
45pub struct MsgSet {
46 pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
47 pub(in crate::message_pool) next_sequence: u64,
48}
49
50impl MsgSet {
51 pub fn new(sequence: u64) -> Self {
54 MsgSet {
55 msgs: HashMap::new(),
56 next_sequence: sequence,
57 }
58 }
59
60 pub(in crate::message_pool) fn add(
74 &mut self,
75 m: SignedMessage,
76 strictness: StrictnessPolicy,
77 trust: TrustPolicy,
78 limits: MsgSetLimits,
79 ) -> Result<(), Error> {
80 let strict = matches!(strictness, StrictnessPolicy::Strict);
81 let trusted = matches!(trust, TrustPolicy::Trusted);
82 let max_nonce_gap: u64 = if trusted { MAX_NONCE_GAP } else { 0 };
83 let max_actor_pending_messages = if trusted {
84 limits.trusted
85 } else {
86 limits.untrusted
87 };
88
89 let mut next_nonce = self.next_sequence;
90 let nonce_gap = if m.sequence() == next_nonce {
91 next_nonce += 1;
92 while self.msgs.contains_key(&next_nonce) {
93 next_nonce += 1;
94 }
95 false
96 } else if strict && m.sequence() > next_nonce + max_nonce_gap {
97 tracing::debug!(
98 nonce = m.sequence(),
99 next_nonce,
100 "message nonce has too big a gap from expected nonce"
101 );
102 return Err(Error::NonceGap);
103 } else {
104 m.sequence() > next_nonce
105 };
106
107 let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
108 if strict && nonce_gap {
109 tracing::debug!(
110 nonce = m.sequence(),
111 next_nonce,
112 "rejecting replace by fee because of nonce gap"
113 );
114 return Err(Error::NonceGap);
115 }
116 if m.cid() != exms.cid() {
117 let premium = &exms.message().gas_premium;
118 let min_price = premium.clone()
119 + ((premium * RBF_NUM).div_floor(RBF_DENOM))
120 + TokenAmount::from_atto(1u8);
121 if m.message().gas_premium <= min_price {
122 return Err(Error::GasPriceTooLow);
123 }
124 } else {
125 return Err(Error::DuplicateSequence);
126 }
127 true
128 } else {
129 false
130 };
131
132 if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
134 return Err(Error::TooManyPendingMessages(
135 m.message.from().to_string(),
136 trusted,
137 ));
138 }
139
140 if strict && nonce_gap {
141 tracing::debug!(
142 from = %m.from(),
143 nonce = m.sequence(),
144 next_nonce,
145 "adding nonce-gapped message"
146 );
147 }
148
149 self.next_sequence = next_nonce;
150 if self.msgs.insert(m.sequence(), m).is_none() {
151 metrics::MPOOL_MESSAGE_TOTAL.inc();
152 }
153 Ok(())
154 }
155
156 pub fn rm(&mut self, sequence: u64, applied: bool) -> Option<SignedMessage> {
167 let Some(removed) = self.msgs.remove(&sequence) else {
168 if applied && sequence >= self.next_sequence {
169 self.next_sequence = sequence + 1;
170 while self.msgs.contains_key(&self.next_sequence) {
171 self.next_sequence += 1;
172 }
173 }
174 return None;
175 };
176 metrics::MPOOL_MESSAGE_TOTAL.dec();
177
178 if applied {
180 if sequence >= self.next_sequence {
183 self.next_sequence = sequence + 1;
184 }
185 } else if sequence < self.next_sequence {
186 self.next_sequence = sequence;
189 }
190 Some(removed)
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197 use crate::shim::address::Address;
198 use crate::shim::econ::TokenAmount;
199 use crate::shim::message::Message as ShimMessage;
200
201 fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
202 SignedMessage::mock_bls_signed_message(ShimMessage {
203 from,
204 sequence: seq,
205 gas_premium: TokenAmount::from_atto(premium),
206 gas_limit: 1_000_000,
207 ..ShimMessage::default()
208 })
209 }
210
211 #[test]
214 fn rbf_at_capacity() {
215 let limits = MsgSetLimits::new(10, 10);
216 let mut mset = MsgSet::new(0);
217
218 for i in 0..10 {
220 let res = mset.add(
221 make_smsg(Address::default(), i, 100),
222 StrictnessPolicy::Relaxed,
223 TrustPolicy::Trusted,
224 limits,
225 );
226 assert!(res.is_ok(), "Failed to add message {i}");
227 }
228
229 let res = mset.add(
231 make_smsg(Address::default(), 10, 100),
232 StrictnessPolicy::Relaxed,
233 TrustPolicy::Trusted,
234 limits,
235 );
236 assert!(matches!(res, Err(Error::TooManyPendingMessages(_, _))));
237
238 let res = mset.add(
241 make_smsg(Address::default(), 5, 200),
242 StrictnessPolicy::Relaxed,
243 TrustPolicy::Trusted,
244 limits,
245 );
246 assert!(res.is_ok(), "RBF should be allowed at capacity");
247 }
248
249 #[test]
250 fn gap_filling_advances_next_sequence() {
251 let limits = MsgSetLimits::new(1000, 1000);
252 let mut mset = MsgSet::new(0);
253
254 mset.add(
255 make_smsg(Address::default(), 0, 100),
256 StrictnessPolicy::Relaxed,
257 TrustPolicy::Trusted,
258 limits,
259 )
260 .unwrap();
261 assert_eq!(mset.next_sequence, 1);
262
263 mset.add(
264 make_smsg(Address::default(), 2, 100),
265 StrictnessPolicy::Relaxed,
266 TrustPolicy::Trusted,
267 limits,
268 )
269 .unwrap();
270 assert_eq!(mset.next_sequence, 1, "gap at 1, so next_sequence stays");
271
272 mset.add(
273 make_smsg(Address::default(), 1, 100),
274 StrictnessPolicy::Relaxed,
275 TrustPolicy::Trusted,
276 limits,
277 )
278 .unwrap();
279 assert_eq!(
280 mset.next_sequence, 3,
281 "filling the gap should advance past all consecutive messages"
282 );
283 }
284
285 #[test]
286 fn trusted_allows_any_nonce_gap() {
287 let limits = MsgSetLimits::new(1000, 1000);
288 let mut mset = MsgSet::new(0);
289
290 mset.add(
291 make_smsg(Address::default(), 0, 100),
292 StrictnessPolicy::Relaxed,
293 TrustPolicy::Trusted,
294 limits,
295 )
296 .unwrap();
297 let res = mset.add(
298 make_smsg(Address::default(), 10, 100),
299 StrictnessPolicy::Relaxed,
300 TrustPolicy::Trusted,
301 limits,
302 );
303 assert!(
304 res.is_ok(),
305 "trusted adds skip nonce gap enforcement (StrictnessPolicy::Relaxed)"
306 );
307 }
308
309 #[test]
310 fn strict_allows_small_nonce_gap() {
311 let limits = MsgSetLimits::new(1000, 1000);
312 let mut mset = MsgSet::new(0);
313
314 mset.add(
316 make_smsg(Address::default(), 0, 100),
317 StrictnessPolicy::Strict,
318 TrustPolicy::Trusted,
319 limits,
320 )
321 .unwrap();
322 let res = mset.add(
323 make_smsg(Address::default(), 3, 100),
324 StrictnessPolicy::Strict,
325 TrustPolicy::Trusted,
326 limits,
327 );
328 assert!(
329 res.is_ok(),
330 "strict+trusted: gap of 2 (within MAX_NONCE_GAP=4) should succeed"
331 );
332 }
333
334 #[test]
335 fn strict_rejects_large_nonce_gap() {
336 let limits = MsgSetLimits::new(1000, 1000);
337 let mut mset = MsgSet::new(0);
338
339 mset.add(
341 make_smsg(Address::default(), 0, 100),
342 StrictnessPolicy::Strict,
343 TrustPolicy::Trusted,
344 limits,
345 )
346 .unwrap();
347 let res = mset.add(
348 make_smsg(Address::default(), 6, 100),
349 StrictnessPolicy::Strict,
350 TrustPolicy::Trusted,
351 limits,
352 );
353 assert_eq!(
354 res,
355 Err(Error::NonceGap),
356 "strict+trusted: gap of 5 (exceeds MAX_NONCE_GAP=4) should be rejected"
357 );
358 }
359
360 #[test]
361 fn strict_untrusted_rejects_any_gap() {
362 let limits = MsgSetLimits::new(1000, 1000);
363 let mut mset = MsgSet::new(0);
364
365 mset.add(
367 make_smsg(Address::default(), 0, 100),
368 StrictnessPolicy::Strict,
369 TrustPolicy::Untrusted,
370 limits,
371 )
372 .unwrap();
373 let res = mset.add(
374 make_smsg(Address::default(), 2, 100),
375 StrictnessPolicy::Strict,
376 TrustPolicy::Untrusted,
377 limits,
378 );
379 assert_eq!(
380 res,
381 Err(Error::NonceGap),
382 "strict+untrusted: any gap (maxNonceGap=0) is rejected"
383 );
384 }
385
386 #[test]
387 fn non_strict_untrusted_skips_gap_check() {
388 let limits = MsgSetLimits::new(1000, 1000);
389 let mut mset = MsgSet::new(0);
390
391 mset.add(
393 make_smsg(Address::default(), 0, 100),
394 StrictnessPolicy::Relaxed,
395 TrustPolicy::Untrusted,
396 limits,
397 )
398 .unwrap();
399 let res = mset.add(
400 make_smsg(Address::default(), 5, 100),
401 StrictnessPolicy::Relaxed,
402 TrustPolicy::Untrusted,
403 limits,
404 );
405 assert!(
406 res.is_ok(),
407 "non-strict untrusted (PushUntrusted) skips gap enforcement"
408 );
409 }
410
411 #[test]
412 fn strict_rbf_during_gap_rejected() {
413 let limits = MsgSetLimits::new(1000, 1000);
414 let mut mset = MsgSet::new(0);
415
416 mset.add(
418 make_smsg(Address::default(), 0, 100),
419 StrictnessPolicy::Relaxed,
420 TrustPolicy::Trusted,
421 limits,
422 )
423 .unwrap();
424 mset.add(
425 make_smsg(Address::default(), 2, 100),
426 StrictnessPolicy::Relaxed,
427 TrustPolicy::Trusted,
428 limits,
429 )
430 .unwrap();
431
432 let res = mset.add(
434 make_smsg(Address::default(), 2, 200),
435 StrictnessPolicy::Strict,
436 TrustPolicy::Trusted,
437 limits,
438 );
439 assert_eq!(
440 res,
441 Err(Error::NonceGap),
442 "strict RBF should be rejected when nonce gap exists"
443 );
444 }
445
446 #[test]
447 fn rbf_without_gap_still_works() {
448 let limits = MsgSetLimits::new(1000, 1000);
449 let mut mset = MsgSet::new(0);
450
451 mset.add(
452 make_smsg(Address::default(), 0, 100),
453 StrictnessPolicy::Relaxed,
454 TrustPolicy::Trusted,
455 limits,
456 )
457 .unwrap();
458 mset.add(
459 make_smsg(Address::default(), 1, 100),
460 StrictnessPolicy::Relaxed,
461 TrustPolicy::Trusted,
462 limits,
463 )
464 .unwrap();
465 mset.add(
466 make_smsg(Address::default(), 2, 100),
467 StrictnessPolicy::Relaxed,
468 TrustPolicy::Trusted,
469 limits,
470 )
471 .unwrap();
472
473 let res = mset.add(
474 make_smsg(Address::default(), 1, 200),
475 StrictnessPolicy::Relaxed,
476 TrustPolicy::Trusted,
477 limits,
478 );
479 assert!(res.is_ok(), "RBF without a nonce gap should succeed");
480 }
481
482 #[test]
483 fn rm_applied_advances_next_sequence() {
484 let limits = MsgSetLimits::new(1000, 1000);
485 let mut mset = MsgSet::new(0);
486
487 mset.add(
488 make_smsg(Address::default(), 0, 100),
489 StrictnessPolicy::Relaxed,
490 TrustPolicy::Trusted,
491 limits,
492 )
493 .unwrap();
494 assert_eq!(mset.next_sequence, 1);
495
496 mset.rm(0, true);
498 assert_eq!(
499 mset.next_sequence, 1,
500 "applied rm at seq < next_sequence does not advance further"
501 );
502
503 mset.rm(5, true);
505 assert_eq!(
506 mset.next_sequence, 6,
507 "applied rm of unknown seq >= next_sequence advances to seq+1"
508 );
509 }
510
511 #[test]
512 fn rm_pruned_rewinds_next_sequence_on_gap() {
513 let limits = MsgSetLimits::new(1000, 1000);
514 let mut mset = MsgSet::new(0);
515
516 for i in 0..3 {
518 mset.add(
519 make_smsg(Address::default(), i, 100),
520 StrictnessPolicy::Relaxed,
521 TrustPolicy::Trusted,
522 limits,
523 )
524 .unwrap();
525 }
526 assert_eq!(mset.next_sequence, 3);
527
528 mset.rm(1, false);
530 assert_eq!(
531 mset.next_sequence, 1,
532 "pruned rm creating a gap rewinds next_sequence"
533 );
534 }
535}