Skip to main content

forest/message_pool/msgpool/
msg_set.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Per-sender message set.
5//!
6//! [`MsgSet`] owns the pending messages for a single sender address and tracks
7//! the next sequence expected for the gap-filling / replace-by-fee rules.
8
9use ahash::{HashMap, HashMapExt};
10
11use crate::message::{MessageRead, SignedMessage};
12use crate::message_pool::errors::Error;
13use crate::message_pool::metrics;
14use crate::message_pool::msgpool::{RBF_DENOM, RBF_NUM, TrustPolicy};
15use crate::shim::econ::TokenAmount;
16
17/// Maximum allowed nonce gap for trusted message inserts under [`StrictnessPolicy::Strict`].
18pub(in crate::message_pool) const MAX_NONCE_GAP: u64 = 4;
19
20/// Per-actor pending-message limits.
21#[derive(Clone, Copy, Debug)]
22pub struct MsgSetLimits {
23    /// Cap applied when a message is inserted via the trusted path.
24    pub trusted: u64,
25    /// Cap applied when a message is inserted via the untrusted path.
26    pub untrusted: u64,
27}
28
29impl MsgSetLimits {
30    pub fn new(trusted: u64, untrusted: u64) -> Self {
31        Self { trusted, untrusted }
32    }
33}
34
35/// Strictness policy for pending insertion.
36#[derive(Clone, Copy, Debug, PartialEq, Eq)]
37pub enum StrictnessPolicy {
38    Strict,
39    Relaxed,
40}
41
42/// Simple structure that contains a hash-map of messages where k: message nonce,
43/// v: a message which corresponds to that nonce.
44#[derive(Clone, Default, Debug)]
45pub struct MsgSet {
46    pub(in crate::message_pool) msgs: HashMap<u64, SignedMessage>,
47    pub(in crate::message_pool) next_sequence: u64,
48}
49
50impl MsgSet {
51    /// Generate a new `MsgSet` with an empty hash-map and setting the sequence
52    /// specifically.
53    pub fn new(sequence: u64) -> Self {
54        MsgSet {
55            msgs: HashMap::new(),
56            next_sequence: sequence,
57        }
58    }
59
60    /// Insert a message into this set, maintaining `next_sequence`.
61    ///
62    /// - If the message nonce equals `next_sequence`, advance past any
63    ///   consecutive existing messages (gap-filling loop).
64    /// - If the nonce exceeds `next_sequence + max_nonce_gap` and [`StrictnessPolicy::Strict`],
65    ///   reject with [`Error::NonceGap`].
66    /// - Replace-by-fee for an existing nonce is rejected when strict and
67    ///   a nonce gap is present.
68    ///
69    /// [`StrictnessPolicy`] and [`TrustPolicy`] are independent: strictness controls
70    /// whether nonce gap checks run, while [`TrustPolicy`] sets `max_nonce_gap`
71    /// ([`MAX_NONCE_GAP`] for trusted, `0` for untrusted) and selects which cap
72    /// in [`MsgSetLimits`] applies.
73    pub(in crate::message_pool) fn add(
74        &mut self,
75        m: SignedMessage,
76        strictness: StrictnessPolicy,
77        trust: TrustPolicy,
78        limits: MsgSetLimits,
79    ) -> Result<(), Error> {
80        let strict = matches!(strictness, StrictnessPolicy::Strict);
81        let trusted = matches!(trust, TrustPolicy::Trusted);
82        let max_nonce_gap: u64 = if trusted { MAX_NONCE_GAP } else { 0 };
83        let max_actor_pending_messages = if trusted {
84            limits.trusted
85        } else {
86            limits.untrusted
87        };
88
89        let mut next_nonce = self.next_sequence;
90        let nonce_gap = if m.sequence() == next_nonce {
91            next_nonce += 1;
92            while self.msgs.contains_key(&next_nonce) {
93                next_nonce += 1;
94            }
95            false
96        } else if strict && m.sequence() > next_nonce + max_nonce_gap {
97            tracing::debug!(
98                nonce = m.sequence(),
99                next_nonce,
100                "message nonce has too big a gap from expected nonce"
101            );
102            return Err(Error::NonceGap);
103        } else {
104            m.sequence() > next_nonce
105        };
106
107        let has_existing = if let Some(exms) = self.msgs.get(&m.sequence()) {
108            if strict && nonce_gap {
109                tracing::debug!(
110                    nonce = m.sequence(),
111                    next_nonce,
112                    "rejecting replace by fee because of nonce gap"
113                );
114                return Err(Error::NonceGap);
115            }
116            if m.cid() != exms.cid() {
117                let premium = &exms.message().gas_premium;
118                let min_price = premium.clone()
119                    + ((premium * RBF_NUM).div_floor(RBF_DENOM))
120                    + TokenAmount::from_atto(1u8);
121                if m.message().gas_premium <= min_price {
122                    return Err(Error::GasPriceTooLow);
123                }
124            } else {
125                return Err(Error::DuplicateSequence);
126            }
127            true
128        } else {
129            false
130        };
131
132        // Only check the limit when adding a new message, not when replacing an existing one (RBF)
133        if !has_existing && self.msgs.len() as u64 >= max_actor_pending_messages {
134            return Err(Error::TooManyPendingMessages(
135                m.message.from().to_string(),
136                trusted,
137            ));
138        }
139
140        if strict && nonce_gap {
141            tracing::debug!(
142                from = %m.from(),
143                nonce = m.sequence(),
144                next_nonce,
145                "adding nonce-gapped message"
146            );
147        }
148
149        self.next_sequence = next_nonce;
150        if self.msgs.insert(m.sequence(), m).is_none() {
151            metrics::MPOOL_MESSAGE_TOTAL.inc();
152        }
153        Ok(())
154    }
155
156    /// Remove the message at `sequence` and adjust `next_sequence`.
157    ///
158    /// - **Applied** (included on-chain): advance `next_sequence` to
159    ///   `sequence + 1` if needed. For messages not in our pool, also run
160    ///   the gap-filling loop to advance past consecutive known messages.
161    /// - **Pruned** (evicted): rewind `next_sequence` to `sequence` if the
162    ///   removal creates a gap.
163    ///
164    /// Returns the removed message when one was present.
165    /// If the sequence was not in the set, no event is removed and [`None`] is returned.
166    pub fn rm(&mut self, sequence: u64, applied: bool) -> Option<SignedMessage> {
167        let Some(removed) = self.msgs.remove(&sequence) else {
168            if applied && sequence >= self.next_sequence {
169                self.next_sequence = sequence + 1;
170                while self.msgs.contains_key(&self.next_sequence) {
171                    self.next_sequence += 1;
172                }
173            }
174            return None;
175        };
176        metrics::MPOOL_MESSAGE_TOTAL.dec();
177
178        // adjust next sequence
179        if applied {
180            // we removed a (known) message because it was applied in a tipset
181            // we can't possibly have filled a gap in this case
182            if sequence >= self.next_sequence {
183                self.next_sequence = sequence + 1;
184            }
185        } else if sequence < self.next_sequence {
186            // we removed a message because it was pruned
187            // we have to adjust the sequence if it creates a gap or rewinds state
188            self.next_sequence = sequence;
189        }
190        Some(removed)
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197    use crate::shim::address::Address;
198    use crate::shim::econ::TokenAmount;
199    use crate::shim::message::Message as ShimMessage;
200
201    fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
202        SignedMessage::mock_bls_signed_message(ShimMessage {
203            from,
204            sequence: seq,
205            gas_premium: TokenAmount::from_atto(premium),
206            gas_limit: 1_000_000,
207            ..ShimMessage::default()
208        })
209    }
210
211    // Test that RBF (Replace By Fee) is allowed even when at max_actor_pending_messages capacity
212    // This matches Lotus behavior where the check is: https://github.com/filecoin-project/lotus/blob/5f32d00550ddd2f2d0f9abe97dbae07615f18547/chain/messagepool/messagepool.go#L296-L299
213    #[test]
214    fn rbf_at_capacity() {
215        let limits = MsgSetLimits::new(10, 10);
216        let mut mset = MsgSet::new(0);
217
218        // Fill up to capacity (10 messages)
219        for i in 0..10 {
220            let res = mset.add(
221                make_smsg(Address::default(), i, 100),
222                StrictnessPolicy::Relaxed,
223                TrustPolicy::Trusted,
224                limits,
225            );
226            assert!(res.is_ok(), "Failed to add message {i}");
227        }
228
229        // Should reject adding a NEW message (sequence 10) when at capacity
230        let res = mset.add(
231            make_smsg(Address::default(), 10, 100),
232            StrictnessPolicy::Relaxed,
233            TrustPolicy::Trusted,
234            limits,
235        );
236        assert!(matches!(res, Err(Error::TooManyPendingMessages(_, _))));
237
238        // Should ALLOW replacing an existing message (RBF) even when at capacity
239        // Replace message with sequence 5 with higher gas premium
240        let res = mset.add(
241            make_smsg(Address::default(), 5, 200),
242            StrictnessPolicy::Relaxed,
243            TrustPolicy::Trusted,
244            limits,
245        );
246        assert!(res.is_ok(), "RBF should be allowed at capacity");
247    }
248
249    #[test]
250    fn gap_filling_advances_next_sequence() {
251        let limits = MsgSetLimits::new(1000, 1000);
252        let mut mset = MsgSet::new(0);
253
254        mset.add(
255            make_smsg(Address::default(), 0, 100),
256            StrictnessPolicy::Relaxed,
257            TrustPolicy::Trusted,
258            limits,
259        )
260        .unwrap();
261        assert_eq!(mset.next_sequence, 1);
262
263        mset.add(
264            make_smsg(Address::default(), 2, 100),
265            StrictnessPolicy::Relaxed,
266            TrustPolicy::Trusted,
267            limits,
268        )
269        .unwrap();
270        assert_eq!(mset.next_sequence, 1, "gap at 1, so next_sequence stays");
271
272        mset.add(
273            make_smsg(Address::default(), 1, 100),
274            StrictnessPolicy::Relaxed,
275            TrustPolicy::Trusted,
276            limits,
277        )
278        .unwrap();
279        assert_eq!(
280            mset.next_sequence, 3,
281            "filling the gap should advance past all consecutive messages"
282        );
283    }
284
285    #[test]
286    fn trusted_allows_any_nonce_gap() {
287        let limits = MsgSetLimits::new(1000, 1000);
288        let mut mset = MsgSet::new(0);
289
290        mset.add(
291            make_smsg(Address::default(), 0, 100),
292            StrictnessPolicy::Relaxed,
293            TrustPolicy::Trusted,
294            limits,
295        )
296        .unwrap();
297        let res = mset.add(
298            make_smsg(Address::default(), 10, 100),
299            StrictnessPolicy::Relaxed,
300            TrustPolicy::Trusted,
301            limits,
302        );
303        assert!(
304            res.is_ok(),
305            "trusted adds skip nonce gap enforcement (StrictnessPolicy::Relaxed)"
306        );
307    }
308
309    #[test]
310    fn strict_allows_small_nonce_gap() {
311        let limits = MsgSetLimits::new(1000, 1000);
312        let mut mset = MsgSet::new(0);
313
314        // Strict + trusted -> max_nonce_gap=4 (non-local add path)
315        mset.add(
316            make_smsg(Address::default(), 0, 100),
317            StrictnessPolicy::Strict,
318            TrustPolicy::Trusted,
319            limits,
320        )
321        .unwrap();
322        let res = mset.add(
323            make_smsg(Address::default(), 3, 100),
324            StrictnessPolicy::Strict,
325            TrustPolicy::Trusted,
326            limits,
327        );
328        assert!(
329            res.is_ok(),
330            "strict+trusted: gap of 2 (within MAX_NONCE_GAP=4) should succeed"
331        );
332    }
333
334    #[test]
335    fn strict_rejects_large_nonce_gap() {
336        let limits = MsgSetLimits::new(1000, 1000);
337        let mut mset = MsgSet::new(0);
338
339        // Strict + trusted -> max_nonce_gap=4
340        mset.add(
341            make_smsg(Address::default(), 0, 100),
342            StrictnessPolicy::Strict,
343            TrustPolicy::Trusted,
344            limits,
345        )
346        .unwrap();
347        let res = mset.add(
348            make_smsg(Address::default(), 6, 100),
349            StrictnessPolicy::Strict,
350            TrustPolicy::Trusted,
351            limits,
352        );
353        assert_eq!(
354            res,
355            Err(Error::NonceGap),
356            "strict+trusted: gap of 5 (exceeds MAX_NONCE_GAP=4) should be rejected"
357        );
358    }
359
360    #[test]
361    fn strict_untrusted_rejects_any_gap() {
362        let limits = MsgSetLimits::new(1000, 1000);
363        let mut mset = MsgSet::new(0);
364
365        // Strict + untrusted -> max_nonce_gap=0
366        mset.add(
367            make_smsg(Address::default(), 0, 100),
368            StrictnessPolicy::Strict,
369            TrustPolicy::Untrusted,
370            limits,
371        )
372        .unwrap();
373        let res = mset.add(
374            make_smsg(Address::default(), 2, 100),
375            StrictnessPolicy::Strict,
376            TrustPolicy::Untrusted,
377            limits,
378        );
379        assert_eq!(
380            res,
381            Err(Error::NonceGap),
382            "strict+untrusted: any gap (maxNonceGap=0) is rejected"
383        );
384    }
385
386    #[test]
387    fn non_strict_untrusted_skips_gap_check() {
388        let limits = MsgSetLimits::new(1000, 1000);
389        let mut mset = MsgSet::new(0);
390
391        // Relaxed + untrusted -> gap check skipped (PushUntrusted path)
392        mset.add(
393            make_smsg(Address::default(), 0, 100),
394            StrictnessPolicy::Relaxed,
395            TrustPolicy::Untrusted,
396            limits,
397        )
398        .unwrap();
399        let res = mset.add(
400            make_smsg(Address::default(), 5, 100),
401            StrictnessPolicy::Relaxed,
402            TrustPolicy::Untrusted,
403            limits,
404        );
405        assert!(
406            res.is_ok(),
407            "non-strict untrusted (PushUntrusted) skips gap enforcement"
408        );
409    }
410
411    #[test]
412    fn strict_rbf_during_gap_rejected() {
413        let limits = MsgSetLimits::new(1000, 1000);
414        let mut mset = MsgSet::new(0);
415
416        // Set up a gap using relaxed trusted (local push path)
417        mset.add(
418            make_smsg(Address::default(), 0, 100),
419            StrictnessPolicy::Relaxed,
420            TrustPolicy::Trusted,
421            limits,
422        )
423        .unwrap();
424        mset.add(
425            make_smsg(Address::default(), 2, 100),
426            StrictnessPolicy::Relaxed,
427            TrustPolicy::Trusted,
428            limits,
429        )
430        .unwrap();
431
432        // Strict RBF at nonce 2 should be rejected due to gap at nonce 1
433        let res = mset.add(
434            make_smsg(Address::default(), 2, 200),
435            StrictnessPolicy::Strict,
436            TrustPolicy::Trusted,
437            limits,
438        );
439        assert_eq!(
440            res,
441            Err(Error::NonceGap),
442            "strict RBF should be rejected when nonce gap exists"
443        );
444    }
445
446    #[test]
447    fn rbf_without_gap_still_works() {
448        let limits = MsgSetLimits::new(1000, 1000);
449        let mut mset = MsgSet::new(0);
450
451        mset.add(
452            make_smsg(Address::default(), 0, 100),
453            StrictnessPolicy::Relaxed,
454            TrustPolicy::Trusted,
455            limits,
456        )
457        .unwrap();
458        mset.add(
459            make_smsg(Address::default(), 1, 100),
460            StrictnessPolicy::Relaxed,
461            TrustPolicy::Trusted,
462            limits,
463        )
464        .unwrap();
465        mset.add(
466            make_smsg(Address::default(), 2, 100),
467            StrictnessPolicy::Relaxed,
468            TrustPolicy::Trusted,
469            limits,
470        )
471        .unwrap();
472
473        let res = mset.add(
474            make_smsg(Address::default(), 1, 200),
475            StrictnessPolicy::Relaxed,
476            TrustPolicy::Trusted,
477            limits,
478        );
479        assert!(res.is_ok(), "RBF without a nonce gap should succeed");
480    }
481
482    #[test]
483    fn rm_applied_advances_next_sequence() {
484        let limits = MsgSetLimits::new(1000, 1000);
485        let mut mset = MsgSet::new(0);
486
487        mset.add(
488            make_smsg(Address::default(), 0, 100),
489            StrictnessPolicy::Relaxed,
490            TrustPolicy::Trusted,
491            limits,
492        )
493        .unwrap();
494        assert_eq!(mset.next_sequence, 1);
495
496        // applied=true, and sequence >= next_sequence path: remove advances
497        mset.rm(0, true);
498        assert_eq!(
499            mset.next_sequence, 1,
500            "applied rm at seq < next_sequence does not advance further"
501        );
502
503        // applied=true with an unknown sequence ahead of current: advances
504        mset.rm(5, true);
505        assert_eq!(
506            mset.next_sequence, 6,
507            "applied rm of unknown seq >= next_sequence advances to seq+1"
508        );
509    }
510
511    #[test]
512    fn rm_pruned_rewinds_next_sequence_on_gap() {
513        let limits = MsgSetLimits::new(1000, 1000);
514        let mut mset = MsgSet::new(0);
515
516        // Fill 0..=2 so next_sequence=3
517        for i in 0..3 {
518            mset.add(
519                make_smsg(Address::default(), i, 100),
520                StrictnessPolicy::Relaxed,
521                TrustPolicy::Trusted,
522                limits,
523            )
524            .unwrap();
525        }
526        assert_eq!(mset.next_sequence, 3);
527
528        // applied=false (prune) of seq=1 (< next_sequence): rewind to 1
529        mset.rm(1, false);
530        assert_eq!(
531            mset.next_sequence, 1,
532            "pruned rm creating a gap rewinds next_sequence"
533        );
534    }
535}