use std::collections::{HashMap, VecDeque};
use freenet_stdlib::prelude::ContractInstanceId;
use super::handler::{ContractHandlerEvent, EventId};
pub(super) const MAX_QUEUED_PER_CONTRACT: usize = 100;
pub(super) const MAX_TOTAL_FAIR_QUEUE: usize = 5_000;
pub(super) const MAX_DRAIN_BATCH: usize = 256;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum QueueKey {
Contract(ContractInstanceId),
Default,
}
pub(super) struct RejectedEvent {
pub id: EventId,
pub event: ContractHandlerEvent,
}
impl std::fmt::Debug for RejectedEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RejectedEvent")
.field("id", &self.id.id)
.field("event", &self.event)
.finish()
}
}
pub(super) struct FairEventQueue {
contract_queues: HashMap<ContractInstanceId, VecDeque<(EventId, ContractHandlerEvent)>>,
default_queue: VecDeque<(EventId, ContractHandlerEvent)>,
round_robin: VecDeque<QueueKey>,
total_queued: usize,
}
impl FairEventQueue {
pub(super) fn new() -> Self {
Self {
contract_queues: HashMap::new(),
default_queue: VecDeque::new(),
round_robin: VecDeque::new(),
total_queued: 0,
}
}
pub(super) fn try_push(
&mut self,
id: EventId,
event: ContractHandlerEvent,
) -> Result<(), Box<RejectedEvent>> {
if self.total_queued >= MAX_TOTAL_FAIR_QUEUE {
return Err(Box::new(RejectedEvent { id, event }));
}
match extract_contract_id(&event) {
Some(contract_id) => {
let queue = self.contract_queues.entry(contract_id).or_default();
if queue.len() >= MAX_QUEUED_PER_CONTRACT {
return Err(Box::new(RejectedEvent { id, event }));
}
if queue.is_empty() {
self.round_robin.push_back(QueueKey::Contract(contract_id));
}
queue.push_back((id, event));
self.total_queued += 1;
}
None => {
if self.default_queue.len() >= MAX_QUEUED_PER_CONTRACT {
return Err(Box::new(RejectedEvent { id, event }));
}
if self.default_queue.is_empty() {
self.round_robin.push_back(QueueKey::Default);
}
self.default_queue.push_back((id, event));
self.total_queued += 1;
}
}
Ok(())
}
pub(super) fn pop(&mut self) -> Option<(EventId, ContractHandlerEvent)> {
loop {
let key = self.round_robin.pop_front()?;
match key {
QueueKey::Contract(contract_id) => {
if let Some(queue) = self.contract_queues.get_mut(&contract_id) {
if let Some(item) = queue.pop_front() {
debug_assert!(self.total_queued > 0);
self.total_queued = self.total_queued.saturating_sub(1);
if queue.is_empty() {
self.contract_queues.remove(&contract_id);
} else {
self.round_robin.push_back(key);
}
return Some(item);
}
self.contract_queues.remove(&contract_id);
}
}
QueueKey::Default => {
if let Some(item) = self.default_queue.pop_front() {
debug_assert!(self.total_queued > 0);
self.total_queued = self.total_queued.saturating_sub(1);
if !self.default_queue.is_empty() {
self.round_robin.push_back(QueueKey::Default);
}
return Some(item);
}
}
}
}
}
#[cfg(test)]
pub(super) fn is_empty(&self) -> bool {
self.total_queued == 0
}
#[cfg(test)]
pub(super) fn total_queued(&self) -> usize {
self.total_queued
}
}
fn extract_contract_id(event: &ContractHandlerEvent) -> Option<ContractInstanceId> {
match event {
ContractHandlerEvent::PutQuery { key, .. }
| ContractHandlerEvent::UpdateQuery { key, .. }
| ContractHandlerEvent::GetSummaryQuery { key, .. }
| ContractHandlerEvent::GetDeltaQuery { key, .. } => Some(*key.id()),
ContractHandlerEvent::GetQuery { instance_id, .. }
| ContractHandlerEvent::RegisterSubscriberListener {
key: instance_id, ..
}
| ContractHandlerEvent::NotifySubscriptionError {
key: instance_id, ..
} => Some(*instance_id),
ContractHandlerEvent::DelegateRequest { .. }
| ContractHandlerEvent::DelegateResponse(_)
| ContractHandlerEvent::PutResponse { .. }
| ContractHandlerEvent::GetResponse { .. }
| ContractHandlerEvent::UpdateResponse { .. }
| ContractHandlerEvent::UpdateNoChange { .. }
| ContractHandlerEvent::RegisterSubscriberListenerResponse
| ContractHandlerEvent::QuerySubscriptions { .. }
| ContractHandlerEvent::QuerySubscriptionsResponse
| ContractHandlerEvent::GetSummaryResponse { .. }
| ContractHandlerEvent::GetDeltaResponse { .. }
| ContractHandlerEvent::NotifySubscriptionErrorResponse
| ContractHandlerEvent::ClientDisconnect { .. } => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use freenet_stdlib::prelude::{
ContractCode, ContractKey, Parameters, RelatedContracts, WrappedState,
};
use crate::contract::handler::EventId;
fn make_contract_id(seed: u8) -> ContractInstanceId {
let code = ContractCode::from(vec![seed; 32]);
let params = Parameters::from(vec![seed; 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
*key.id()
}
fn make_event_id(id: u64) -> EventId {
EventId { id }
}
fn make_get_event(contract_id: ContractInstanceId) -> ContractHandlerEvent {
ContractHandlerEvent::GetQuery {
instance_id: contract_id,
return_contract_code: false,
}
}
fn make_delegate_event() -> ContractHandlerEvent {
ContractHandlerEvent::ClientDisconnect {
client_id: crate::client_events::ClientId::next(),
}
}
#[test]
fn test_round_robin_fairness() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
for i in 0..10u64 {
queue
.try_push(make_event_id(i), make_get_event(a))
.expect("push should succeed");
}
for i in 10..20u64 {
queue
.try_push(make_event_id(i), make_get_event(b))
.expect("push should succeed");
}
let mut results = Vec::new();
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
results.push(instance_id);
}
assert_eq!(results.len(), 20);
for i in 0..10 {
assert_eq!(results[i * 2], a, "expected A at position {}", i * 2);
assert_eq!(
results[i * 2 + 1],
b,
"expected B at position {}",
i * 2 + 1
);
}
}
#[test]
fn test_per_contract_limit() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(make_event_id(i), make_get_event(contract_id))
.expect("should succeed within limit");
}
let result = queue.try_push(
make_event_id(MAX_QUEUED_PER_CONTRACT as u64),
make_get_event(contract_id),
);
assert!(
result.is_err(),
"should reject when per-contract limit exceeded"
);
}
#[test]
fn test_global_limit() {
let mut queue = FairEventQueue::new();
let mut pushed = 0usize;
let mut contract_seed = 0u8;
let mut event_id = 0u64;
while pushed < MAX_TOTAL_FAIR_QUEUE {
let remaining = MAX_TOTAL_FAIR_QUEUE - pushed;
let batch = remaining.min(MAX_QUEUED_PER_CONTRACT);
let contract_id = make_contract_id(contract_seed);
for _ in 0..batch {
queue
.try_push(make_event_id(event_id), make_get_event(contract_id))
.expect("should succeed within limits");
event_id += 1;
pushed += 1;
}
contract_seed = contract_seed.wrapping_add(1);
}
assert_eq!(queue.total_queued(), MAX_TOTAL_FAIR_QUEUE);
let contract_id = make_contract_id(contract_seed.wrapping_add(1));
let result = queue.try_push(make_event_id(event_id), make_get_event(contract_id));
assert!(result.is_err(), "should reject when global limit exceeded");
}
#[test]
fn test_default_queue_participates() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
queue
.try_push(make_event_id(0), make_delegate_event())
.expect("push should succeed");
queue
.try_push(make_event_id(1), make_get_event(contract_id))
.expect("push should succeed");
assert_eq!(queue.total_queued(), 2);
let first = queue.pop();
let second = queue.pop();
let third = queue.pop();
assert!(first.is_some(), "first pop should return an event");
assert!(second.is_some(), "second pop should return an event");
assert!(third.is_none(), "third pop should be empty");
}
#[test]
fn test_empty_queue_removal() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
queue
.try_push(make_event_id(0), make_get_event(contract_id))
.expect("push should succeed");
queue
.try_push(make_event_id(1), make_get_event(contract_id))
.expect("push should succeed");
queue.pop().expect("first pop should succeed");
queue.pop().expect("second pop should succeed");
assert!(queue.is_empty());
assert!(
queue.contract_queues.is_empty(),
"contract queue map should be empty"
);
assert!(queue.round_robin.is_empty(), "round_robin should be empty");
}
#[test]
fn test_single_contract_preserves_order() {
let mut queue = FairEventQueue::new();
let contract_id = make_contract_id(1);
for i in 0..5u64 {
queue
.try_push(make_event_id(i), make_get_event(contract_id))
.expect("push should succeed");
}
let mut popped_ids = Vec::new();
while let Some((id, _)) = queue.pop() {
popped_ids.push(id.id);
}
assert_eq!(popped_ids, vec![0, 1, 2, 3, 4]);
}
#[test]
fn test_stress_many_contracts() {
let mut queue = FairEventQueue::new();
let num_contracts = 50usize; let events_per_contract = MAX_QUEUED_PER_CONTRACT;
let mut event_id = 0u64;
for seed in 0..num_contracts as u8 {
let contract_id = make_contract_id(seed);
for _ in 0..events_per_contract {
queue
.try_push(make_event_id(event_id), make_get_event(contract_id))
.expect("push should succeed");
event_id += 1;
}
}
assert_eq!(queue.total_queued(), num_contracts * events_per_contract);
let mut per_contract_counts: HashMap<ContractInstanceId, usize> = HashMap::new();
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
*per_contract_counts.entry(instance_id).or_insert(0) += 1;
}
assert_eq!(per_contract_counts.len(), num_contracts);
for count in per_contract_counts.values() {
assert_eq!(*count, events_per_contract);
}
assert!(queue.is_empty());
}
#[test]
fn test_stress_mixed_load() {
let mut queue = FairEventQueue::new();
let hot_contract = make_contract_id(0);
let num_cold = 99usize;
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(make_event_id(i), make_get_event(hot_contract))
.expect("push should succeed");
}
let mut event_id = MAX_QUEUED_PER_CONTRACT as u64;
for seed in 1..=num_cold as u8 {
let contract_id = make_contract_id(seed);
queue
.try_push(make_event_id(event_id), make_get_event(contract_id))
.expect("push should succeed");
event_id += 1;
}
let total = MAX_QUEUED_PER_CONTRACT + num_cold;
assert_eq!(queue.total_queued(), total);
let mut hot_positions = Vec::new();
let mut cold_positions = Vec::new();
let mut position = 0;
while let Some((_, event)) = queue.pop() {
let ContractHandlerEvent::GetQuery { instance_id, .. } = event else {
panic!("unexpected event type");
};
if instance_id == hot_contract {
hot_positions.push(position);
} else {
cold_positions.push(position);
}
position += 1;
}
assert_eq!(hot_positions.len(), MAX_QUEUED_PER_CONTRACT);
assert_eq!(cold_positions.len(), num_cold);
let last_cold = cold_positions.iter().max().copied().unwrap_or(0);
let first_hot = hot_positions.iter().min().copied().unwrap_or(usize::MAX);
assert!(
first_hot < last_cold,
"hot contract events should interleave with cold contract events"
);
assert_eq!(position, total, "all events should be popped");
}
#[test]
fn test_is_empty() {
let mut queue = FairEventQueue::new();
assert!(queue.is_empty());
let contract_id = make_contract_id(1);
queue
.try_push(make_event_id(0), make_get_event(contract_id))
.expect("push should succeed");
assert!(!queue.is_empty());
queue.pop();
assert!(queue.is_empty());
}
#[test]
fn test_put_event_routing() {
let mut queue = FairEventQueue::new();
let code = ContractCode::from(vec![42u8; 32]);
let params = Parameters::from(vec![7u8; 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let contract_id = *key.id();
let event = ContractHandlerEvent::PutQuery {
key,
state: WrappedState::new(vec![1, 2, 3]),
related_contracts: RelatedContracts::default(),
contract: None,
};
queue
.try_push(make_event_id(0), event)
.expect("push should succeed");
assert_eq!(queue.total_queued(), 1);
assert!(queue.contract_queues.contains_key(&contract_id));
}
#[test]
fn test_default_queue_per_slot_limit() {
let mut queue = FairEventQueue::new();
for i in 0..MAX_QUEUED_PER_CONTRACT as u64 {
queue
.try_push(make_event_id(i), make_delegate_event())
.expect("should succeed within limit");
}
let result = queue.try_push(
make_event_id(MAX_QUEUED_PER_CONTRACT as u64),
make_delegate_event(),
);
assert!(
result.is_err(),
"should reject when default queue per-slot limit exceeded"
);
}
#[test]
fn test_interleaved_push_pop() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
queue.try_push(make_event_id(0), make_get_event(a)).unwrap();
let (id0, _) = queue.pop().expect("should pop A");
assert_eq!(id0.id, 0);
queue.try_push(make_event_id(1), make_get_event(b)).unwrap();
queue.try_push(make_event_id(2), make_get_event(a)).unwrap();
let (id1, ev1) = queue.pop().expect("should pop B");
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev1 else {
panic!("unexpected event type");
};
assert_eq!(instance_id, b, "expected B (id={})", id1.id);
let (id2, ev2) = queue.pop().expect("should pop A");
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev2 else {
panic!("unexpected event type");
};
assert_eq!(instance_id, a, "expected A (id={})", id2.id);
assert!(queue.pop().is_none());
}
#[test]
fn test_interleaved_three_contracts() {
let mut queue = FairEventQueue::new();
let a = make_contract_id(1);
let b = make_contract_id(2);
let c = make_contract_id(3);
for i in 0..3u64 {
queue.try_push(make_event_id(i), make_get_event(a)).unwrap();
}
for i in 3..5u64 {
queue.try_push(make_event_id(i), make_get_event(b)).unwrap();
}
queue.try_push(make_event_id(5), make_get_event(c)).unwrap();
let mut first_round = Vec::new();
for _ in 0..3 {
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
first_round.push(instance_id);
}
assert_eq!(first_round, vec![a, b, c]);
let mut second_round = Vec::new();
for _ in 0..2 {
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
second_round.push(instance_id);
}
assert_eq!(second_round, vec![a, b]);
let (_, ev) = queue.pop().unwrap();
let ContractHandlerEvent::GetQuery { instance_id, .. } = ev else {
panic!("unexpected");
};
assert_eq!(instance_id, a);
assert!(queue.pop().is_none());
}
#[test]
fn test_extract_contract_id_all_variants() {
use freenet_stdlib::prelude::UpdateData;
use tokio::sync::mpsc;
let code = ContractCode::from(vec![99u8; 32]);
let params = Parameters::from(vec![88u8; 8]);
let key = ContractKey::from_params_and_code(¶ms, &code);
let contract_id = *key.id();
let contract_events: Vec<(&str, ContractHandlerEvent)> = vec![
(
"PutQuery",
ContractHandlerEvent::PutQuery {
key,
state: WrappedState::new(vec![1]),
related_contracts: RelatedContracts::default(),
contract: None,
},
),
(
"UpdateQuery",
ContractHandlerEvent::UpdateQuery {
key,
data: UpdateData::Delta(freenet_stdlib::prelude::StateDelta::from(vec![2])),
related_contracts: RelatedContracts::default(),
},
),
(
"GetQuery",
ContractHandlerEvent::GetQuery {
instance_id: contract_id,
return_contract_code: false,
},
),
(
"GetSummaryQuery",
ContractHandlerEvent::GetSummaryQuery { key },
),
(
"GetDeltaQuery",
ContractHandlerEvent::GetDeltaQuery {
key,
their_summary: freenet_stdlib::prelude::StateSummary::from(vec![3]),
},
),
(
"RegisterSubscriberListener",
ContractHandlerEvent::RegisterSubscriberListener {
key: contract_id,
client_id: crate::client_events::ClientId::next(),
summary: None,
subscriber_listener: mpsc::channel(64).0,
},
),
(
"NotifySubscriptionError",
ContractHandlerEvent::NotifySubscriptionError {
key: contract_id,
reason: "test".to_string(),
},
),
];
for (name, event) in &contract_events {
let result = extract_contract_id(event);
assert_eq!(
result,
Some(contract_id),
"{name} should route to contract queue"
);
}
let default_events: Vec<(&str, ContractHandlerEvent)> = vec![
(
"DelegateRequest",
ContractHandlerEvent::DelegateRequest {
req: freenet_stdlib::client_api::DelegateRequest::ApplicationMessages {
key: freenet_stdlib::prelude::DelegateKey::new(
[1u8; 32],
freenet_stdlib::prelude::CodeHash::new([0u8; 32]),
),
params: Parameters::from(vec![]),
inbound: vec![],
},
origin_contract: None,
},
),
(
"DelegateResponse",
ContractHandlerEvent::DelegateResponse(vec![]),
),
(
"ClientDisconnect",
ContractHandlerEvent::ClientDisconnect {
client_id: crate::client_events::ClientId::next(),
},
),
(
"PutResponse",
ContractHandlerEvent::PutResponse {
new_value: Ok(WrappedState::new(vec![])),
state_changed: false,
},
),
(
"GetResponse",
ContractHandlerEvent::GetResponse {
key: None,
response: Ok(crate::contract::handler::StoreResponse {
state: None,
contract: None,
}),
},
),
(
"UpdateResponse",
ContractHandlerEvent::UpdateResponse {
new_value: Ok(WrappedState::new(vec![])),
state_changed: false,
},
),
(
"UpdateNoChange",
ContractHandlerEvent::UpdateNoChange { key },
),
(
"RegisterSubscriberListenerResponse",
ContractHandlerEvent::RegisterSubscriberListenerResponse,
),
(
"QuerySubscriptionsResponse",
ContractHandlerEvent::QuerySubscriptionsResponse,
),
(
"GetSummaryResponse",
ContractHandlerEvent::GetSummaryResponse {
key,
summary: Ok(freenet_stdlib::prelude::StateSummary::from(vec![])),
},
),
(
"GetDeltaResponse",
ContractHandlerEvent::GetDeltaResponse {
key,
delta: Ok(freenet_stdlib::prelude::StateDelta::from(vec![])),
},
),
(
"NotifySubscriptionErrorResponse",
ContractHandlerEvent::NotifySubscriptionErrorResponse,
),
(
"QuerySubscriptions",
ContractHandlerEvent::QuerySubscriptions {
callback: mpsc::channel::<crate::message::QueryResult>(1).0,
},
),
];
for (name, event) in &default_events {
let result = extract_contract_id(event);
assert_eq!(result, None, "{name} should route to default queue");
}
}
}