use std::collections::{HashMap, VecDeque};
use freenet_stdlib::prelude::ContractInstanceId;
use super::handler::{ContractHandlerEvent, EventId};
pub(super) const MAX_QUEUED_PER_CONTRACT: usize = 100;
pub(super) const MAX_TOTAL_FAIR_QUEUE: usize = 5_000;
pub(super) const CLIENT_LOCAL_RESERVE: usize = 256;
pub(super) const MAX_DRAIN_BATCH: usize = 256;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub(crate) enum Priority {
Background,
NetworkRelay,
ClientLocal,
}
impl Priority {
pub(crate) const DEFAULT: Priority = Priority::NetworkRelay;
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum QueueKey {
Contract(ContractInstanceId),
Default,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(super) enum RejectReason {
GlobalCapacity,
PerContract,
Evicted,
}
pub(super) struct RejectedEvent {
pub id: EventId,
pub event: ContractHandlerEvent,
pub priority: Priority,
pub reason: RejectReason,
}
impl std::fmt::Debug for RejectedEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RejectedEvent")
.field("id", &self.id.id)
.field("event", &self.event)
.field("priority", &self.priority)
.field("reason", &self.reason)
.finish()
}
}
type QueuedItem = (EventId, ContractHandlerEvent, Priority);
#[derive(Default)]
struct PriorityTier {
contract_queues: HashMap<ContractInstanceId, VecDeque<QueuedItem>>,
default_queue: VecDeque<QueuedItem>,
round_robin: VecDeque<QueueKey>,
queued: usize,
}
impl PriorityTier {
fn len_for(&self, event: &ContractHandlerEvent) -> usize {
match extract_contract_id(event) {
Some(cid) => self.contract_queues.get(&cid).map_or(0, VecDeque::len),
None => self.default_queue.len(),
}
}
fn push(&mut self, item: QueuedItem) {
match extract_contract_id(&item.1) {
Some(cid) => {
let queue = self.contract_queues.entry(cid).or_default();
if queue.is_empty() {
self.round_robin.push_back(QueueKey::Contract(cid));
}
queue.push_back(item);
}
None => {
if self.default_queue.is_empty() {
self.round_robin.push_back(QueueKey::Default);
}
self.default_queue.push_back(item);
}
}
self.queued += 1;
}
fn pop(&mut self) -> Option<QueuedItem> {
loop {
let key = self.round_robin.pop_front()?;
match key {
QueueKey::Contract(cid) => {
if let Some(queue) = self.contract_queues.get_mut(&cid) {
if let Some(item) = queue.pop_front() {
self.queued = self.queued.saturating_sub(1);
if queue.is_empty() {
self.contract_queues.remove(&cid);
} else {
self.round_robin.push_back(key);
}
return Some(item);
}
self.contract_queues.remove(&cid);
}
}
QueueKey::Default => {
if let Some(item) = self.default_queue.pop_front() {
self.queued = self.queued.saturating_sub(1);
if !self.default_queue.is_empty() {
self.round_robin.push_back(QueueKey::Default);
}
return Some(item);
}
}
}
}
}
}
pub(super) struct FairEventQueue {
tiers: [PriorityTier; 3],
total_queued: usize,
client_streak: usize,
}
pub(super) const RELAY_STARVATION_FLOOR: usize = 16;
impl FairEventQueue {
pub(super) fn new() -> Self {
Self {
tiers: Default::default(),
total_queued: 0,
client_streak: 0,
}
}
fn tier(&mut self, priority: Priority) -> &mut PriorityTier {
&mut self.tiers[priority as usize]
}
pub(super) fn try_push(
&mut self,
id: EventId,
event: ContractHandlerEvent,
priority: Priority,
) -> Result<(), Box<RejectedEvent>> {
if self.total_queued >= MAX_TOTAL_FAIR_QUEUE {
return Err(Box::new(RejectedEvent {
id,
event,
priority,
reason: RejectReason::GlobalCapacity,
}));
}
let soft_cap = MAX_TOTAL_FAIR_QUEUE - CLIENT_LOCAL_RESERVE;
if priority < Priority::ClientLocal && self.total_queued >= soft_cap {
return Err(Box::new(RejectedEvent {
id,
event,
priority,
reason: RejectReason::GlobalCapacity,
}));
}
if self.tier(priority).len_for(&event) >= MAX_QUEUED_PER_CONTRACT {
return Err(Box::new(RejectedEvent {
id,
event,
priority,
reason: RejectReason::PerContract,
}));
}
self.tier(priority).push((id, event, priority));
self.total_queued += 1;
Ok(())
}
pub(super) fn evict_background(&mut self, max: usize) -> Vec<RejectedEvent> {
let mut evicted = Vec::new();
for _ in 0..max {
match self.tiers[Priority::Background as usize].pop() {
Some((id, event, priority)) => {
self.total_queued = self.total_queued.saturating_sub(1);
evicted.push(RejectedEvent {
id,
event,
priority,
reason: RejectReason::Evicted,
});
}
None => break,
}
}
evicted
}
pub(super) fn background_queued(&self) -> usize {
self.tiers[Priority::Background as usize].queued
}
pub(super) fn pop(&mut self) -> Option<(EventId, ContractHandlerEvent)> {
let client_nonempty = self.tiers[Priority::ClientLocal as usize].queued > 0;
let lower_nonempty = self.tiers[Priority::NetworkRelay as usize].queued > 0
|| self.tiers[Priority::Background as usize].queued > 0;
let force_lower =
client_nonempty && lower_nonempty && self.client_streak >= RELAY_STARVATION_FLOOR;
let order: [Priority; 3] = if force_lower {
[
Priority::NetworkRelay,
Priority::Background,
Priority::ClientLocal,
]
} else {
[
Priority::ClientLocal,
Priority::NetworkRelay,
Priority::Background,
]
};
for priority in order {
if let Some((id, event, _)) = self.tiers[priority as usize].pop() {
debug_assert!(self.total_queued > 0);
self.total_queued = self.total_queued.saturating_sub(1);
if priority == Priority::ClientLocal && lower_nonempty {
self.client_streak += 1;
} else {
self.client_streak = 0;
}
return Some((id, event));
}
}
None
}
#[cfg(test)]
pub(super) fn is_empty(&self) -> bool {
self.total_queued == 0
}
#[cfg(test)]
pub(super) fn total_queued(&self) -> usize {
self.total_queued
}
}
fn extract_contract_id(event: &ContractHandlerEvent) -> Option<ContractInstanceId> {
match event {
ContractHandlerEvent::PutQuery { key, .. }
| ContractHandlerEvent::UpdateQuery { key, .. }
| ContractHandlerEvent::GetSummaryQuery { key, .. }
| ContractHandlerEvent::GetDeltaQuery { key, .. }
| ContractHandlerEvent::EvictContract { key, .. } => Some(*key.id()),
ContractHandlerEvent::GetQuery { instance_id, .. }
| ContractHandlerEvent::RegisterSubscriberListener {
key: instance_id, ..
} => Some(*instance_id),
ContractHandlerEvent::DelegateRequest { .. }
| ContractHandlerEvent::DelegateResponse(_)
| ContractHandlerEvent::ExportUserSecrets { .. }
| ContractHandlerEvent::ExportUserSecretsResponse(_)
| ContractHandlerEvent::PutResponse { .. }
| ContractHandlerEvent::GetResponse { .. }
| ContractHandlerEvent::UpdateResponse { .. }
| ContractHandlerEvent::UpdateNoChange { .. }
| ContractHandlerEvent::RegisterSubscriberListenerResponse
| ContractHandlerEvent::QuerySubscriptions { .. }
| ContractHandlerEvent::QuerySubscriptionsResponse
| ContractHandlerEvent::GetSummaryResponse { .. }
| ContractHandlerEvent::GetDeltaResponse { .. }
| ContractHandlerEvent::ClientDisconnect { .. } => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use freenet_stdlib::prelude::{
ContractCode, ContractKey, Parameters, RelatedContracts, WrappedState,
};
use crate::contract::handler::EventId;
fn make_contract_id(seed: u8) -> ContractInstanceId {
make_contract_id_u32(seed as u32)
}
fn make_contract_id_u32(seed: u32) -> ContractInstanceId {
let bytes = seed.to_le_bytes();
let code = ContractCode::from(bytes.repeat(8)); let params = Parameters::from(bytes.repeat(2)); let key = ContractKey::from_params_and_code(¶ms, &code);
*key.id()
}
fn make_event_id(id: u64) -> EventId {
EventId { id }
}
fn make_get_event(contract_id: ContractInstanceId) -> ContractHandlerEvent {
ContractHandlerEvent::GetQuery {
instance_id: contract_id,
return_contract_code: false,
}
}
fn get_cid(event: ContractHandlerEvent) -> ContractInstanceId {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("expected GetQuery event");
};
instance_id
}
fn make_delegate_event() -> ContractHandlerEvent {
ContractHandlerEvent::ClientDisconnect {
client_id: crate::client_events::ClientId::next(),
}
}
impl FairEventQueue {
fn try_push_default(
&mut self,
id: EventId,
event: ContractHandlerEvent,
) -> Result<(), Box<RejectedEvent>> {
self.try_push(id, event, Priority::DEFAULT)
}
fn tier_queued(&self, priority: Priority) -> usize {
self.tiers[priority as usize].queued
}
}
#[test]
fn test_round_robin_fairness() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
for i in 0..10u64 {
queue
.try_push_default(make_event_id(i), make_get_event(a))
.expect("push should succeed");
}
for i in 10..20u64 {
queue
.try_push_default(make_event_id(i), make_get_event(b))
.expect("push should succeed");
}
let mut results = Vec::new();
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
results.push(instance_id);
}
assert_eq!(results.len(), 20);
for i in 0..10 {
assert_eq!(results[i * 2], a, "expected A at position {}", i * 2);
assert_eq!(
results[i * 2 + 1],
b,
"expected B at position {}",
i * 2 + 1
);
}
}
#[test]
fn test_per_contract_limit() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push_default(make_event_id(i), make_get_event(contract_id))
.expect("should succeed within limit");
}
let result = queue.try_push_default(
make_event_id(MAX_QUEUED_PER_CONTRACT as u64),
make_get_event(contract_id),
);
assert!(
result.is_err(),
"should reject when per-contract limit exceeded"
);
}
fn fill_to(queue: &mut FairEventQueue, priority: Priority, target: usize) -> u64 {
let mut pushed = queue.total_queued();
let mut contract_seed = 1_000_000u32;
let mut event_id = 1_000_000u64;
while pushed < target {
let batch = (target - pushed).min(MAX_QUEUED_PER_CONTRACT);
let contract_id = make_contract_id_u32(contract_seed);
for _ in 0..batch {
queue
.try_push(
make_event_id(event_id),
make_get_event(contract_id),
priority,
)
.expect("should succeed below the relevant cap");
event_id += 1;
pushed += 1;
}
contract_seed = contract_seed.wrapping_add(1);
}
event_id
}
#[test]
fn test_foreground_soft_cap_then_client_reserve_then_hard_cap() {
let mut queue = FairEventQueue::new();
let soft_cap = MAX_TOTAL_FAIR_QUEUE - CLIENT_LOCAL_RESERVE;
let mut event_id = fill_to(&mut queue, Priority::NetworkRelay, soft_cap);
assert_eq!(queue.total_queued(), soft_cap);
let relay_over = queue.try_push(
make_event_id(event_id),
make_get_event(make_contract_id_u32(9_999)),
Priority::NetworkRelay,
);
assert!(
relay_over.is_err(),
"NetworkRelay must be refused once the soft cap (reserve) is reached"
);
event_id += 1;
event_id = fill_to(&mut queue, Priority::ClientLocal, MAX_TOTAL_FAIR_QUEUE).max(event_id);
assert_eq!(queue.total_queued(), MAX_TOTAL_FAIR_QUEUE);
let client_over = queue.try_push(
make_event_id(event_id),
make_get_event(make_contract_id_u32(8_888)),
Priority::ClientLocal,
);
assert!(
client_over.is_err(),
"ClientLocal must still be refused at the hard MAX_TOTAL_FAIR_QUEUE cap"
);
}
#[test]
fn test_default_queue_participates() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
queue
.try_push_default(make_event_id(0), make_delegate_event())
.expect("push should succeed");
queue
.try_push_default(make_event_id(1), make_get_event(contract_id))
.expect("push should succeed");
assert_eq!(queue.total_queued(), 2);
let first = queue.pop();
let second = queue.pop();
let third = queue.pop();
assert!(first.is_some(), "first pop should return an event");
assert!(second.is_some(), "second pop should return an event");
assert!(third.is_none(), "third pop should be empty");
}
#[test]
fn test_empty_queue_removal() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
queue
.try_push_default(make_event_id(0), make_get_event(contract_id))
.expect("push should succeed");
queue
.try_push_default(make_event_id(1), make_get_event(contract_id))
.expect("push should succeed");
queue.pop().expect("first pop should succeed");
queue.pop().expect("second pop should succeed");
assert!(queue.is_empty());
for tier in &queue.tiers {
assert!(
tier.contract_queues.is_empty(),
"contract queue map should be empty"
);
assert!(tier.round_robin.is_empty(), "round_robin should be empty");
assert_eq!(tier.queued, 0, "tier count should be zero");
}
}
#[test]
fn test_single_contract_preserves_order() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
for i in 0..5u64 {
queue
.try_push_default(make_event_id(i), make_get_event(contract_id))
.expect("push should succeed");
}
let mut popped_ids = Vec::new();
while let Some((id, _)) = queue.pop() {
popped_ids.push(id.id);
}
assert_eq!(popped_ids, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_stress_many_contracts() {
let mut queue = FairEventQueue::new();
let num_contracts = 47usize;
let events_per_contract = MAX_QUEUED_PER_CONTRACT;
let mut event_id = 0u64;
for seed in 0..num_contracts as u8 {
let contract_id = make_contract_id(seed);
for _ in 0..events_per_contract {
queue
.try_push_default(make_event_id(event_id), make_get_event(contract_id))
.expect("push should succeed");
event_id += 1;
}
}
assert_eq!(queue.total_queued(), num_contracts * events_per_contract);
let mut per_contract_counts: HashMap<ContractInstanceId, usize> = HashMap::new();
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
*per_contract_counts.entry(instance_id).or_insert(0) += 1;
}
assert_eq!(per_contract_counts.len(), num_contracts);
for count in per_contract_counts.values() {
assert_eq!(*count, events_per_contract);
}
assert!(queue.is_empty());
}
#[test]
fn test_stress_mixed_load() {
let mut queue = FairEventQueue::new();
let hot_contract = make_contract_id(0);
let num_cold = 99usize;
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push_default(make_event_id(i), make_get_event(hot_contract))
.expect("push should succeed");
}
let mut event_id = MAX_QUEUED_PER_CONTRACT as u64;
for seed in 1..=num_cold as u8 {
let contract_id = make_contract_id(seed);
queue
.try_push_default(make_event_id(event_id), make_get_event(contract_id))
.expect("push should succeed");
event_id += 1;
}
let total = MAX_QUEUED_PER_CONTRACT + num_cold;
assert_eq!(queue.total_queued(), total);
let mut hot_positions = Vec::new();
let mut cold_positions = Vec::new();
let mut position = 0;
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
if instance_id == hot_contract {
hot_positions.push(position);
} else {
cold_positions.push(position);
}
position += 1;
}
assert_eq!(hot_positions.len(), MAX_QUEUED_PER_CONTRACT);
assert_eq!(cold_positions.len(), num_cold);
let last_cold = cold_positions.iter().max().copied().unwrap_or(0);
let first_hot = hot_positions.iter().min().copied().unwrap_or(usize::MAX);
assert!(
first_hot < last_cold,
"hot contract events should interleave with cold contract events"
);
assert_eq!(position, total, "all events should be popped");
}
#[test]
fn test_is_empty() {
let mut queue = FairEventQueue::new();
assert!(queue.is_empty());
let contract_id = make_contract_id(1);
queue
.try_push_default(make_event_id(0), make_get_event(contract_id))
.expect("push should succeed");
assert!(!queue.is_empty());
queue.pop();
assert!(queue.is_empty());
}
#[test]
fn test_put_event_routing() {
let mut queue = FairEventQueue::new();
let code = ContractCode::from(vec![42u8; 32]);
let params = Parameters::from(vec![7u8; 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let contract_id = *key.id();
let event = ContractHandlerEvent::PutQuery {
key,
state: WrappedState::new(vec![1, 2, 3]),
related_contracts: RelatedContracts::default(),
contract: None,
};
queue
.try_push_default(make_event_id(0), event)
.expect("push should succeed");
assert_eq!(queue.total_queued(), 1);
assert!(
queue.tiers[Priority::DEFAULT as usize]
.contract_queues
.contains_key(&contract_id)
);
}
#[test]
fn test_default_queue_per_slot_limit() {
let mut queue = FairEventQueue::new();
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push_default(make_event_id(i), make_delegate_event())
.expect("should succeed within limit");
}
let result = queue.try_push_default(
make_event_id(MAX_QUEUED_PER_CONTRACT as u64),
make_delegate_event(),
);
assert!(
result.is_err(),
"should reject when default queue per-slot limit exceeded"
);
}
#[test]
fn test_interleaved_push_pop() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
queue
.try_push_default(make_event_id(0), make_get_event(a))
.unwrap();
let (id0, _) = queue.pop().expect("should pop A");
assert_eq!(id0.id, 0);
queue
.try_push_default(make_event_id(1), make_get_event(b))
.unwrap();
queue
.try_push_default(make_event_id(2), make_get_event(a))
.unwrap();
let (id1, ev1) = queue.pop().expect("should pop B");
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev1 else {
panic!("unexpected event type");
};
assert_eq!(instance_id, b, "expected B (id={})", id1.id);
let (id2, ev2) = queue.pop().expect("should pop A");
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev2 else {
panic!("unexpected event type");
};
assert_eq!(instance_id, a, "expected A (id={})", id2.id);
assert!(queue.pop().is_none());
}
#[test]
fn test_interleaved_three_contracts() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
let c = make_contract_id(3);
for i in 0..3u64 {
queue
.try_push_default(make_event_id(i), make_get_event(a))
.unwrap();
}
for i in 3..5u64 {
queue
.try_push_default(make_event_id(i), make_get_event(b))
.unwrap();
}
queue
.try_push_default(make_event_id(5), make_get_event(c))
.unwrap();
let mut first_round = Vec::new();
for _ in 0..3 {
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
first_round.push(instance_id);
}
assert_eq!(first_round, vec![a, b, c]);
let mut second_round = Vec::new();
for _ in 0..2 {
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
second_round.push(instance_id);
}
assert_eq!(second_round, vec![a, b]);
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
assert_eq!(instance_id, a);
assert!(queue.pop().is_none());
}
#[test]
fn test_extract_contract_id_all_variants() {
use freenet_stdlib::prelude::UpdateData;
use tokio::sync::mpsc;
let code = ContractCode::from(vec![99u8; 32]);
let params = Parameters::from(vec![88u8; 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let contract_id = *key.id();
let contract_events: Vec<(&str, ContractHandlerEvent)> = vec![
(
"PutQuery",
ContractHandlerEvent::PutQuery {
key,
state: WrappedState::new(vec![1]),
related_contracts: RelatedContracts::default(),
contract: None,
},
),
(
"UpdateQuery",
ContractHandlerEvent::UpdateQuery {
key,
data: UpdateData::Delta(freenet_stdlib::prelude::StateDelta::from(vec![2])),
related_contracts: RelatedContracts::default(),
},
),
(
"GetQuery",
ContractHandlerEvent::GetQuery {
instance_id: contract_id,
return_contract_code: false,
},
),
(
"GetSummaryQuery",
ContractHandlerEvent::GetSummaryQuery { key },
),
(
"GetDeltaQuery",
ContractHandlerEvent::GetDeltaQuery {
key,
their_summary: freenet_stdlib::prelude::StateSummary::from(vec![3]),
},
),
(
"RegisterSubscriberListener",
ContractHandlerEvent::RegisterSubscriberListener {
key: contract_id,
client_id: crate::client_events::ClientId::next(),
summary: None,
subscriber_listener: mpsc::channel(64).0,
},
),
(
"EvictContract",
ContractHandlerEvent::EvictContract {
key,
expected_generation: 0,
},
),
];
for (name, event) in &contract_events {
let result = extract_contract_id(event);
assert_eq!(
result,
Some(contract_id),
"{name} should route to contract queue"
);
}
let default_events: Vec<(&str, ContractHandlerEvent)> = vec![
(
"DelegateRequest",
ContractHandlerEvent::DelegateRequest {
req: freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
key: freenet_stdlib::prelude::DelegateKey::new(
[1u8; 32],
freenet_stdlib::prelude::CodeHash::new([0u8; 32]),
),
params: Parameters::from(vec![]),
inbound: vec![],
},
origin_contract: None,
user_context: None,
},
),
(
"DelegateResponse",
ContractHandlerEvent::DelegateResponse(vec![]),
),
(
"ClientDisconnect",
ContractHandlerEvent::ClientDisconnect {
client_id: crate::client_events::ClientId::next(),
},
),
(
"PutResponse",
ContractHandlerEvent::PutResponse {
new_value: Ok(WrappedState::new(vec![])),
state_changed: false,
},
),
(
"GetResponse",
ContractHandlerEvent::GetResponse {
key: None,
response: Ok(crate::contract::handler::StoreResponse {
state: None,
contract: None,
}),
},
),
(
"UpdateResponse",
ContractHandlerEvent::UpdateResponse {
new_value: Ok(WrappedState::new(vec![])),
state_changed: false,
},
),
(
"UpdateNoChange",
ContractHandlerEvent::UpdateNoChange { key },
),
(
"RegisterSubscriberListenerResponse",
ContractHandlerEvent::RegisterSubscriberListenerResponse,
),
(
"QuerySubscriptionsResponse",
ContractHandlerEvent::QuerySubscriptionsResponse,
),
(
"GetSummaryResponse",
ContractHandlerEvent::GetSummaryResponse {
key,
summary: Ok(freenet_stdlib::prelude::StateSummary::from(vec![])),
},
),
(
"GetDeltaResponse",
ContractHandlerEvent::GetDeltaResponse {
key,
delta: Ok(freenet_stdlib::prelude::StateDelta::from(vec![])),
},
),
(
"QuerySubscriptions",
ContractHandlerEvent::QuerySubscriptions {
callback: mpsc::channel::<crate::message::QueryResult>(1).0,
},
),
];
for (name, event) in &default_events {
let result = extract_contract_id(event);
assert_eq!(result, None, "{name} should route to default queue");
}
}
#[test]
fn pop_drains_higher_priority_first() {
let mut queue = FairEventQueue::new();
let bg = make_contract_id_u32(1);
let relay = make_contract_id_u32(2);
let client = make_contract_id_u32(3);
queue
.try_push(make_event_id(0), make_get_event(bg), Priority::Background)
.unwrap();
queue
.try_push(
make_event_id(1),
make_get_event(relay),
Priority::NetworkRelay,
)
.unwrap();
queue
.try_push(
make_event_id(2),
make_get_event(client),
Priority::ClientLocal,
)
.unwrap();
let order: Vec<ContractInstanceId> = std::iter::from_fn(|| queue.pop())
.map(|(_, ev)| get_cid(ev))
.collect();
assert_eq!(
order,
vec![client, relay, bg],
"must drain ClientLocal, then NetworkRelay, then Background"
);
}
#[test]
fn client_local_admitted_when_queue_full_of_background() {
let mut queue = FairEventQueue::new();
let soft_cap = MAX_TOTAL_FAIR_QUEUE - CLIENT_LOCAL_RESERVE;
fill_to(&mut queue, Priority::Background, soft_cap);
assert_eq!(queue.total_queued(), soft_cap);
for i in 0..CLIENT_LOCAL_RESERVE as u64 {
let client = make_contract_id_u32(7000 + (i / MAX_QUEUED_PER_CONTRACT as u64) as u32);
queue
.try_push(
make_event_id(100_000 + i),
make_get_event(client),
Priority::ClientLocal,
)
.expect("ClientLocal must use the reserved lane past the soft cap");
}
assert_eq!(queue.total_queued(), MAX_TOTAL_FAIR_QUEUE);
let evicted = queue.evict_background(CLIENT_LOCAL_RESERVE);
assert_eq!(evicted.len(), CLIENT_LOCAL_RESERVE);
assert!(evicted.iter().all(|e| e.priority == Priority::Background));
queue
.try_push(
make_event_id(999_999),
make_get_event(make_contract_id_u32(7777)),
Priority::ClientLocal,
)
.expect("ClientLocal admitted after Background eviction frees space");
}
#[test]
fn evict_background_only_touches_background_tier() {
let mut queue = FairEventQueue::new();
let relay = make_contract_id_u32(2);
let client = make_contract_id_u32(3);
queue
.try_push(
make_event_id(0),
make_get_event(relay),
Priority::NetworkRelay,
)
.unwrap();
queue
.try_push(
make_event_id(1),
make_get_event(client),
Priority::ClientLocal,
)
.unwrap();
for i in 0..5u64 {
queue
.try_push(
make_event_id(10 + i),
make_get_event(make_contract_id_u32(100 + i as u32)),
Priority::Background,
)
.unwrap();
}
assert_eq!(queue.background_queued(), 5);
let evicted = queue.evict_background(100);
assert_eq!(evicted.len(), 5);
assert_eq!(queue.background_queued(), 0);
assert_eq!(
queue.tier_queued(Priority::NetworkRelay),
1,
"relay event must survive background eviction"
);
assert_eq!(
queue.tier_queued(Priority::ClientLocal),
1,
"client event must survive background eviction"
);
assert_eq!(queue.total_queued(), 2);
}
#[test]
fn per_contract_cap_is_per_tier() {
let mut queue = FairEventQueue::new();
let key = make_contract_id_u32(42);
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(make_event_id(i), make_get_event(key), Priority::Background)
.expect("background fills its own per-contract slot");
}
queue
.try_push(
make_event_id(9999),
make_get_event(key),
Priority::ClientLocal,
)
.expect("ClientLocal on a Background-saturated contract must still be admitted");
let (_, ev) = queue.pop().unwrap();
assert_eq!(get_cid(ev), key);
assert_eq!(queue.tier_queued(Priority::ClientLocal), 0);
assert_eq!(
queue.tier_queued(Priority::Background),
MAX_QUEUED_PER_CONTRACT
);
}
#[test]
fn within_tier_round_robin_preserved() {
let mut queue = FairEventQueue::new();
let a = make_contract_id_u32(1);
let b = make_contract_id_u32(2);
for i in 0..4u64 {
queue
.try_push(make_event_id(i), make_get_event(a), Priority::NetworkRelay)
.unwrap();
}
for i in 4..8u64 {
queue
.try_push(make_event_id(i), make_get_event(b), Priority::NetworkRelay)
.unwrap();
}
let order: Vec<ContractInstanceId> = std::iter::from_fn(|| queue.pop())
.map(|(_, ev)| get_cid(ev))
.collect();
assert_eq!(order, vec![a, b, a, b, a, b, a, b]);
}
#[test]
fn relay_not_starved_by_sustained_client_load() {
let mut queue = FairEventQueue::new();
let client = make_contract_id_u32(1);
let relay = make_contract_id_u32(2);
for i in 0..1000u64 {
queue
.try_push(
make_event_id(i),
make_get_event(make_contract_id_u32(1000 + (i / 50) as u32)),
Priority::ClientLocal,
)
.unwrap();
}
for i in 0..1000u64 {
queue
.try_push(
make_event_id(10_000 + i),
make_get_event(make_contract_id_u32(2000 + (i / 50) as u32)),
Priority::NetworkRelay,
)
.unwrap();
}
let _ = (client, relay);
let mut gap = 0usize; let mut max_gap = 0usize;
let mut relay_served = 0usize;
for _ in 0..400 {
let (_, ev) = queue.pop().unwrap();
let cid = get_cid(ev);
let is_relay = (2000..3000).any(|s| make_contract_id_u32(s) == cid);
if is_relay {
relay_served += 1;
max_gap = max_gap.max(gap);
gap = 0;
} else {
gap += 1;
}
}
max_gap = max_gap.max(gap);
assert!(relay_served > 0, "NetworkRelay must not be fully starved");
assert!(
max_gap <= RELAY_STARVATION_FLOOR,
"gap between relay pops ({max_gap}) must not exceed the floor ({RELAY_STARVATION_FLOOR})"
);
assert!(
relay_served < 400 / 2,
"client work should still dominate scheduling"
);
}
#[test]
fn uncontested_client_burst_does_not_trip_floor() {
let mut queue = FairEventQueue::new();
for i in 0..200u64 {
queue
.try_push(
make_event_id(i),
make_get_event(make_contract_id_u32(1000 + (i / 50) as u32)),
Priority::ClientLocal,
)
.unwrap();
}
for _ in 0..200 {
assert!(queue.pop().is_some());
}
assert!(queue.is_empty());
}
#[test]
fn per_contract_rejection_keeps_reason() {
let mut queue = FairEventQueue::new();
let key = make_contract_id_u32(1);
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(
make_event_id(i),
make_get_event(key),
Priority::NetworkRelay,
)
.unwrap();
}
let rejected = queue
.try_push(
make_event_id(999),
make_get_event(key),
Priority::NetworkRelay,
)
.expect_err("should reject at per-contract cap");
assert_eq!(rejected.reason, RejectReason::PerContract);
let soft_cap = MAX_TOTAL_FAIR_QUEUE - CLIENT_LOCAL_RESERVE;
fill_to(&mut queue, Priority::NetworkRelay, soft_cap);
let rejected = queue
.try_push(
make_event_id(1000),
make_get_event(make_contract_id_u32(55555)),
Priority::NetworkRelay,
)
.expect_err("should reject at soft cap");
assert_eq!(rejected.reason, RejectReason::GlobalCapacity);
}
#[test]
fn evict_then_retry_can_still_fail_per_contract() {
let mut queue = FairEventQueue::new();
let hot = make_contract_id_u32(1);
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(make_event_id(i), make_get_event(hot), Priority::ClientLocal)
.unwrap();
}
for i in 0..10u64 {
queue
.try_push(
make_event_id(1000 + i),
make_get_event(make_contract_id_u32(2000 + i as u32)),
Priority::Background,
)
.unwrap();
}
let before = queue.total_queued();
let evicted = queue.evict_background(CLIENT_LOCAL_RESERVE);
assert_eq!(evicted.len(), 10);
assert_eq!(queue.total_queued(), before - 10);
let rejected = queue
.try_push(
make_event_id(9999),
make_get_event(hot),
Priority::ClientLocal,
)
.expect_err("hot key is at its ClientLocal per-contract cap");
assert_eq!(
rejected.reason,
RejectReason::PerContract,
"post-eviction retry failure must be PerContract, not GlobalCapacity"
);
}
}