use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use parking_lot::RwLock as SyncRwLock;
use tokio::sync::broadcast;
use crate::message::SignedMessage;
use crate::message_pool::errors::Error;
use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate};
use crate::message_pool::msgpool::msg_pool::TrustPolicy;
use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy};
use crate::shim::address::Address;
use crate::utils::ShallowClone;
use crate::utils::broadcast::has_subscribers;
pub(in crate::message_pool) struct PendingStore {
inner: Arc<Inner>,
}
struct Inner {
pending: SyncRwLock<HashMap<Address, MsgSet>>,
events: broadcast::Sender<MpoolUpdate>,
limits: MsgSetLimits,
}
impl ShallowClone for PendingStore {
fn shallow_clone(&self) -> Self {
Self {
inner: self.inner.shallow_clone(),
}
}
}
impl PendingStore {
pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self {
let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY);
Self {
inner: Arc::new(Inner {
pending: SyncRwLock::new(HashMap::new()),
events,
limits,
}),
}
}
pub(in crate::message_pool) fn insert(
&self,
resolved_from: Address,
msg: SignedMessage,
state_sequence: u64,
trust: TrustPolicy,
strictness: StrictnessPolicy,
) -> Result<(), Error> {
let event_msg = has_subscribers(&self.inner.events).then(|| msg.clone());
{
let mut pending = self.inner.pending.write();
let mset = pending
.entry(resolved_from)
.or_insert_with(|| MsgSet::new(state_sequence));
mset.add(msg, strictness, trust, self.inner.limits)?;
}
if let Some(m) = event_msg {
let _ = self.inner.events.send(MpoolUpdate::Add(m));
}
Ok(())
}
pub(in crate::message_pool) fn remove(
&self,
from: &Address,
sequence: u64,
applied: bool,
) -> Option<SignedMessage> {
let removed = {
let mut pending = self.inner.pending.write();
let mset = pending.get_mut(from)?;
let removed = mset.rm(sequence, applied);
if mset.msgs.is_empty() {
pending.remove(from);
}
removed
};
if let Some(msg) = &removed
&& has_subscribers(&self.inner.events)
{
let _ = self.inner.events.send(MpoolUpdate::Remove(msg.clone()));
}
removed
}
pub(in crate::message_pool) fn snapshot(&self) -> HashMap<Address, MsgSet> {
self.inner.pending.read().clone()
}
pub(in crate::message_pool) fn snapshot_for(&self, addr: &Address) -> Option<MsgSet> {
self.inner.pending.read().get(addr).cloned()
}
#[allow(dead_code)] pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
self.inner.events.subscribe()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::MessageRead as _;
use crate::shim::econ::TokenAmount;
use crate::shim::message::Message as ShimMessage;
use tokio::sync::broadcast::error::TryRecvError;
const TEST_LIMITS: MsgSetLimits = MsgSetLimits {
trusted: 1000,
untrusted: 1000,
};
fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
SignedMessage::mock_bls_signed_message(ShimMessage {
from,
sequence: seq,
gas_premium: TokenAmount::from_atto(premium),
gas_limit: 1_000_000,
..ShimMessage::default()
})
}
fn assert_add(update: MpoolUpdate, expected_seq: u64) {
match update {
MpoolUpdate::Add(m) => assert_eq!(m.sequence(), expected_seq),
other => panic!("expected Add, got {other:?}"),
}
}
fn assert_remove(update: MpoolUpdate, expected_seq: u64) {
match update {
MpoolUpdate::Remove(m) => assert_eq!(m.sequence(), expected_seq),
other => panic!("expected Remove, got {other:?}"),
}
}
#[test]
fn insert_emits_add_and_stores_message() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let addr = Address::new_id(1);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
assert_add(rx.try_recv().unwrap(), 0);
assert!(
matches!(rx.try_recv(), Err(TryRecvError::Empty)),
"expected empty channel"
);
assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
}
#[test]
fn rbf_replacement_emits_add_for_the_new_message() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let addr = Address::new_id(1);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
store
.insert(
addr,
make_smsg(addr, 0, 200), 0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
assert_add(rx.try_recv().unwrap(), 0);
assert_add(rx.try_recv().unwrap(), 0);
assert!(
matches!(rx.try_recv(), Err(TryRecvError::Empty)),
"expected empty channel"
);
}
#[test]
fn remove_emits_remove_once_then_is_idempotent() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let addr = Address::new_id(1);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
let _add = rx.try_recv().unwrap();
assert!(store.remove(&addr, 0, true).is_some());
assert_remove(rx.try_recv().unwrap(), 0);
assert!(store.remove(&addr, 0, true).is_none());
assert!(
matches!(rx.try_recv(), Err(TryRecvError::Empty)),
"expected empty channel"
);
}
#[test]
fn remove_of_unknown_sender_is_silent() {
let store = PendingStore::new(TEST_LIMITS);
let mut rx = store.subscribe();
let addr = Address::new_id(42);
assert!(store.remove(&addr, 0, true).is_none());
assert!(
matches!(rx.try_recv(), Err(TryRecvError::Empty)),
"expected empty channel"
);
}
#[test]
fn insert_without_subscribers_skips_message_clone() {
let store = PendingStore::new(TEST_LIMITS);
let addr = Address::new_id(1);
assert!(!has_subscribers(&store.inner.events));
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
}
#[test]
fn snapshot_is_a_deep_copy() {
let store = PendingStore::new(TEST_LIMITS);
let addr = Address::new_id(1);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
let mut snap = store.snapshot();
snap.clear();
assert!(
!store.snapshot().is_empty(),
"mutating the snapshot must not affect the store"
);
}
#[test]
fn clone_is_cheap_and_shares_state() {
let store = PendingStore::new(TEST_LIMITS);
let handle = store.shallow_clone();
let mut rx = handle.subscribe();
let addr = Address::new_id(7);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
assert_add(rx.try_recv().unwrap(), 0);
assert_eq!(handle.snapshot_for(&addr).unwrap().next_sequence, 1);
}
#[test]
fn remove_clears_empty_sender_bucket() {
let store = PendingStore::new(TEST_LIMITS);
let addr = Address::new_id(1);
store
.insert(
addr,
make_smsg(addr, 0, 100),
0,
TrustPolicy::Trusted,
StrictnessPolicy::Relaxed,
)
.unwrap();
store.remove(&addr, 0, true);
assert!(
store.snapshot().is_empty(),
"removing the last message for an actor should drop the bucket"
);
}
}