use hashbrown::HashMap;
use parking_lot::RwLock as SyncRwLock;
use tokio::sync::broadcast;
use crate::message::SignedMessage;
use crate::message_pool::errors::Error;
use crate::message_pool::msgpool::events::{MPOOL_UPDATE_CHANNEL_CAPACITY, MpoolUpdate};
use crate::message_pool::msgpool::msg_pool::TrustPolicy;
use crate::message_pool::msgpool::msg_set::{MsgSet, MsgSetLimits, StrictnessPolicy};
use crate::prelude::*;
use crate::shim::address::Address;
use crate::utils::broadcast::has_subscribers;
/// Handle to the pending-message store of the message pool.
///
/// The handle only holds a reference-counted pointer to the shared state
/// (`Inner`), so every handle produced through `ShallowClone` observes and
/// mutates the same pending map and the same event channel.
pub(in crate::message_pool) struct PendingStore {
/// Shared state: the pending map, the update-event sender and the limits.
inner: Arc<Inner>,
}
impl ShallowClone for PendingStore {
    /// Returns another handle backed by the same shared state.
    ///
    /// Only the inner `Arc` handle is shallow-cloned; the pending map itself
    /// is not copied.
    fn shallow_clone(&self) -> Self {
        let inner = self.inner.shallow_clone();
        Self { inner }
    }
}
/// State shared by every `PendingStore` handle and by the metrics collector.
struct Inner {
/// Pending messages grouped per sender address, guarded by a synchronous
/// (`parking_lot`) read-write lock.
pending: SyncRwLock<HashMap<Address, MsgSet>>,
/// Broadcast sender used to publish `MpoolUpdate::Add` / `MpoolUpdate::Remove`
/// events to subscribers.
events: broadcast::Sender<MpoolUpdate>,
/// Limits handed to `MsgSet::add` on every insertion.
limits: MsgSetLimits,
}
impl PendingStore {
pub(in crate::message_pool) fn new(limits: MsgSetLimits) -> Self {
let (events, _) = broadcast::channel(MPOOL_UPDATE_CHANNEL_CAPACITY);
let inner = Arc::new(Inner {
pending: SyncRwLock::new(HashMap::new()),
events,
limits,
});
crate::metrics::register_collector(Box::new(InnerMetricsCollector(inner.shallow_clone())));
Self { inner }
}
pub(in crate::message_pool) fn insert(
&self,
resolved_from: Address,
msg: SignedMessage,
state_sequence: u64,
trust: TrustPolicy,
strictness: StrictnessPolicy,
) -> Result<(), Error> {
let event_msg = has_subscribers(&self.inner.events).then(|| msg.clone());
{
let mut pending = self.inner.pending.write();
let mset = pending
.entry(resolved_from)
.or_insert_with(|| MsgSet::new(state_sequence));
mset.add(msg, strictness, trust, self.inner.limits)?;
}
if let Some(m) = event_msg {
let _ = self.inner.events.send(MpoolUpdate::Add(m));
}
Ok(())
}
pub(in crate::message_pool) fn remove(
&self,
from: &Address,
sequence: u64,
applied: bool,
) -> Option<SignedMessage> {
let removed = {
let mut pending = self.inner.pending.write();
let mset = pending.get_mut(from)?;
let removed = mset.rm(sequence, applied);
if mset.msgs.is_empty() {
pending.remove(from);
}
removed
};
if let Some(msg) = &removed
&& has_subscribers(&self.inner.events)
{
let _ = self.inner.events.send(MpoolUpdate::Remove(msg.clone()));
}
removed
}
pub(in crate::message_pool) fn shrink_to_fit(&self) {
self.inner.pending.write().shrink_to_fit();
}
pub(in crate::message_pool) fn snapshot(&self) -> HashMap<Address, MsgSet> {
self.inner.pending.read().clone()
}
pub(in crate::message_pool) fn snapshot_for(&self, addr: &Address) -> Option<MsgSet> {
self.inner.pending.read().get(addr).cloned()
}
pub fn subscribe(&self) -> broadcast::Receiver<MpoolUpdate> {
self.inner.events.subscribe()
}
}
/// Newtype around the shared store state, used as the value handed to
/// `crate::metrics::register_collector`.
///
/// It dereferences to the wrapped `Arc<Inner>`, and the derived `Debug`
/// output skips the wrapped value.
#[derive(derive_more::Debug, derive_more::Deref)]
struct InnerMetricsCollector(#[debug(skip)] Arc<Inner>);
/// Prometheus collector exposing size figures of the pending map.
mod metrics_collection {
    use super::*;
    use prometheus_client::{
        collector::Collector,
        encoding::{DescriptorEncoder, EncodeMetric},
        metrics::gauge::Gauge,
        registry::Unit,
    };

    /// Encodes one gauge with the given descriptor and value.
    ///
    /// The value is converted to `i64` with saturation instead of a wrapping
    /// `as` cast.
    fn encode_gauge(
        encoder: &mut DescriptorEncoder<'_>,
        name: &str,
        help: &str,
        unit: Option<&Unit>,
        value: usize,
    ) -> Result<(), std::fmt::Error> {
        let gauge: Gauge = Default::default();
        gauge.set(i64::try_from(value).unwrap_or(i64::MAX));
        let metric_encoder = encoder.encode_descriptor(name, help, unit, gauge.metric_type())?;
        gauge.encode(metric_encoder)
    }

    impl Collector for InnerMetricsCollector {
        /// Emits `mpool_pending_size` (bytes), `mpool_pending_len` and
        /// `mpool_pending_cap` for the pending map.
        fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
            // Sample all three figures under a single read guard so that they
            // describe the same state of the map, and release the lock before
            // any encoding happens.
            let (allocation_size, len, capacity) = {
                let pending = self.pending.read();
                (pending.allocation_size(), pending.len(), pending.capacity())
            };

            encode_gauge(
                &mut encoder,
                "mpool_pending_size",
                "Allocation size of message pool pending messages in bytes",
                Some(&Unit::Bytes),
                allocation_size,
            )?;
            encode_gauge(
                &mut encoder,
                "mpool_pending_len",
                "Length of the message pool pending messages",
                None,
                len,
            )?;
            encode_gauge(
                &mut encoder,
                "mpool_pending_cap",
                "Capacity of the message pool pending messages",
                None,
                capacity,
            )?;
            Ok(())
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageRead as _;
    use crate::shim::econ::TokenAmount;
    use crate::shim::message::Message as ShimMessage;
    use tokio::sync::broadcast::error::TryRecvError;

    /// Limits used by every test store.
    const TEST_LIMITS: MsgSetLimits = MsgSetLimits {
        trusted: 1000,
        untrusted: 1000,
    };

    /// Builds a mock BLS-signed message from `from` with the given sequence
    /// and gas premium.
    fn make_smsg(from: Address, seq: u64, premium: u64) -> SignedMessage {
        let unsigned = ShimMessage {
            from,
            sequence: seq,
            gas_premium: TokenAmount::from_atto(premium),
            gas_limit: 1_000_000,
            ..ShimMessage::default()
        };
        SignedMessage::mock_bls_signed_message(unsigned)
    }

    /// Inserts a trusted, relaxed message with sequence 0 and state sequence 0
    /// for `addr`, panicking if the store rejects it.
    #[track_caller]
    fn insert_seq0(store: &PendingStore, addr: Address, premium: u64) {
        store
            .insert(
                addr,
                make_smsg(addr, 0, premium),
                0,
                TrustPolicy::Trusted,
                StrictnessPolicy::Relaxed,
            )
            .unwrap();
    }

    /// Asserts that no further event is waiting on `rx`.
    #[track_caller]
    fn assert_drained(rx: &mut broadcast::Receiver<MpoolUpdate>) {
        assert!(
            matches!(rx.try_recv(), Err(TryRecvError::Empty)),
            "expected empty channel"
        );
    }

    /// Asserts that `update` is an `Add` carrying a message with `expected_seq`.
    fn assert_add(update: MpoolUpdate, expected_seq: u64) {
        let MpoolUpdate::Add(added) = &update else {
            panic!("expected Add, got {update:?}");
        };
        assert_eq!(added.sequence(), expected_seq);
    }

    /// Asserts that `update` is a `Remove` carrying a message with
    /// `expected_seq`.
    fn assert_remove(update: MpoolUpdate, expected_seq: u64) {
        let MpoolUpdate::Remove(gone) = &update else {
            panic!("expected Remove, got {update:?}");
        };
        assert_eq!(gone.sequence(), expected_seq);
    }

    #[test]
    fn insert_emits_add_and_stores_message() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut rx = store.subscribe();
        let addr = Address::new_id(1);

        insert_seq0(&store, addr, 100);

        assert_add(rx.try_recv().unwrap(), 0);
        assert_drained(&mut rx);
        assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
    }

    #[test]
    fn rbf_replacement_emits_add_for_the_new_message() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut rx = store.subscribe();
        let addr = Address::new_id(1);

        // Same sequence twice, the second time with a higher premium.
        insert_seq0(&store, addr, 100);
        insert_seq0(&store, addr, 200);

        assert_add(rx.try_recv().unwrap(), 0);
        assert_add(rx.try_recv().unwrap(), 0);
        assert_drained(&mut rx);
    }

    #[test]
    fn remove_emits_remove_once_then_is_idempotent() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut rx = store.subscribe();
        let addr = Address::new_id(1);

        insert_seq0(&store, addr, 100);
        // Consume the Add event produced by the insert above.
        let _discarded_add = rx.try_recv().unwrap();

        assert!(store.remove(&addr, 0, true).is_some());
        assert_remove(rx.try_recv().unwrap(), 0);

        assert!(store.remove(&addr, 0, true).is_none());
        assert_drained(&mut rx);
    }

    #[test]
    fn remove_of_unknown_sender_is_silent() {
        let store = PendingStore::new(TEST_LIMITS);
        let mut rx = store.subscribe();
        let addr = Address::new_id(42);

        assert!(store.remove(&addr, 0, true).is_none());
        assert_drained(&mut rx);
    }

    #[test]
    fn insert_without_subscribers_skips_message_clone() {
        let store = PendingStore::new(TEST_LIMITS);
        let addr = Address::new_id(1);
        assert!(!has_subscribers(&store.inner.events));

        insert_seq0(&store, addr, 100);

        assert_eq!(store.snapshot_for(&addr).unwrap().next_sequence, 1);
    }

    #[test]
    fn snapshot_is_a_deep_copy() {
        let store = PendingStore::new(TEST_LIMITS);
        let addr = Address::new_id(1);
        insert_seq0(&store, addr, 100);

        let mut snap = store.snapshot();
        snap.clear();

        assert!(
            !store.snapshot().is_empty(),
            "mutating the snapshot must not affect the store"
        );
    }

    #[test]
    fn clone_is_cheap_and_shares_state() {
        let store = PendingStore::new(TEST_LIMITS);
        let handle = store.shallow_clone();
        let mut rx = handle.subscribe();
        let addr = Address::new_id(7);

        // Insert through the original, observe through the clone.
        insert_seq0(&store, addr, 100);

        assert_add(rx.try_recv().unwrap(), 0);
        assert_eq!(handle.snapshot_for(&addr).unwrap().next_sequence, 1);
    }

    #[test]
    fn remove_clears_empty_sender_bucket() {
        let store = PendingStore::new(TEST_LIMITS);
        let addr = Address::new_id(1);
        insert_seq0(&store, addr, 100);

        store.remove(&addr, 0, true);

        assert!(
            store.snapshot().is_empty(),
            "removing the last message for an actor should drop the bucket"
        );
    }
}