forest/message_pool/msgpool/
pending_store.rs1use std::sync::Arc;
7
8use ahash::{HashMap, HashMapExt};
9use parking_lot::RwLock as SyncRwLock;
10use tokio::sync::broadcast;
11
12use crate::message::SignedMessage;
13use crate::message_pool::errors::Error;
14use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate};
15use crate::message_pool::msgpool::msg_pool::TrustPolicy;
16use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy};
17use crate::shim::address::Address;
18use crate::utils::ShallowClone;
19use crate::utils::broadcast::has_subscribers;
20
21pub(in crate::message_pool) struct PendingStore {
24 inner: Arc<Inner>,
25}
26
27struct Inner {
28 pending: SyncRwLock<HashMap<Address, MsgSet>>,
30 events: broadcast::Sender<MpoolUpdate>,
32 limits: MsgSetLimits,
34}
35
36impl ShallowClone for PendingStore {
37 fn shallow_clone(&self) -> Self {
38 Self {
39 inner: self.inner.shallow_clone(),
40 }
41 }
42}
43
44impl PendingStore {
45 pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self {
47 let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY);
48 Self {
49 inner: Arc::new(Inner {
50 pending: SyncRwLock::new(HashMap::new()),
51 events,
52 limits,
53 }),
54 }
55 }
56
57 pub(in crate::message_pool) fn insert(
62 &self,
63 resolved_from: Address,
64 msg: SignedMessage,
65 state_sequence: u64,
66 trust: TrustPolicy,
67 strictness: StrictnessPolicy,
68 ) -> Result<(), Error> {
69 let event_msg = has_subscribers(&self.inner.events).then(|| msg.clone());
70
71 {
72 let mut pending = self.inner.pending.write();
73 let mset = pending
74 .entry(resolved_from)
75 .or_insert_with(|| MsgSet::new(state_sequence));
76 mset.add(msg, strictness, trust, self.inner.limits)?;
77 }
78
79 if let Some(m) = event_msg {
80 let _ = self.inner.events.send(MpoolUpdate::Add(m));
82 }
83 Ok(())
84 }
85
86 pub(in crate::message_pool) fn remove(
91 &self,
92 from: &Address,
93 sequence: u64,
94 applied: bool,
95 ) -> Option<SignedMessage> {
96 let removed = {
97 let mut pending = self.inner.pending.write();
98 let mset = pending.get_mut(from)?;
99 let removed = mset.rm(sequence, applied);
100 if mset.msgs.is_empty() {
101 pending.remove(from);
102 }
103 removed
104 };
105
106 if let Some(msg) = &removed
107 && has_subscribers(&self.inner.events)
108 {
109 let _ = self.inner.events.send(MpoolUpdate::Remove(msg.clone()));
110 }
111 removed
112 }
113
114 pub(in crate::message_pool) fn snapshot(&self) -> HashMap<Address, MsgSet> {
116 self.inner.pending.read().clone()
117 }
118
119 pub(in crate::message_pool) fn snapshot_for(&self, addr: &Address) -> Option<MsgSet> {
121 self.inner.pending.read().get(addr).cloned()
122 }
123
124 #[allow(dead_code)] pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
128 self.inner.events.subscribe()
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use crate::message::MessageRead as _;
136 use crate::shim::econ::TokenAmount;
137 use crate::shim::message::Message as ShimMessage;
138 use tokio::sync::broadcast::error::TryRecvError;
139
140 const TEST_LIMITS: MsgSetLimits = MsgSetLimits {
143 trusted: 1000,
144 untrusted: 1000,
145 };
146
147 fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
148 SignedMessage::mock_bls_signed_message(ShimMessage {
149 from,
150 sequence: seq,
151 gas_premium: TokenAmount::from_atto(premium),
152 gas_limit: 1_000_000,
153 ..ShimMessage::default()
154 })
155 }
156
157 fn assert_add(update: MpoolUpdate, expected_seq: u64) {
158 match update {
159 MpoolUpdate::Add(m) => assert_eq!(m.sequence(), expected_seq),
160 other => panic!("expected Add, got {other:?}"),
161 }
162 }
163
164 fn assert_remove(update: MpoolUpdate, expected_seq: u64) {
165 match update {
166 MpoolUpdate::Remove(m) => assert_eq!(m.sequence(), expected_seq),
167 other => panic!("expected Remove, got {other:?}"),
168 }
169 }
170
171 #[test]
172 fn insert_emits_add_and_stores_message() {
173 let store = PendingStore::new(TEST_LIMITS);
174 let mut rx = store.subscribe();
175 let addr = Address::new_id(1);
176
177 store
178 .insert(
179 addr,
180 make_smsg(addr, 0, 100),
181 0,
182 TrustPolicy::Trusted,
183 StrictnessPolicy::Relaxed,
184 )
185 .unwrap();
186
187 assert_add(rx.try_recv().unwrap(), 0);
188 assert!(
189 matches!(rx.try_recv(), Err(TryRecvError::Empty)),
190 "expected empty channel"
191 );
192 assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
193 }
194
195 #[test]
196 fn rbf_replacement_emits_add_for_the_new_message() {
197 let store = PendingStore::new(TEST_LIMITS);
198 let mut rx = store.subscribe();
199 let addr = Address::new_id(1);
200
201 store
202 .insert(
203 addr,
204 make_smsg(addr, 0, 100),
205 0,
206 TrustPolicy::Trusted,
207 StrictnessPolicy::Relaxed,
208 )
209 .unwrap();
210 store
211 .insert(
212 addr,
213 make_smsg(addr, 0, 200), 0,
215 TrustPolicy::Trusted,
216 StrictnessPolicy::Relaxed,
217 )
218 .unwrap();
219
220 assert_add(rx.try_recv().unwrap(), 0);
221 assert_add(rx.try_recv().unwrap(), 0);
222 assert!(
223 matches!(rx.try_recv(), Err(TryRecvError::Empty)),
224 "expected empty channel"
225 );
226 }
227
228 #[test]
229 fn remove_emits_remove_once_then_is_idempotent() {
230 let store = PendingStore::new(TEST_LIMITS);
231 let mut rx = store.subscribe();
232 let addr = Address::new_id(1);
233
234 store
235 .insert(
236 addr,
237 make_smsg(addr, 0, 100),
238 0,
239 TrustPolicy::Trusted,
240 StrictnessPolicy::Relaxed,
241 )
242 .unwrap();
243 let _add = rx.try_recv().unwrap();
244
245 assert!(store.remove(&addr, 0, true).is_some());
246 assert_remove(rx.try_recv().unwrap(), 0);
247
248 assert!(store.remove(&addr, 0, true).is_none());
250 assert!(
251 matches!(rx.try_recv(), Err(TryRecvError::Empty)),
252 "expected empty channel"
253 );
254 }
255
256 #[test]
257 fn remove_of_unknown_sender_is_silent() {
258 let store = PendingStore::new(TEST_LIMITS);
259 let mut rx = store.subscribe();
260 let addr = Address::new_id(42);
261
262 assert!(store.remove(&addr, 0, true).is_none());
263 assert!(
264 matches!(rx.try_recv(), Err(TryRecvError::Empty)),
265 "expected empty channel"
266 );
267 }
268
269 #[test]
270 fn insert_without_subscribers_skips_message_clone() {
271 let store = PendingStore::new(TEST_LIMITS);
275 let addr = Address::new_id(1);
276
277 assert!(!has_subscribers(&store.inner.events));
278 store
279 .insert(
280 addr,
281 make_smsg(addr, 0, 100),
282 0,
283 TrustPolicy::Trusted,
284 StrictnessPolicy::Relaxed,
285 )
286 .unwrap();
287 assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
288 }
289
290 #[test]
291 fn snapshot_is_a_deep_copy() {
292 let store = PendingStore::new(TEST_LIMITS);
293 let addr = Address::new_id(1);
294 store
295 .insert(
296 addr,
297 make_smsg(addr, 0, 100),
298 0,
299 TrustPolicy::Trusted,
300 StrictnessPolicy::Relaxed,
301 )
302 .unwrap();
303
304 let mut snap = store.snapshot();
305 snap.clear();
306 assert!(
307 !store.snapshot().is_empty(),
308 "mutating the snapshot must not affect the store"
309 );
310 }
311
312 #[test]
313 fn clone_is_cheap_and_shares_state() {
314 let store = PendingStore::new(TEST_LIMITS);
317 let handle = store.shallow_clone();
318 let mut rx = handle.subscribe();
319 let addr = Address::new_id(7);
320
321 store
322 .insert(
323 addr,
324 make_smsg(addr, 0, 100),
325 0,
326 TrustPolicy::Trusted,
327 StrictnessPolicy::Relaxed,
328 )
329 .unwrap();
330
331 assert_add(rx.try_recv().unwrap(), 0);
332 assert_eq!(handle.snapshot_for(&addr).unwrap().next_sequence, 1);
333 }
334
335 #[test]
336 fn remove_clears_empty_sender_bucket() {
337 let store = PendingStore::new(TEST_LIMITS);
338 let addr = Address::new_id(1);
339 store
340 .insert(
341 addr,
342 make_smsg(addr, 0, 100),
343 0,
344 TrustPolicy::Trusted,
345 StrictnessPolicy::Relaxed,
346 )
347 .unwrap();
348 store.remove(&addr, 0, true);
349 assert!(
350 store.snapshot().is_empty(),
351 "removing the last message for an actor should drop the bucket"
352 );
353 }
354}