Skip to main content

forest/message_pool/msgpool/
pending_store.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! Pending message storage and event broadcast.
5
6use std::sync::Arc;
7
8use ahash::{HashMap, HashMapExt};
9use parking_lot::RwLock as SyncRwLock;
10use tokio::sync::broadcast;
11
12use crate::message::SignedMessage;
13use crate::message_pool::errors::Error;
14use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate};
15use crate::message_pool::msgpool::msg_pool::TrustPolicy;
16use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy};
17use crate::shim::address::Address;
18use crate::utils::ShallowClone;
19use crate::utils::broadcast::has_subscribers;
20
21/// Owns the per-actor [`MsgSet`] map and the [`MpoolUpdate`] broadcast
22/// channel. The single place where the pending map is mutated.
23pub(in crate::message_pool) struct PendingStore {
24    inner: Arc<Inner>,
25}
26
27struct Inner {
28    /// Per-resolved-address pending messages.
29    pending: SyncRwLock<HashMap<Address, MsgSet>>,
30    /// Broadcast channel for [`MpoolUpdate`] events.
31    events: broadcast::Sender<MpoolUpdate>,
32    /// Per-actor pending-message caps captured once from the provider.
33    limits: MsgSetLimits,
34}
35
36impl ShallowClone for PendingStore {
37    fn shallow_clone(&self) -> Self {
38        Self {
39            inner: self.inner.shallow_clone(),
40        }
41    }
42}
43
44impl PendingStore {
45    /// Construct an empty store with the given per-actor limits.
46    pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self {
47        let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY);
48        Self {
49            inner: Arc::new(Inner {
50                pending: SyncRwLock::new(HashMap::new()),
51                events,
52                limits,
53            }),
54        }
55    }
56
57    /// Insert a signed message under its already-resolved sender address.
58    ///
59    /// On success, emits a single [`MpoolUpdate::Add`] carrying the inserted
60    /// message.
61    pub(in crate::message_pool) fn insert(
62        &self,
63        resolved_from: Address,
64        msg: SignedMessage,
65        state_sequence: u64,
66        trust: TrustPolicy,
67        strictness: StrictnessPolicy,
68    ) -> Result<(), Error> {
69        let event_msg = has_subscribers(&self.inner.events).then(|| msg.clone());
70
71        {
72            let mut pending = self.inner.pending.write();
73            let mset = pending
74                .entry(resolved_from)
75                .or_insert_with(|| MsgSet::new(state_sequence));
76            mset.add(msg, strictness, trust, self.inner.limits)?;
77        }
78
79        if let Some(m) = event_msg {
80            // send() only fails when there are zero receivers.
81            let _ = self.inner.events.send(MpoolUpdate::Add(m));
82        }
83        Ok(())
84    }
85
86    /// Remove the message at `sequence` for `from` (which must already be in
87    /// resolved-key form).
88    /// Returns the removed message if one was present. Emits a single
89    /// [`MpoolUpdate::Remove`] per actual removal
90    pub(in crate::message_pool) fn remove(
91        &self,
92        from: &Address,
93        sequence: u64,
94        applied: bool,
95    ) -> Option<SignedMessage> {
96        let removed = {
97            let mut pending = self.inner.pending.write();
98            let mset = pending.get_mut(from)?;
99            let removed = mset.rm(sequence, applied);
100            if mset.msgs.is_empty() {
101                pending.remove(from);
102            }
103            removed
104        };
105
106        if let Some(msg) = &removed
107            && has_subscribers(&self.inner.events)
108        {
109            let _ = self.inner.events.send(MpoolUpdate::Remove(msg.clone()));
110        }
111        removed
112    }
113
114    /// Deep-clone of the pending map — one read-lock acquisition.
115    pub(in crate::message_pool) fn snapshot(&self) -> HashMap<Address, MsgSet> {
116        self.inner.pending.read().clone()
117    }
118
119    /// Deep-clone the [`MsgSet`] for a single sender, if present.
120    pub(in crate::message_pool) fn snapshot_for(&self, addr: &Address) -> Option<MsgSet> {
121        self.inner.pending.read().get(addr).cloned()
122    }
123
124    /// Subscribe to the [`MpoolUpdate`] stream. Returned receiver is
125    /// independent; dropping it does not affect other subscribers.
126    #[allow(dead_code)] // consumed by MessagePool::subscribe_to_updates / external subscribers.
127    pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
128        self.inner.events.subscribe()
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageRead as _;
    use crate::shim::econ::TokenAmount;
    use crate::shim::message::Message as ShimMessage;
    use tokio::sync::broadcast::error::TryRecvError;

    /// Per-actor caps shared by all `PendingStore` unit tests. Generous on
    /// purpose, so that sequence handling rather than capacity decides the
    /// outcome of each test.
    const TEST_LIMITS: MsgSetLimits = MsgSetLimits {
        trusted: 1000,
        untrusted: 1000,
    };

    /// Build a mock BLS-signed message from `from` with the given sequence
    /// and gas premium (in atto); all other fields take their defaults
    /// except for a fixed gas limit.
    fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
        let unsigned = ShimMessage {
            from,
            sequence: seq,
            gas_premium: TokenAmount::from_atto(premium),
            gas_limit: 1_000_000,
            ..ShimMessage::default()
        };
        SignedMessage::mock_bls_signed_message(unsigned)
    }

    /// Insert a trusted message for `sender` with relaxed strictness and a
    /// state sequence of zero, panicking if the store rejects it.
    #[track_caller]
    fn insert_ok(store: &PendingStore, sender: Address, seq: u64, premium: u64) {
        let smsg = make_smsg(sender, seq, premium);
        store
            .insert(
                sender,
                smsg,
                0,
                TrustPolicy::Trusted,
                StrictnessPolicy::Relaxed,
            )
            .unwrap();
    }

    /// Assert that no further update is queued on `updates`.
    #[track_caller]
    fn assert_drained(updates: &mut broadcast::Receiver<MpoolUpdate>) {
        assert!(
            matches!(updates.try_recv(), Err(TryRecvError::Empty)),
            "expected empty channel"
        );
    }

    /// Assert that `update` is an `Add` for a message at `expected_seq`.
    fn assert_add(update: MpoolUpdate, expected_seq: u64) {
        match update {
            MpoolUpdate::Add(added) => assert_eq!(added.sequence(), expected_seq),
            other => panic!("expected Add, got {other:?}"),
        }
    }

    /// Assert that `update` is a `Remove` for a message at `expected_seq`.
    fn assert_remove(update: MpoolUpdate, expected_seq: u64) {
        match update {
            MpoolUpdate::Remove(gone) => assert_eq!(gone.sequence(), expected_seq),
            other => panic!("expected Remove, got {other:?}"),
        }
    }

    #[test]
    fn insert_emits_add_and_stores_message() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut updates = store.subscribe();
        let sender = Address::new_id(1);

        insert_ok(&store, sender, 0, 100);

        assert_add(updates.try_recv().unwrap(), 0);
        assert_drained(&mut updates);
        let stored = store.snapshot_for(&sender).unwrap();
        assert_eq!(stored.next_sequence, 1);
    }

    #[test]
    fn rbf_replacement_emits_add_for_the_new_message() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut updates = store.subscribe();
        let sender = Address::new_id(1);

        insert_ok(&store, sender, 0, 100);
        // Same sequence, higher premium → RBF.
        insert_ok(&store, sender, 0, 200);

        // One Add per accepted insert, both at sequence 0.
        for _ in 0..2 {
            assert_add(updates.try_recv().unwrap(), 0);
        }
        assert_drained(&mut updates);
    }

    #[test]
    fn remove_emits_remove_once_then_is_idempotent() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut updates = store.subscribe();
        let sender = Address::new_id(1);

        insert_ok(&store, sender, 0, 100);
        let _add = updates.try_recv().unwrap();

        let first = store.remove(&sender, 0, true);
        assert!(first.is_some());
        assert_remove(updates.try_recv().unwrap(), 0);

        // Second remove is a no-op — sender is already gone.
        let second = store.remove(&sender, 0, true);
        assert!(second.is_none());
        assert_drained(&mut updates);
    }

    #[test]
    fn remove_of_unknown_sender_is_silent() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut updates = store.subscribe();
        let stranger = Address::new_id(42);

        assert!(store.remove(&stranger, 0, true).is_none());
        assert_drained(&mut updates);
    }

    #[test]
    fn insert_without_subscribers_skips_message_clone() {
        // Regression guard for the has_subscribers fast-path: insert must
        // succeed and the store must reflect the message even when the emit
        // branch is elided entirely.
        let store = PendingStore::new(TEST_LIMITS);
        let sender = Address::new_id(1);

        assert!(!has_subscribers(&store.inner.events));
        insert_ok(&store, sender, 0, 100);

        let stored = store.snapshot_for(&sender).unwrap();
        assert_eq!(stored.next_sequence, 1);
    }

    #[test]
    fn snapshot_is_a_deep_copy() {
        let store = PendingStore::new(TEST_LIMITS);
        let sender = Address::new_id(1);
        insert_ok(&store, sender, 0, 100);

        let mut detached = store.snapshot();
        detached.clear();

        assert!(
            !store.snapshot().is_empty(),
            "mutating the snapshot must not affect the store"
        );
    }

    #[test]
    fn clone_is_cheap_and_shares_state() {
        // The handle pattern: cloning the store gives another view over the
        // same pending map and the same broadcast channel.
        let store = PendingStore::new(TEST_LIMITS);
        let handle = store.shallow_clone();
        let mut updates = handle.subscribe();
        let sender = Address::new_id(7);

        // Write through the original, observe through the clone.
        insert_ok(&store, sender, 0, 100);

        assert_add(updates.try_recv().unwrap(), 0);
        let seen = handle.snapshot_for(&sender).unwrap();
        assert_eq!(seen.next_sequence, 1);
    }

    #[test]
    fn remove_clears_empty_sender_bucket() {
        let store = PendingStore::new(TEST_LIMITS);
        let sender = Address::new_id(1);
        insert_ok(&store, sender, 0, 100);

        store.remove(&sender, 0, true);

        assert!(
            store.snapshot().is_empty(),
            "removing the last message for an actor should drop the bucket"
        );
    }
}