use dashmap::DashMap;
use freenet_stdlib::prelude::{ContractInstanceId, WrappedState};
use crate::config::GlobalSimulationTime;
/// Entry count at which `PendingBroadcastStore::stash` evicts the oldest entry
/// before inserting a contract that is not already stored.
const MAX_PENDING_BROADCASTS: usize = 1_024;
/// Age, in milliseconds, at which a stashed entry counts as expired
/// (five minutes at 60 000 ms per minute).
const PENDING_BROADCAST_TTL_MS: u64 = 5 * 60_000;
/// A stashed contract state paired with the time at which it was stashed.
struct PendingEntry {
/// The state passed to `PendingBroadcastStore::stash`; handed back by `take`.
state: WrappedState,
/// Value of `GlobalSimulationTime::read_time_ms()` at stash time. Drives both
/// the TTL check and the choice of which entry to evict when the store is full.
inserted_at_ms: u64,
}
/// Store of pending states, holding at most one entry per contract.
///
/// Growth is limited by `MAX_PENDING_BROADCASTS` (the oldest entry is evicted
/// when a new contract arrives at the cap) and by `PENDING_BROADCAST_TTL_MS`
/// (expired entries are pruned on `stash` and rejected by `take`).
/// NOTE(review): presumably these are states awaiting a later re-broadcast —
/// confirm against the callers, which are not visible here.
pub(crate) struct PendingBroadcastStore {
/// Pending entry for each contract, keyed by its instance id.
entries: DashMap<ContractInstanceId, PendingEntry>,
}
impl PendingBroadcastStore {
pub(crate) fn new() -> Self {
Self {
entries: DashMap::new(),
}
}
pub(crate) fn stash(&self, contract: ContractInstanceId, state: WrappedState) {
let now_ms = GlobalSimulationTime::read_time_ms();
self.prune_expired(now_ms);
if !self.entries.contains_key(&contract) && self.entries.len() >= MAX_PENDING_BROADCASTS {
self.evict_oldest();
}
self.entries.insert(
contract,
PendingEntry {
state,
inserted_at_ms: now_ms,
},
);
}
pub(crate) fn contains(&self, contract: &ContractInstanceId) -> bool {
self.entries.contains_key(contract)
}
pub(crate) fn take(&self, contract: &ContractInstanceId) -> Option<WrappedState> {
let (_, entry) = self.entries.remove(contract)?;
let now_ms = GlobalSimulationTime::read_time_ms();
if now_ms.saturating_sub(entry.inserted_at_ms) >= PENDING_BROADCAST_TTL_MS {
return None;
}
Some(entry.state)
}
fn prune_expired(&self, now_ms: u64) {
self.entries
.retain(|_, e| now_ms.saturating_sub(e.inserted_at_ms) < PENDING_BROADCAST_TTL_MS);
}
fn evict_oldest(&self) {
let oldest = self
.entries
.iter()
.min_by_key(|e| e.value().inserted_at_ms)
.map(|e| *e.key());
if let Some(key) = oldest {
self.entries.remove(&key);
}
}
#[cfg(test)]
fn len(&self) -> usize {
self.entries.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Contract id whose 32 bytes all equal `seed`.
    fn id_filled_with(seed: u8) -> ContractInstanceId {
        ContractInstanceId::new([seed; 32])
    }

    /// Contract id carrying `index` little-endian in its first eight bytes,
    /// with the remaining bytes zero.
    fn id_from_index(index: u64) -> ContractInstanceId {
        let mut bytes = [0u8; 32];
        bytes[..8].copy_from_slice(&index.to_le_bytes());
        ContractInstanceId::new(bytes)
    }

    /// Eight-byte state in which every byte equals `byte`.
    fn state_filled_with(byte: u8) -> WrappedState {
        WrappedState::new(vec![byte; 8])
    }

    /// A stashed state comes back exactly once from `take`.
    #[test]
    fn stash_then_take_returns_state_and_removes_entry() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();
        let key = id_filled_with(1);

        pending.stash(key, state_filled_with(0xAA));
        assert_eq!(pending.len(), 1);

        let taken = pending.take(&key).expect("pending state present");
        assert_eq!(taken.as_ref(), &[0xAA; 8]);
        assert_eq!(pending.len(), 0, "take must remove the entry");
        assert!(pending.take(&key).is_none(), "second take yields nothing");

        GlobalSimulationTime::clear_time();
    }

    /// `contains` tracks membership and leaves the entry in place.
    #[test]
    fn contains_reflects_membership_without_removing() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();
        let key = id_filled_with(1);

        assert!(!pending.contains(&key), "absent before stash");
        pending.stash(key, state_filled_with(0xAA));
        assert!(pending.contains(&key), "present after stash");
        assert!(pending.contains(&key), "contains is non-destructive");
        assert!(pending.take(&key).is_some(), "take still finds it");
        assert!(!pending.contains(&key), "absent after take");

        GlobalSimulationTime::clear_time();
    }

    /// Stashing the same contract twice keeps one entry holding the later state.
    #[test]
    fn restash_overwrites_with_newest_state() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();
        let key = id_filled_with(1);

        pending.stash(key, state_filled_with(0x01));
        pending.stash(key, state_filled_with(0x02));

        assert_eq!(pending.len(), 1, "re-stash must not duplicate the contract");
        assert_eq!(pending.take(&key).unwrap().as_ref(), &[0x02; 8]);

        GlobalSimulationTime::clear_time();
    }

    /// `take` refuses an entry once the clock has moved past the TTL.
    #[test]
    fn expired_entry_is_not_rebroadcast() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();
        let key = id_filled_with(1);
        pending.stash(key, state_filled_with(0x01));

        GlobalSimulationTime::set_time_ms(PENDING_BROADCAST_TTL_MS + 1);

        assert!(
            pending.take(&key).is_none(),
            "an entry past its TTL must not be re-broadcast"
        );

        GlobalSimulationTime::clear_time();
    }

    /// A later `stash` sweeps out entries that expired in the meantime.
    #[test]
    fn prune_drops_expired_on_stash() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();
        let stale = id_filled_with(1);
        let fresh = id_filled_with(2);
        pending.stash(stale, state_filled_with(0x01));

        GlobalSimulationTime::set_time_ms(PENDING_BROADCAST_TTL_MS + 1);
        pending.stash(fresh, state_filled_with(0x02));

        assert_eq!(pending.len(), 1, "stale entry must be pruned on stash");
        assert!(pending.take(&stale).is_none());
        assert!(pending.take(&fresh).is_some());

        GlobalSimulationTime::clear_time();
    }

    /// Filling the store to the cap and adding one more contract evicts the
    /// entry with the earliest timestamp.
    #[test]
    fn size_cap_evicts_oldest() {
        GlobalSimulationTime::set_time_ms(0);
        let pending = PendingBroadcastStore::new();

        // One entry per tick, so entry `n` was inserted at time `n`.
        for tick in (0u64..).take(MAX_PENDING_BROADCASTS) {
            GlobalSimulationTime::set_time_ms(tick);
            pending.stash(id_from_index(tick), state_filled_with(1));
        }
        assert_eq!(pending.len(), MAX_PENDING_BROADCASTS);

        GlobalSimulationTime::set_time_ms(MAX_PENDING_BROADCASTS as u64);
        // Byte 16 is zero in every indexed id, so this id cannot collide.
        let mut overflow_bytes = [0u8; 32];
        overflow_bytes[0] = 0xFF;
        overflow_bytes[16] = 0xFF;
        let overflow = ContractInstanceId::new(overflow_bytes);
        pending.stash(overflow, state_filled_with(2));
        assert_eq!(pending.len(), MAX_PENDING_BROADCASTS, "cap holds");

        let oldest = id_from_index(0);
        assert!(
            pending.take(&oldest).is_none(),
            "oldest entry must have been evicted"
        );
        assert!(pending.take(&overflow).is_some(), "new entry retained");

        GlobalSimulationTime::clear_time();
    }
}