use super::*;
use crate::memory::manager::MemoryManager;
use crate::memory::static_manager::StaticMemoryManager;
use crate::message::{Message, MessageHeader};
use crate::policy::{AdmissionPolicy, BatchingPolicy, EdgePolicy, OverBudgetAction, QueueCaps};
use crate::prelude::{create_test_tensor_filled_with, TestTensor};
use crate::types::{DeadlineNs, MessageToken, Ticks};
/// Edge policy shared by the fixtures that do not need custom caps:
/// `DropNewest` admission with `Drop` as the over-budget action.
const TEST_EDGE_POLICY: EdgePolicy = EdgePolicy::new(
// NOTE(review): presumably (hard item cap, soft item cap, hard byte cap, soft byte cap),
// i.e. 8 / 6 items and no byte limits — confirm against `QueueCaps::new`.
QueueCaps::new(8, 6, None, None),
AdmissionPolicy::DropNewest,
OverBudgetAction::Drop,
);
/// Const-generic argument of every `StaticMemoryManager` the fixtures create.
/// NOTE(review): presumably the manager's slot capacity — confirm.
const MGR_DEPTH: usize = 32;
/// Builds a test message whose header starts from `MessageHeader::empty()`
/// with the creation tick set to `tick`, and whose payload is the tensor
/// returned by `create_test_tensor_filled_with(0)`.
fn make_msg_tensor(tick: u64) -> Message<TestTensor> {
    let payload = create_test_tensor_filled_with(0);

    let mut header = MessageHeader::empty();
    header.set_creation_tick(Ticks::new(tick));

    Message::new(header, payload)
}
/// Stores `msg` in `mgr` and returns the token the manager hands back.
///
/// # Panics
///
/// Panics with "memory manager store failed" if the manager returns an error.
fn store(
    mgr: &mut StaticMemoryManager<TestTensor, MGR_DEPTH>,
    msg: Message<TestTensor>,
) -> MessageToken {
    let outcome = mgr.store(msg);
    outcome.expect("memory manager store failed")
}
/// Generates a `#[cfg(test)]` module named after the first argument that
/// contains one `#[test]` per contract fixture in
/// `$crate::edge::contract_tests`.
///
/// The second argument must be an expression that can be called with no
/// arguments. Each generated test wraps it in a fresh `|| expr()` closure and
/// hands that closure to the matching `run_*` fixture.
#[macro_export]
macro_rules! run_edge_contract_tests {
    ($suite:ident, $factory:expr) => {
        #[cfg(test)]
        mod $suite {
            use super::*;
            use $crate::edge::contract_tests as fixtures;

            // --- basic queue behaviour ---

            #[test]
            fn basic_push_pop() {
                fixtures::run_basic_push_pop(|| $factory());
            }

            #[test]
            fn fifo_order() {
                fixtures::run_fifo_order(|| $factory());
            }

            #[test]
            fn occupancy_and_empty() {
                fixtures::run_occupancy_and_empty(|| $factory());
            }

            // --- batching ---

            #[test]
            fn batch_fixed_n() {
                fixtures::run_batch_fixed_n(|| $factory());
            }

            #[test]
            fn batch_delta_t() {
                fixtures::run_batch_delta_t(|| $factory());
            }

            #[test]
            fn batch_fixed_and_delta() {
                fixtures::run_batch_fixed_and_delta(|| $factory());
            }

            #[test]
            fn batch_sliding() {
                fixtures::run_batch_sliding(|| $factory());
            }

            #[test]
            fn batch_default_one() {
                fixtures::run_batch_default_one(|| $factory());
            }

            // --- admission ---

            #[test]
            fn admission_policies() {
                fixtures::run_admission_policies(|| $factory());
            }

            #[test]
            fn admission_drop_newest_between_soft_and_hard() {
                fixtures::run_admission_drop_newest_between_soft_and_hard(|| $factory());
            }

            #[test]
            fn admission_evict_until_below_hard() {
                fixtures::run_admission_evict_until_below_hard(|| $factory());
            }

            #[test]
            fn admission_item_bytes_and_deadline_semantics() {
                fixtures::run_admission_item_bytes_and_deadline_semantics(|| $factory());
            }

            // --- peeking, purity, accounting, eviction ---

            #[test]
            fn try_peek_at() {
                fixtures::run_try_peek_at(|| $factory());
            }

            #[test]
            fn get_admission_decision_is_pure() {
                fixtures::run_get_admission_decision_is_pure(|| $factory());
            }

            #[test]
            fn byte_tracking_roundtrip() {
                fixtures::run_byte_tracking_roundtrip(|| $factory());
            }

            #[test]
            fn evict_until_below_hard_caller_pattern() {
                fixtures::run_evict_until_below_hard_caller_pattern(|| $factory());
            }

            #[test]
            fn try_push_never_evicts() {
                fixtures::run_try_push_never_evicts(|| $factory());
            }
        }
    };
}
/// Runs every contract fixture of this module, one after another, against
/// edges produced by `make`.
///
/// `make` is lent to each fixture by mutable reference, so the same factory
/// serves the whole suite. The first failing assertion panics and aborts the
/// remaining fixtures.
pub fn run_all_tests<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let factory = &mut make;

    // Basic queue behaviour.
    run_basic_push_pop(&mut *factory);
    run_fifo_order(&mut *factory);
    run_occupancy_and_empty(&mut *factory);

    // Batching.
    run_batch_fixed_n(&mut *factory);
    run_batch_delta_t(&mut *factory);
    run_batch_fixed_and_delta(&mut *factory);
    run_batch_sliding(&mut *factory);
    run_batch_default_one(&mut *factory);

    // Admission.
    run_admission_policies(&mut *factory);
    run_admission_drop_newest_between_soft_and_hard(&mut *factory);
    run_admission_evict_until_below_hard(&mut *factory);
    run_admission_item_bytes_and_deadline_semantics(&mut *factory);

    // Peeking, purity, byte accounting and eviction.
    run_try_peek_at(&mut *factory);
    run_get_admission_decision_is_pure(&mut *factory);
    run_byte_tracking_roundtrip(&mut *factory);
    run_evict_until_below_hard_caller_pattern(&mut *factory);
    run_try_push_never_evicts(&mut *factory);
}
/// Contract: a single token pushed onto a fresh edge can be peeked and then
/// popped, and the edge reports `QueueError::Empty` before and after.
///
/// The header behind the peeked and popped token is checked to carry creation
/// tick 1, and the token is freed from the memory manager at the end.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_basic_push_pop<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    // A fresh edge has nothing to pop or peek.
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    assert!(matches!(q.try_peek(), Err(QueueError::Empty)));
    assert!(q.is_empty());

    let token = store(&mut mgr, make_msg_tensor(1));
    assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);

    // Peeking exposes the pushed token without removing it.
    let peeked = q.try_peek().expect("peek after push");
    assert_eq!(peeked, token);
    assert_eq!(
        *mgr.peek_header(peeked).expect("peek header").creation_tick(),
        Ticks::new(1)
    );
    assert!(!q.is_empty());

    // Popping returns the same token and drains the edge.
    let popped = q.try_pop(&mgr).expect("pop after push");
    assert_eq!(popped, token);
    assert_eq!(
        *mgr.peek_header(popped).expect("got header").creation_tick(),
        Ticks::new(1)
    );
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    assert!(q.is_empty());

    mgr.free(popped).expect("free");
}
/// Contract: tokens come out of the edge in the order they went in.
///
/// Five messages with creation ticks 1 through 5 are pushed. They must pop
/// back as the same tokens, in the same order, with matching ticks, after
/// which the edge is empty.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_fifo_order<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    let mut tokens = [MessageToken::INVALID; 5];
    for (slot, tick) in tokens.iter_mut().zip(1u64..=5) {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
        *slot = token;
    }

    for (want_token, want_tick) in tokens.iter().zip(1u64..=5) {
        let popped = q.try_pop(&mgr).expect("pop");
        assert_eq!(popped, *want_token);
        assert_eq!(
            *mgr.peek_header(popped)
                .expect("header")
                .creation_tick()
                .as_u64(),
            want_tick
        );
    }

    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
/// Contract: the item count reported by `occupancy` follows pushes and pops.
///
/// The count is checked to be 0 on a fresh edge, 1 after one push, and 0
/// again after popping that item.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_occupancy_and_empty<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    assert_eq!(*q.occupancy(&policy).items(), 0usize);

    let token = store(&mut mgr, make_msg_tensor(1));
    assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
    assert_eq!(*q.occupancy(&policy).items(), 1usize);

    let _ = q.try_pop(&mgr).expect("pop");
    assert_eq!(*q.occupancy(&policy).items(), 0usize);
}
/// Contract: `BatchingPolicy::fixed(3)` pops exactly the three oldest items.
///
/// With ticks 1 through 5 queued, the batch must hold ticks 1, 2 and 3 in
/// order. The two following single pops must yield ticks 4 and 5, and the
/// edge must then be empty.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_batch_fixed_n<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    for tick in 1u64..=5 {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
    }

    let batch_policy = BatchingPolicy::fixed(3);
    let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
    assert_eq!(batch.len(), 3);

    // Pull all three members before inspecting any of them.
    let mut iter = batch.iter();
    let first = iter.next().expect("batch[0]");
    let second = iter.next().expect("batch[1]");
    let third = iter.next().expect("batch[2]");
    for (member, tick) in [(first, 1u64), (second, 2), (third, 3)] {
        assert_eq!(
            *mgr.peek_header(*member).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
    assert!(iter.next().is_none());

    // The two items outside the batch are still queued, in order.
    let rest_a = q.try_pop(&mgr).expect("rem1");
    let rest_b = q.try_pop(&mgr).expect("rem2");
    for (token, tick) in [(rest_a, 4u64), (rest_b, 5)] {
        assert_eq!(
            *mgr.peek_header(token).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
/// Contract: `BatchingPolicy::delta_t(Ticks::new(2))` groups the leading
/// items whose creation ticks lie close together.
///
/// With ticks 10, 11, 12 and 30 queued, the batch must hold exactly ticks 10,
/// 11 and 12, and the next single pop must yield tick 30.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_batch_delta_t<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    for tick in [10u64, 11, 12, 30] {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
    }

    let batch_policy = BatchingPolicy::delta_t(Ticks::new(2u64));
    let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
    assert_eq!(batch.len(), 3);

    // Pull all three members before inspecting any of them.
    let mut iter = batch.iter();
    let first = iter.next().expect("batch[0]");
    let second = iter.next().expect("batch[1]");
    let third = iter.next().expect("batch[2]");
    for (member, tick) in [(first, 10u64), (second, 11), (third, 12)] {
        assert_eq!(
            *mgr.peek_header(*member).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
    assert!(iter.next().is_none());

    // The far-away item stayed behind.
    let leftover = q.try_pop(&mgr).expect("remaining");
    assert_eq!(
        *mgr.peek_header(leftover).unwrap().creation_tick(),
        Ticks::new(30)
    );
}
/// Contract: `BatchingPolicy::fixed_and_delta_t(2, Ticks::new(5))` yields a
/// batch of two.
///
/// With ticks 100, 101, 102 and 110 queued, the batch must hold ticks 100 and
/// 101. The following single pops must yield ticks 102 and then 110.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_batch_fixed_and_delta<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    for tick in [100u64, 101, 102, 110] {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
    }

    let batch_policy = BatchingPolicy::fixed_and_delta_t(2, Ticks::new(5u64));
    let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
    assert_eq!(batch.len(), 2);

    // Pull both members before inspecting either of them.
    let mut iter = batch.iter();
    let first = iter.next().expect("batch[0]");
    let second = iter.next().expect("batch[1]");
    for (member, tick) in [(first, 100u64), (second, 101)] {
        assert_eq!(
            *mgr.peek_header(*member).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
    assert!(iter.next().is_none());

    // Remaining items: pop one, check it, then pop the next.
    for (label, tick) in [("a", 102u64), ("b", 110)] {
        let token = q.try_pop(&mgr).expect(label);
        assert_eq!(
            *mgr.peek_header(token).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
}
pub fn run_batch_sliding<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let mut q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let policy = TEST_EDGE_POLICY;
for t in 1u64..=6u64 {
let m = make_msg_tensor(t);
let token = store(&mut mgr, m);
assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
}
let sw = crate::policy::WindowKind::Sliding(crate::policy::SlidingWindow::new(2));
let batch_policy = crate::policy::BatchingPolicy::with_window(Some(4), None, sw);
let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
assert_eq!(batch.len(), 4);
let mut iter = batch.iter();
let a = iter.next().expect("batch[0]");
let b = iter.next().expect("batch[1]");
let c = iter.next().expect("batch[2]");
let d = iter.next().expect("batch[3]");
assert_eq!(*mgr.peek_header(*a).unwrap().creation_tick(), Ticks::new(1));
assert_eq!(*mgr.peek_header(*b).unwrap().creation_tick(), Ticks::new(2));
assert_eq!(*mgr.peek_header(*c).unwrap().creation_tick(), Ticks::new(3));
assert_eq!(*mgr.peek_header(*d).unwrap().creation_tick(), Ticks::new(4));
assert!(iter.next().is_none());
let next = q.try_pop(&mgr).expect("next after sliding");
assert_eq!(
*mgr.peek_header(next).unwrap().creation_tick(),
Ticks::new(3)
);
}
/// Contract: `BatchingPolicy::default()` pops a batch of exactly one item.
///
/// With ticks 1, 2 and 3 queued, the batch must hold only tick 1. The
/// following single pops must yield ticks 2 and 3, and the edge must then be
/// empty.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_batch_default_one<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    for tick in 1u64..=3 {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
    }

    let batch_policy = BatchingPolicy::default();
    let batch = q.try_pop_batch(&batch_policy, &mgr).expect("batch");
    assert_eq!(batch.len(), 1);

    let only = batch.iter().next().unwrap();
    assert_eq!(
        *mgr.peek_header(*only).unwrap().creation_tick(),
        Ticks::new(1)
    );

    // Pop both remaining items before checking either of them.
    let second = q.try_pop(&mgr).expect("a");
    let third = q.try_pop(&mgr).expect("b");
    for (token, tick) in [(second, 2u64), (third, 3)] {
        assert_eq!(
            *mgr.peek_header(token).unwrap().creation_tick(),
            Ticks::new(tick)
        );
    }
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
/// Contract: `try_push` outcomes for each admission policy under
/// `QueueCaps::new(3, 1, None, None)`.
///
/// Each scenario uses its own edge and memory manager:
///
/// * `DropNewest`: the first push is enqueued, the second reports
///   `DroppedNewest`, and the first token stays at the head.
/// * `DropOldest`: three pushes are enqueued and a fourth is `Rejected`.
///   After the caller pops and frees the oldest token, the retried push is
///   enqueued and the order is b, c, d.
/// * `Block`: the first push is enqueued and the second is `Rejected`.
/// * `DeadlineAndQoSAware`: both pushes are enqueued and pop in order.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_admission_policies<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let caps = QueueCaps::new(3, 1, None, None);

    // Scenario 1: DropNewest.
    {
        let mut q = make();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
        let policy = EdgePolicy::new(caps, AdmissionPolicy::DropNewest, OverBudgetAction::Drop);

        let kept = store(&mut mgr, make_msg_tensor(1));
        assert_eq!(q.try_push(kept, &policy, &mgr), EnqueueResult::Enqueued);

        let refused = store(&mut mgr, make_msg_tensor(2));
        let outcome = q.try_push(refused, &policy, &mgr);
        assert_eq!(outcome, EnqueueResult::DroppedNewest);

        let head = q.try_peek().expect("peek after drop-newest");
        assert_eq!(head, kept);
        assert_eq!(q.try_pop(&mgr).expect("pop a"), kept);
        assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    }

    // Scenario 2: DropOldest — eviction is left to the caller.
    {
        let mut q = make();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
        let policy = EdgePolicy::new(caps, AdmissionPolicy::DropOldest, OverBudgetAction::Drop);

        // Store all three messages first, then push them in order.
        let queued = [1u64, 2, 3].map(|tick| store(&mut mgr, make_msg_tensor(tick)));
        for &token in queued.iter() {
            assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
        }

        let late = store(&mut mgr, make_msg_tensor(4));
        assert_eq!(q.try_push(late, &policy, &mgr), EnqueueResult::Rejected);

        let evicted = q.try_pop(&mgr).expect("pre-evict pop");
        assert_eq!(evicted, queued[0]);
        let _ = mgr.free(evicted);

        assert_eq!(q.try_push(late, &policy, &mgr), EnqueueResult::Enqueued);
        assert_eq!(q.try_pop(&mgr).expect("pop b"), queued[1]);
        assert_eq!(q.try_pop(&mgr).expect("pop c"), queued[2]);
        assert_eq!(q.try_pop(&mgr).expect("pop d"), late);
        assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    }

    // Scenario 3: Block.
    {
        let mut q = make();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
        let policy = EdgePolicy::new(caps, AdmissionPolicy::Block, OverBudgetAction::Drop);

        let kept = store(&mut mgr, make_msg_tensor(1));
        assert_eq!(q.try_push(kept, &policy, &mgr), EnqueueResult::Enqueued);

        let refused = store(&mut mgr, make_msg_tensor(2));
        let outcome = q.try_push(refused, &policy, &mgr);
        assert_eq!(outcome, EnqueueResult::Rejected);

        assert_eq!(q.try_pop(&mgr).expect("pop after block"), kept);
        assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    }

    // Scenario 4: DeadlineAndQoSAware.
    {
        let mut q = make();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
        let policy = EdgePolicy::new(
            caps,
            AdmissionPolicy::DeadlineAndQoSAware,
            OverBudgetAction::Drop,
        );

        let first = store(&mut mgr, make_msg_tensor(1));
        assert_eq!(q.try_push(first, &policy, &mgr), EnqueueResult::Enqueued);

        let second = store(&mut mgr, make_msg_tensor(2));
        let outcome = q.try_push(second, &policy, &mgr);
        assert_eq!(outcome, EnqueueResult::Enqueued);

        // Pop both before comparing either of them.
        let popped_first = q.try_pop(&mgr).expect("pop a");
        let popped_second = q.try_pop(&mgr).expect("pop b");
        assert_eq!(popped_first, first);
        assert_eq!(popped_second, second);
        assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
    }
}
pub fn run_admission_drop_newest_between_soft_and_hard<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let caps = QueueCaps::new(4, 2, None, None);
let mut q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let policy_drop_newest =
EdgePolicy::new(caps, AdmissionPolicy::DropNewest, OverBudgetAction::Drop);
let m1 = make_msg_tensor(1);
let t1 = store(&mut mgr, m1);
assert_eq!(
q.try_push(t1, &policy_drop_newest, &mgr),
EnqueueResult::Enqueued
);
let m2 = make_msg_tensor(2);
let t2 = store(&mut mgr, m2);
assert_eq!(
q.try_push(t2, &policy_drop_newest, &mgr),
EnqueueResult::Enqueued
);
let m3 = make_msg_tensor(3);
let t3 = store(&mut mgr, m3);
let decision = q.get_admission_decision(&policy_drop_newest, t3, &mgr);
assert_eq!(decision, crate::policy::AdmissionDecision::DropNewest);
}
pub fn run_admission_evict_until_below_hard<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let caps = QueueCaps::new(4, 2, Some(1024), Some(512));
let mut q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let policy_fill = EdgePolicy::new(
caps,
AdmissionPolicy::DeadlineAndQoSAware,
OverBudgetAction::Drop,
);
for _ in 0..*caps.max_items() {
let m = make_msg_tensor(10);
let token = store(&mut mgr, m);
let _ = q.try_push(token, &policy_fill, &mgr);
}
let policy_drop_oldest =
EdgePolicy::new(caps, AdmissionPolicy::DropOldest, OverBudgetAction::Drop);
let small_msg = make_msg_tensor(20);
let small_token = store(&mut mgr, small_msg);
let decision_small = q.get_admission_decision(&policy_drop_oldest, small_token, &mgr);
assert_eq!(
decision_small,
crate::policy::AdmissionDecision::EvictUntilBelowHard
);
let mut large_msg = make_msg_tensor(30);
large_msg.header_mut().set_payload_size_bytes(2048);
let large_token = store(&mut mgr, large_msg);
let decision_large = q.get_admission_decision(&policy_drop_oldest, large_token, &mgr);
assert_eq!(decision_large, crate::policy::AdmissionDecision::Reject);
}
pub fn run_admission_item_bytes_and_deadline_semantics<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let caps = QueueCaps::new(100, 50, None, None);
let q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let mut m = make_msg_tensor(1);
m.header_mut().set_deadline_ns(Some(DeadlineNs::new(2000)));
let token = store(&mut mgr, m);
let policy = EdgePolicy::new(
caps,
AdmissionPolicy::DeadlineAndQoSAware,
OverBudgetAction::Drop,
);
let decision = q.get_admission_decision(&policy, token, &mgr);
assert_eq!(decision, crate::policy::AdmissionDecision::Admit);
let h = mgr.peek_header(token).unwrap();
assert_eq!(*h.deadline_ns(), Some(DeadlineNs::new(2000)));
}
/// Contract: `try_peek_at(i)` returns the i-th queued token without removing
/// it, and reports `QueueError::Empty` outside the queued range.
///
/// Index 0 on a fresh edge and index 4 with four items queued must both be
/// `Empty`. Indices 0 to 3 must return the pushed tokens in order, with
/// ticks 1 to 4. All four items must still pop in order afterwards.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_try_peek_at<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = TEST_EDGE_POLICY;

    assert!(matches!(q.try_peek_at(0), Err(QueueError::Empty)));

    let mut tokens = [MessageToken::INVALID; 4];
    for (slot, tick) in tokens.iter_mut().zip(1u64..=4) {
        let token = store(&mut mgr, make_msg_tensor(tick));
        assert_eq!(q.try_push(token, &policy, &mgr), EnqueueResult::Enqueued);
        *slot = token;
    }

    // Every in-range index is visible and maps to the matching token.
    for (idx, want) in tokens.iter().enumerate() {
        let peeked = q.try_peek_at(idx).expect("peek_at in range");
        assert_eq!(peeked, *want);
        assert_eq!(
            *mgr.peek_header(peeked)
                .expect("header")
                .creation_tick()
                .as_u64(),
            idx as u64 + 1
        );
    }

    // One past the last queued item.
    assert!(matches!(q.try_peek_at(4), Err(QueueError::Empty)));

    // Peeking consumed nothing: everything still pops in order.
    for (idx, want) in tokens.iter().enumerate() {
        let popped = q.try_pop(&mgr).expect("pop after peek_at");
        assert_eq!(popped, *want);
        assert_eq!(
            *mgr.peek_header(popped)
                .expect("header")
                .creation_tick()
                .as_u64(),
            idx as u64 + 1
        );
    }
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
/// Contract: repeated `get_admission_decision` calls give the same answer and
/// leave the queue contents untouched.
///
/// With two items queued under `DropOldest` and
/// `QueueCaps::new(4, 2, None, None)`, the same probe token is evaluated
/// three times. The three decisions must be equal, and the two queued tokens
/// must still pop in order, after which the edge is empty.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_get_admission_decision_is_pure<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let caps = QueueCaps::new(4, 2, None, None);
    let mut q = make();
    let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();
    let policy = EdgePolicy::new(caps, AdmissionPolicy::DropOldest, OverBudgetAction::Drop);

    let first = store(&mut mgr, make_msg_tensor(1));
    let second = store(&mut mgr, make_msg_tensor(2));
    assert_eq!(q.try_push(first, &policy, &mgr), EnqueueResult::Enqueued);
    assert_eq!(q.try_push(second, &policy, &mgr), EnqueueResult::Enqueued);

    let probe = store(&mut mgr, make_msg_tensor(3));
    let verdicts = [
        q.get_admission_decision(&policy, probe, &mgr),
        q.get_admission_decision(&policy, probe, &mgr),
        q.get_admission_decision(&policy, probe, &mgr),
    ];
    assert_eq!(verdicts[0], verdicts[1]);
    assert_eq!(verdicts[1], verdicts[2]);

    // The probes must not have altered what is queued.
    assert_eq!(q.try_pop(&mgr).expect("first pop"), first);
    assert_eq!(q.try_pop(&mgr).expect("second pop"), second);
    assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
pub fn run_byte_tracking_roundtrip<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let caps = QueueCaps::new(8, 6, Some(4096), Some(2048));
let mut q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let policy = EdgePolicy::new(
caps,
AdmissionPolicy::DeadlineAndQoSAware,
OverBudgetAction::Drop,
);
let mut tokens = [MessageToken::default(); 4];
for (i, slot) in tokens.iter_mut().enumerate() {
let mut m = make_msg_tensor(i as u64 + 1);
m.header_mut().set_payload_size_bytes(100);
let t = store(&mut mgr, m);
assert_eq!(q.try_push(t, &policy, &mgr), EnqueueResult::Enqueued);
*slot = t;
}
let occ = q.occupancy(&policy);
assert_eq!(*occ.items(), 4);
assert_eq!(*occ.bytes(), 400);
for expected in tokens.iter() {
let got = q.try_pop(&mgr).expect("pop");
assert_eq!(got, *expected);
}
let occ_after = q.occupancy(&policy);
assert_eq!(*occ_after.items(), 0);
assert_eq!(*occ_after.bytes(), 0);
assert!(q.is_empty());
}
pub fn run_evict_until_below_hard_caller_pattern<Q, F>(mut make: F)
where
F: FnMut() -> Q,
Q: Edge,
{
let caps = QueueCaps::new(4, 2, None, None);
let mut q = make();
let mut mgr: StaticMemoryManager<TestTensor, MGR_DEPTH> = StaticMemoryManager::new();
let policy = EdgePolicy::new(caps, AdmissionPolicy::DropOldest, OverBudgetAction::Drop);
let tokens: [MessageToken; 4] = core::array::from_fn(|i| {
let t = store(&mut mgr, make_msg_tensor(i as u64 + 1));
assert_eq!(q.try_push(t, &policy, &mgr), EnqueueResult::Enqueued);
t
});
assert_eq!(*q.occupancy(&policy).items(), 4);
let new_token = store(&mut mgr, make_msg_tensor(10));
loop {
let occ = q.occupancy(&policy);
if !policy.caps.at_or_above_hard(*occ.items(), *occ.bytes()) {
break;
}
match q.try_pop(&mgr) {
Ok(evicted) => {
let _ = mgr.free(evicted);
}
Err(_) => break,
}
}
let result = q.try_push(new_token, &policy, &mgr);
assert_eq!(result, EnqueueResult::Enqueued);
let occ = q.occupancy(&policy);
assert_eq!(
*occ.items(),
4,
"expected 4 items after pre-evict-one + push; double-eviction would give 3"
);
assert_eq!(q.try_pop(&mgr).expect("pop"), tokens[1]);
assert_eq!(q.try_pop(&mgr).expect("pop"), tokens[2]);
assert_eq!(q.try_pop(&mgr).expect("pop"), tokens[3]);
assert_eq!(q.try_pop(&mgr).expect("pop"), new_token);
assert!(matches!(q.try_pop(&mgr), Err(QueueError::Empty)));
}
/// Contract: a single `try_push` never shrinks the queue and never grows it
/// by more than one item.
///
/// For every fill level from 0 to 4, a fresh edge is pre-filled under
/// `DropOldest` with `QueueCaps::new(4, 2, None, None)`; the results of those
/// pushes are ignored. The item count is then compared before and after one
/// additional push.
///
/// # Panics
///
/// Panics if any of the contract assertions fails.
pub fn run_try_push_never_evicts<Q, F>(mut make: F)
where
    F: FnMut() -> Q,
    Q: Edge,
{
    let caps = QueueCaps::new(4, 2, None, None);
    let policy = EdgePolicy::new(caps, AdmissionPolicy::DropOldest, OverBudgetAction::Drop);

    for fill in 0usize..=4 {
        let mut q = make();
        let mut mgr = StaticMemoryManager::<TestTensor, MGR_DEPTH>::new();

        // Pre-fill; whether each push was admitted is irrelevant here.
        for tick in 0..fill {
            let filler = store(&mut mgr, make_msg_tensor(tick as u64));
            let _ = q.try_push(filler, &policy, &mgr);
        }

        let before = *q.occupancy(&policy).items();
        let probe = store(&mut mgr, make_msg_tensor(99));
        let _ = q.try_push(probe, &policy, &mgr);
        let after = *q.occupancy(&policy).items();

        assert!(
            before <= after,
            "try_push decreased queue length from {} to {} at fill={}; \
             eviction must not happen inside try_push",
            before,
            after,
            fill
        );
        assert!(
            before + 1 >= after,
            "try_push increased queue length by more than 1 (from {} to {})",
            before,
            after
        );
    }
}