use super::*;
use crate::{
message::MessageHeader,
policy::{AdmissionPolicy, OverBudgetAction, QueueCaps},
prelude::{
fixed_buffer_line_writer, EdgeLink, FixedBuffer, FmtLineWriter, GraphTelemetry,
NoStdLinuxMonotonicClock, NodeLink, StaticMemoryManager, TestSpscRingBuf,
},
types::{EdgeIndex, NodeIndex, PortId, PortIndex, Ticks},
};
use heapless::Vec;
/// First const parameter of the `GraphTelemetry` built by `make_graph_telemetry`
/// (presumably the number of node slots it tracks — confirm against `GraphTelemetry`).
const TELE_NODES: usize = 8;
/// Second const parameter of the `GraphTelemetry` built by `make_graph_telemetry`
/// (presumably the number of edge slots it tracks — confirm against `GraphTelemetry`).
const TELE_EDGES: usize = 16;
/// Size parameter of the `FixedBuffer` backing the telemetry line writer.
const TELE_BUF_BYTES: usize = 1024;
/// Edge policy used for every test edge and step context unless a fixture
/// explicitly supplies a different output policy.
// NOTE(review): the meaning of the four `QueueCaps::new` arguments is not
// visible in this file (presumably item hard/soft caps followed by optional
// byte caps) — confirm against `QueueCaps::new`.
const TEST_EDGE_POLICY: EdgePolicy = EdgePolicy::new(
QueueCaps::new(16, 14, None, None),
AdmissionPolicy::DropNewest,
OverBudgetAction::Drop,
);
/// Tighter policy with `AdmissionPolicy::DropOldest`, used as the output
/// policy by the drop-oldest eviction fixture.
const TEST_DROP_OLDEST_POLICY: EdgePolicy = EdgePolicy::new(
QueueCaps::new(4, 2, None, None),
AdmissionPolicy::DropOldest,
OverBudgetAction::Drop,
);
/// Generates a `#[cfg(test)]` module named `$mod_name` that runs every node
/// contract fixture from `$crate::node::contract_tests` against one node type.
///
/// `make_nodelink` is an expression that is invoked with no arguments
/// (`$make_nodelink()`) each time a fixture asks for a fresh node link; it is
/// wrapped in a closure and handed to the matching `run_*` fixture.
///
/// ```ignore
/// run_node_contract_tests!(my_node_contract, {
///     make_nodelink: make_my_nodelink
/// });
/// ```
#[macro_export]
macro_rules! run_node_contract_tests {
($mod_name:ident, {
make_nodelink: $make_nodelink:expr
}) => {
#[cfg(test)]
mod $mod_name {
// Pull in the caller's scope so `$make_nodelink` can name local items.
use super::*;
use $crate::node::contract_tests as fixtures;
// Each test below forwards to the fixture of the corresponding name.
#[test]
fn initialize_start_stop_roundtrip() {
fixtures::run_initialize_start_stop_roundtrip(|| $make_nodelink());
}
#[test]
fn process_message_enqueues_and_made_progress() {
fixtures::run_process_message_enqueues_and_made_progress(|| $make_nodelink());
}
#[test]
fn step_on_empty_returns_noinput() {
fixtures::run_step_on_empty_returns_noinput(|| $make_nodelink());
}
#[test]
fn step_pops_and_calls_process_message() {
fixtures::run_step_pops_and_calls_process_message(|| $make_nodelink());
}
#[test]
fn step_batch_respects_fixed_n_disjoint() {
fixtures::run_step_batch_fixed_n_disjoint(|| $make_nodelink());
}
#[test]
fn step_batch_respects_sliding_window() {
fixtures::run_step_batch_sliding_window(|| $make_nodelink());
}
#[test]
fn step_maps_backpressure_and_errors() {
fixtures::run_step_maps_backpressure_and_errors(|| $make_nodelink());
}
// The kind-specific fixtures return early when the node is of another kind.
#[test]
fn source_specific_behaviour() {
fixtures::run_source_specific_tests(|| $make_nodelink());
}
#[test]
fn sink_specific_behaviour() {
fixtures::run_sink_specific_tests(|| $make_nodelink());
}
#[test]
fn model_specific_batching_behaviour() {
fixtures::run_model_batching_tests(|| $make_nodelink());
}
#[test]
fn fixed_n_with_max_delta_t_behaviour() {
fixtures::run_step_batch_fixed_n_max_delta_t_tests(|| $make_nodelink());
}
#[test]
fn push_output_drop_oldest_evicts_oldest_once() {
fixtures::run_push_output_drop_oldest_evicts_oldest_once(|| $make_nodelink());
}
#[test]
fn push_output_no_token_leak_on_backpressure() {
fixtures::run_push_output_no_token_leak_on_backpressure(|| $make_nodelink());
}
#[test]
fn push_output_evict_until_below_hard_no_double_eviction() {
fixtures::run_push_output_evict_until_below_hard_no_double_eviction(|| {
$make_nodelink()
});
}
}
};
}
/// Builds the telemetry instance shared by the contract fixtures.
///
/// The telemetry writes through a fixed-buffer line writer of
/// `TELE_BUF_BYTES` bytes.
// NOTE(review): the meaning of the `0u32` and `true` arguments is defined by
// `GraphTelemetry::new` and is not visible here — confirm there.
fn make_graph_telemetry(
) -> GraphTelemetry<TELE_NODES, TELE_EDGES, FmtLineWriter<FixedBuffer<TELE_BUF_BYTES>>> {
    let line_writer = fixed_buffer_line_writer::<TELE_BUF_BYTES>();
    GraphTelemetry::new(0u32, true, line_writer)
}
/// Creates `IN` input links and `OUT` output links backed by fresh
/// `TestSpscRingBuf<16>` queues, all configured with `TEST_EDGE_POLICY`.
///
/// Link `k` of either array gets edge index `k + 1`, upstream port
/// `(base_upstream_node, k)` and downstream port `(base_downstream_node, k)`.
/// Input links are labelled `"in"`, output links `"out"`.
// NOTE(review): input `k` and output `k` therefore share the same edge index —
// confirm this aliasing is intended for telemetry keyed by edge id.
#[allow(clippy::type_complexity)]
fn make_edge_links_for_node<const IN: usize, const OUT: usize>(
    base_upstream_node: NodeIndex,
    base_downstream_node: NodeIndex,
) -> (
    [EdgeLink<TestSpscRingBuf<16>>; IN],
    [EdgeLink<TestSpscRingBuf<16>>; OUT],
) {
    // Single recipe for a link; only the slot number and the label vary.
    let build_link = |slot: usize, label: &'static str| {
        EdgeLink::new(
            TestSpscRingBuf::<16>::new(),
            EdgeIndex::new(slot + 1),
            PortId::new(base_upstream_node, PortIndex::new(slot)),
            PortId::new(base_downstream_node, PortIndex::new(slot)),
            TEST_EDGE_POLICY,
            Some(label),
        )
    };

    (
        core::array::from_fn(|i| build_link(i, "in")),
        core::array::from_fn(|o| build_link(o, "out")),
    )
}
#[allow(clippy::type_complexity)]
fn build_step_context<
'graph,
'telemetry,
'clock,
const IN: usize,
const OUT: usize,
InP,
OutP,
C,
T,
>(
inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
clock: &'clock C,
telemetry: &'telemetry mut T,
) -> crate::node::StepContext<
'graph,
'telemetry,
'clock,
IN,
OUT,
InP,
OutP,
EdgeLink<TestSpscRingBuf<16>>,
EdgeLink<TestSpscRingBuf<16>>,
StaticMemoryManager<InP, 16>,
StaticMemoryManager<OutP, 16>,
C,
T,
>
where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
C: PlatformClock + Sized,
T: Telemetry + Sized,
{
let in_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
let out_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
let mut inputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, IN> = Vec::new();
for elem in inputs.iter_mut() {
assert!(inputs_ref_vec.push(elem).is_ok(), "inputs_ref_vec overflow");
}
let mut outputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, OUT> = Vec::new();
for elem in outputs.iter_mut() {
assert!(
outputs_ref_vec.push(elem).is_ok(),
"outputs_ref_vec overflow"
);
}
let mut in_mgrs_ref_vec: Vec<&mut StaticMemoryManager<InP, 16>, IN> = Vec::new();
for elem in in_managers.iter_mut() {
assert!(
in_mgrs_ref_vec.push(elem).is_ok(),
"in_mgrs_ref_vec overflow"
);
}
let mut out_mgrs_ref_vec: Vec<&mut StaticMemoryManager<OutP, 16>, OUT> = Vec::new();
for elem in out_managers.iter_mut() {
assert!(
out_mgrs_ref_vec.push(elem).is_ok(),
"out_mgrs_ref_vec overflow"
);
}
let inputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; IN] = match inputs_ref_vec.into_array() {
Ok(arr) => arr,
Err(_) => panic!("inputs_ref_vec length mismatch"),
};
let outputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; OUT] = match outputs_ref_vec.into_array()
{
Ok(arr) => arr,
Err(_) => panic!("outputs_ref_vec length mismatch"),
};
let in_mgrs_ref: [&mut StaticMemoryManager<InP, 16>; IN] = match in_mgrs_ref_vec.into_array() {
Ok(arr) => arr,
Err(_) => panic!("in_mgrs_ref_vec length mismatch"),
};
let out_mgrs_ref: [&mut StaticMemoryManager<OutP, 16>; OUT] =
match out_mgrs_ref_vec.into_array() {
Ok(arr) => arr,
Err(_) => panic!("out_mgrs_ref_vec length mismatch"),
};
let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);
crate::node::StepContext::new(
inputs_ref,
outputs_ref,
in_mgrs_ref,
out_mgrs_ref,
in_policies,
out_policies,
0u32,
in_edge_ids,
out_edge_ids,
clock,
telemetry,
)
}
/// Builds a `StepContext` over the given links and memory managers.
///
/// Every input edge uses `TEST_EDGE_POLICY`; every output edge uses
/// `out_policy`. The edge-id arrays handed to the context are read from the
/// links themselves via `id()`.
// NOTE(review): the `0u32` passed to `StepContext::new` is presumably the node
// id used for telemetry (fixtures read `metrics.nodes()[0]`) — confirm.
///
/// # Panics
///
/// Panics if collecting the per-slot mutable references fails; with arrays of
/// exactly `IN` / `OUT` elements this is not expected to happen.
#[allow(clippy::type_complexity)]
fn build_step_context_with_out_policy<
    'graph,
    'telemetry,
    'clock,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
    C,
    T,
>(
    inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
    outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
    in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
    out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
    out_policy: EdgePolicy,
    clock: &'clock C,
    telemetry: &'telemetry mut T,
) -> crate::node::StepContext<
    'graph,
    'telemetry,
    'clock,
    IN,
    OUT,
    InP,
    OutP,
    EdgeLink<TestSpscRingBuf<16>>,
    EdgeLink<TestSpscRingBuf<16>>,
    StaticMemoryManager<InP, 16>,
    StaticMemoryManager<OutP, 16>,
    C,
    T,
>
where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Turns `&mut [E; N]` into `[&mut E; N]` by way of a fixed-capacity vector.
    fn borrow_each<'a, E, const N: usize>(
        items: &'a mut [E; N],
        overflow_msg: &str,
        mismatch_msg: &str,
    ) -> [&'a mut E; N] {
        let mut refs: Vec<&'a mut E, N> = Vec::new();
        for item in items.iter_mut() {
            assert!(refs.push(item).is_ok(), "{}", overflow_msg);
        }
        match refs.into_array() {
            Ok(array) => array,
            Err(_) => panic!("{}", mismatch_msg),
        }
    }

    let inputs_ref = borrow_each(
        inputs,
        "inputs_ref_vec overflow",
        "inputs_ref_vec length mismatch",
    );
    let outputs_ref = borrow_each(
        outputs,
        "outputs_ref_vec overflow",
        "outputs_ref_vec length mismatch",
    );
    let in_mgrs_ref = borrow_each(
        in_managers,
        "in_mgrs_ref_vec overflow",
        "in_mgrs_ref_vec length mismatch",
    );
    let out_mgrs_ref = borrow_each(
        out_managers,
        "out_mgrs_ref_vec overflow",
        "out_mgrs_ref_vec length mismatch",
    );

    // Edge ids must be captured before the reference arrays move into the context.
    let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
    let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);

    crate::node::StepContext::new(
        inputs_ref,
        outputs_ref,
        in_mgrs_ref,
        out_mgrs_ref,
        core::array::from_fn(|_| TEST_EDGE_POLICY),
        core::array::from_fn(|_| out_policy),
        0u32,
        in_edge_ids,
        out_edge_ids,
        clock,
        telemetry,
    )
}
/// Contract fixture: the lifecycle calls `initialize`, `start`,
/// `on_watchdog_timeout` and `stop` must each succeed when invoked in that
/// order on a freshly built node link.
///
/// # Panics
///
/// Panics (failing the test) if any lifecycle call returns an error.
pub fn run_initialize_start_stop_roundtrip<N, const IN: usize, const OUT: usize, InP, OutP>(
    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    N: crate::node::Node<IN, OUT, InP, OutP>,
{
    let mut node_link = make_nodelink();
    let clock = NoStdLinuxMonotonicClock::new();
    let mut telemetry = make_graph_telemetry();

    node_link
        .initialize(&clock, &mut telemetry)
        .expect("init ok");
    node_link.start(&clock, &mut telemetry).expect("start ok");
    // Only success matters here; whatever the watchdog hook returns is discarded.
    let _ = node_link
        .on_watchdog_timeout(&clock, &mut telemetry)
        .expect("watchdog ok");
    node_link.stop(&clock, &mut telemetry).expect("stop ok");
}
pub fn run_process_message_enqueues_and_made_progress<
N,
const IN: usize,
const OUT: usize,
InP,
OutP,
>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
if IN == 0 {
return;
}
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(clock.now_ticks());
let msg = Message::new(hdr, InP::default());
let in_policy = TEST_EDGE_POLICY;
let token = in_mgrs[0].store(msg).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &in_policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step ok");
assert!(res != crate::node::StepResult::NoInput);
if OUT > 0 {
let mut pushed = 0usize;
loop {
match out_links[0].try_pop(&out_mgrs[0]) {
Ok(_token) => pushed += 1,
Err(QueueError::Empty) => break,
Err(e) => panic!("unexpected queue error: {:?}", e),
}
}
assert!(
pushed > 0,
"expected node to push at least one message on output 0"
);
}
let metrics = tele.metrics();
let processed = metrics.nodes()[0].processed();
assert!(
*processed >= 1u64,
"expected processed >= 1, got {}",
processed
);
}
pub fn run_step_on_empty_returns_noinput<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step ok");
if IN == 0 {
assert!(
res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
"expected NoInput or MadeProgress for zero-input node, got {:?}",
res
);
} else {
assert_eq!(res, crate::node::StepResult::NoInput);
}
}
pub fn run_step_pops_and_calls_process_message<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
if IN == 0 {
return;
}
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(clock.now_ticks());
let msg = Message::new(hdr, InP::default());
let policy = TEST_EDGE_POLICY;
let token = in_mgrs[0].store(msg).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step ok");
assert!(res != crate::node::StepResult::NoInput);
if OUT > 0 {
let mut popped = 0usize;
while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
popped += 1;
}
assert!(popped > 0, "expected output items");
}
let metrics = tele.metrics();
assert!(*metrics.nodes()[0].processed() >= 1u64);
}
pub fn run_step_batch_fixed_n_disjoint<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
const TEST_FIXED_N: usize = 3;
let base_policy = nlink.node().policy();
let batching = crate::policy::BatchingPolicy::with_window(
Some(TEST_FIXED_N),
None,
crate::policy::WindowKind::Disjoint,
);
let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
nlink.set_policy(new_policy);
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
if IN == 0 {
return;
}
let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
let policy = TEST_EDGE_POLICY;
for t in 1u64..=(fixed_n as u64 + 1) {
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(Ticks::new(t));
let m = Message::new(hdr, InP::default());
let token = in_mgrs[0].store(m).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
}
let in_before = *in_links[0].occupancy(&policy).items();
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step_batch ok");
assert!(res != crate::node::StepResult::NoInput);
let in_after = *ctx.in_occupancy(0).items();
assert_eq!(
in_before.saturating_sub(in_after),
fixed_n,
"expected fixed_n items popped from input"
);
if OUT > 0 {
let mut out_count = 0usize;
while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
out_count += 1;
}
if fixed_n > 0 {
assert_eq!(
out_count, fixed_n,
"expected out_count == fixed_n (got {}, fixed_n={})",
out_count, fixed_n
);
} else {
assert!(out_count >= 1, "expected at least one output");
}
}
let metrics = tele.metrics();
if fixed_n > 1 {
assert_eq!(
*metrics.nodes()[0].processed(),
fixed_n as u64,
"expected processed == fixed_n for batched step"
);
} else {
assert!(*metrics.nodes()[0].processed() >= 1u64);
}
}
pub fn run_step_batch_sliding_window<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
const TEST_FIXED_N: usize = 4;
const TEST_STRIDE: usize = 2;
let base_policy = nlink.node().policy();
let batching = crate::policy::BatchingPolicy::with_window(
Some(TEST_FIXED_N),
None,
crate::policy::WindowKind::Sliding(crate::policy::SlidingWindow::new(TEST_STRIDE)),
);
let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
nlink.set_policy(new_policy);
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
if IN == 0 {
return;
}
let policy = TEST_EDGE_POLICY;
for t in 1u64..=6u64 {
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(Ticks::new(t));
let m = Message::new(hdr, InP::default());
let token = in_mgrs[0].store(m).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
}
let in_before = *in_links[0].occupancy(&policy).items();
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step_batch ok");
assert!(res != crate::node::StepResult::NoInput);
let in_after = *ctx.in_occupancy(0).items();
let stride_to_pop = core::cmp::min(TEST_STRIDE, in_before);
let removed = in_before.saturating_sub(in_after);
assert_eq!(
removed, stride_to_pop,
"unexpected number popped: removed={}, expected stride {}",
removed, stride_to_pop
);
let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
let expected_present = core::cmp::min(in_before, fixed_n);
if OUT > 0 {
let mut out_count = 0usize;
while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
out_count += 1;
}
assert_eq!(
out_count, expected_present,
"expected out_count == expected_present (got {}, expected {})",
out_count, expected_present
);
}
let metrics = tele.metrics();
if fixed_n > 1 {
assert_eq!(
*metrics.nodes()[0].processed(),
fixed_n as u64,
"expected processed == fixed_n for batched step"
);
} else {
assert!(*metrics.nodes()[0].processed() >= 1u64);
}
}
pub fn run_step_maps_backpressure_and_errors<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
if IN == 0 || OUT == 0 {
return;
}
let mut nlink = make_nodelink();
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
let policy = TEST_EDGE_POLICY;
loop {
let dummy_out_msg = Message::new(MessageHeader::empty(), OutP::default());
let token = match out_mgrs[0].store(dummy_out_msg) {
Ok(t) => t,
Err(_) => break, };
match out_links[0].try_push(token, &policy, &out_mgrs[0]) {
crate::edge::EnqueueResult::Enqueued => continue,
crate::edge::EnqueueResult::DroppedNewest | crate::edge::EnqueueResult::Rejected => {
break
}
}
}
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(clock.now_ticks());
let msg = Message::new(hdr, InP::default());
let token = in_mgrs[0].store(msg).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
match nlink.step(&mut ctx) {
Ok(res) => {
assert!(res != crate::node::StepResult::NoInput);
}
Err(_e) => {
}
}
}
pub fn run_source_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
let kind = nlink.node().node_kind();
if kind != crate::node::NodeKind::Source {
return;
}
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step ok");
assert!(
res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
"source.step should return NoInput or MadeProgress"
);
let _ = nlink.step(&mut ctx);
}
pub fn run_sink_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
let kind = nlink.node().node_kind();
if kind != crate::node::NodeKind::Sink {
return;
}
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
if IN == 0 {
return;
}
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(clock.now_ticks());
let msg = Message::new(hdr, InP::default());
let policy = TEST_EDGE_POLICY;
let token = in_mgrs[0].store(msg).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx);
match res {
Ok(r) => {
assert!(
r == crate::node::StepResult::MadeProgress || r == crate::node::StepResult::NoInput,
"sink.step returned unexpected StepResult"
);
}
Err(_e) => {}
}
}
pub fn run_model_batching_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
InP: crate::message::payload::Payload + Default + Clone,
OutP: crate::message::payload::Payload + Default + Clone,
N: crate::node::Node<IN, OUT, InP, OutP>,
{
let mut nlink = make_nodelink();
if nlink.node().node_kind() != crate::node::NodeKind::Model || IN != 1 || OUT != 1 {
return;
}
const TEST_FIXED_N: usize = 4;
let base_policy = nlink.node().policy();
let batching = crate::policy::BatchingPolicy::with_window(
Some(TEST_FIXED_N),
None,
crate::policy::WindowKind::Disjoint,
);
let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
nlink.set_policy(new_policy);
let clock = NoStdLinuxMonotonicClock::new();
let mut tele = make_graph_telemetry();
nlink.initialize(&clock, &mut tele).expect("init ok");
let (mut in_links, mut out_links) =
make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
core::array::from_fn(|_| StaticMemoryManager::new());
let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
core::array::from_fn(|_| StaticMemoryManager::new());
let requested_fixed = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
let policy = TEST_EDGE_POLICY;
for t in 1u64..=(requested_fixed as u64) {
let mut hdr = MessageHeader::empty();
hdr.set_creation_tick(Ticks::new(t));
let m = Message::new(hdr, InP::default());
let token = in_mgrs[0].store(m).expect("store ok");
assert_eq!(
in_links[0].try_push(token, &policy, &in_mgrs[0]),
crate::edge::EnqueueResult::Enqueued
);
}
let mut ctx = build_step_context(
&mut in_links,
&mut out_links,
&mut in_mgrs,
&mut out_mgrs,
&clock,
&mut tele,
);
let res = nlink.step(&mut ctx).expect("step_batch ok");
assert!(
res != crate::node::StepResult::NoInput,
"model.step_batch returned NoInput"
);
if OUT > 0 {
let mut out_count = 0usize;
while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
out_count += 1;
}
assert!(
out_count >= 1,
"expected at least one output from model batching"
);
assert!(
out_count <= requested_fixed,
"unexpectedly produced more outputs ({}) than requested_fixed ({})",
out_count,
requested_fixed
);
}
let metrics = tele.metrics();
assert_eq!(
*metrics.nodes()[0].processed(),
requested_fixed as u64,
"expected processed == requested_fixed for a model batched step"
);
}
/// Contract fixture: fixed-size batching combined with a maximum tick span.
///
/// Installs a disjoint batching policy of `fixed_n = 4` and
/// `max_delta_t = 5` ticks, then runs two scenarios against fresh queues:
///
/// 1. **Valid span** — `fixed_n` messages with creation ticks `0, 1, 2, ...`.
///    The step must not report `NoInput`, output 0 must receive exactly
///    `fixed_n` tokens (when `OUT > 0`), and the telemetry `processed` counter
///    must grow by `fixed_n`.
/// 2. **Invalid span** — `fixed_n` messages spaced `max_delta + 1000` ticks
///    apart. The step may return `NoInput` (telemetry must then be unchanged)
///    or `MadeProgress` with a partial batch (`0 < outputs < fixed_n` when
///    `OUT > 0`).
///
/// Skipped for nodes without inputs, and when the installed policy does not
/// report both `fixed_n` and `max_delta_t`.
///
/// # Panics
///
/// Panics (failing the test) on any violated expectation.
pub fn run_step_batch_fixed_n_max_delta_t_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    N: crate::node::Node<IN, OUT, InP, OutP>,
{
    if IN == 0 {
        return;
    }
    let mut nlink = make_nodelink();
    const TEST_FIXED_N: usize = 4;
    const TEST_MAX_DELTA_TICKS: u64 = 5u64;
    let base_policy = nlink.node().policy();
    let batching = crate::policy::BatchingPolicy::with_window(
        Some(TEST_FIXED_N),
        Some(crate::types::Ticks::new(TEST_MAX_DELTA_TICKS)),
        crate::policy::WindowKind::Disjoint,
    );
    let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
    nlink.set_policy(new_policy);

    // Read the limits back from the node; bail out unless both are reported.
    let policy_installed = *nlink.node().policy().batching();
    let (fixed_n, max_delta) = match (
        *policy_installed.fixed_n(),
        *policy_installed.max_delta_t(),
    ) {
        (Some(n), Some(delta)) => (n, *delta.as_u64()),
        _ => return,
    };

    let clock = NoStdLinuxMonotonicClock::new();
    let mut tele = make_graph_telemetry();
    nlink.initialize(&clock, &mut tele).expect("init ok");

    // Scenario 1: consecutive ticks, the whole batch fits inside max_delta_t.
    {
        let (mut in_links, mut out_links) =
            make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
        let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
            core::array::from_fn(|_| StaticMemoryManager::new());
        let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
            core::array::from_fn(|_| StaticMemoryManager::new());
        let policy = TEST_EDGE_POLICY;
        for i in 0..fixed_n {
            let tick = i as u64;
            let mut hdr = MessageHeader::empty();
            hdr.set_creation_tick(Ticks::new(tick));
            let m = Message::new(hdr, InP::default());
            let token = in_mgrs[0].store(m).expect("store ok");
            assert_eq!(
                in_links[0].try_push(token, &policy, &in_mgrs[0]),
                crate::edge::EnqueueResult::Enqueued
            );
        }
        let metrics_before = tele.metrics();
        let processed_before = *metrics_before.nodes()[0].processed();
        let mut ctx = build_step_context(
            &mut in_links,
            &mut out_links,
            &mut in_mgrs,
            &mut out_mgrs,
            &clock,
            &mut tele,
        );
        let res = nlink.step(&mut ctx).expect("step_batch ok (valid span)");
        assert!(
            res != crate::node::StepResult::NoInput,
            "expected batch processed for valid span"
        );
        if OUT > 0 {
            let mut out_count = 0usize;
            while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
                out_count += 1;
            }
            assert_eq!(
                out_count, fixed_n,
                "expected exactly fixed_n outputs ({}) for valid span, got {}",
                fixed_n, out_count
            );
        }
        let metrics_after = tele.metrics();
        let processed_after = *metrics_after.nodes()[0].processed();
        assert_eq!(
            processed_after.saturating_sub(processed_before),
            fixed_n as u64,
            "expected telemetry processed to increase by fixed_n for valid span"
        );
    }

    // Scenario 2: ticks spaced far beyond max_delta_t, so no full batch is valid.
    {
        let (mut in_links, mut out_links) =
            make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
        let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
            core::array::from_fn(|_| StaticMemoryManager::new());
        let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
            core::array::from_fn(|_| StaticMemoryManager::new());
        let policy = TEST_EDGE_POLICY;
        for i in 0..fixed_n {
            let tick = (i as u64) * (max_delta + 1000u64);
            let mut hdr = MessageHeader::empty();
            hdr.set_creation_tick(Ticks::new(tick));
            let m = Message::new(hdr, InP::default());
            let token = in_mgrs[0].store(m).expect("store ok");
            assert_eq!(
                in_links[0].try_push(token, &policy, &in_mgrs[0]),
                crate::edge::EnqueueResult::Enqueued
            );
        }
        let metrics_before_invalid = tele.metrics();
        let processed_before_invalid = *metrics_before_invalid.nodes()[0].processed();
        let mut ctx = build_step_context(
            &mut in_links,
            &mut out_links,
            &mut in_mgrs,
            &mut out_mgrs,
            &clock,
            &mut tele,
        );
        let res = nlink.step(&mut ctx).expect("step_batch ok (invalid span)");
        if res == crate::node::StepResult::NoInput {
            // Refusing the batch outright must leave telemetry untouched.
            let metrics_after_invalid = tele.metrics();
            let processed_after_invalid = *metrics_after_invalid.nodes()[0].processed();
            assert_eq!(
                processed_after_invalid, processed_before_invalid,
                "expected no telemetry change when invalid span results in NoInput"
            );
        } else {
            assert_eq!(
                res,
                crate::node::StepResult::MadeProgress,
                "unexpected StepResult for invalid span: {:?}",
                res
            );
            if OUT > 0 {
                let mut out_count = 0usize;
                while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
                    out_count += 1;
                }
                assert!(
                    out_count > 0 && out_count < fixed_n,
                    "expected partial progress for invalid span (0 < out_count < fixed_n), got {}",
                    out_count
                );
            }
        }
    }
}
/// Contract fixture: one `step` against a pre-filled output edge under
/// `TEST_DROP_OLDEST_POLICY` must leave the edge occupancy unchanged at three.
///
/// Scenario, in order:
/// 1. Output edge 0 is pre-loaded with three default-payload messages
///    (creation ticks 1, 2, 3), admitted under `TEST_EDGE_POLICY`.
/// 2. One default-payload input message is enqueued on input edge 0.
/// 3. The node is stepped once with a context built via
///    `build_step_context_with_out_policy(.., TEST_DROP_OLDEST_POLICY, ..)`.
///
/// The step must report `StepResult::MadeProgress`, and output edge 0 must
/// report exactly three items afterwards. Per the assertion message, a count
/// of two would indicate that the oldest entry was evicted twice.
///
/// The fixture is a no-op when the node has no input or no output port.
///
/// # Panics
///
/// Panics if initialization, any store/push during setup, or the step itself
/// fails, or if either of the final assertions does not hold.
pub fn run_push_output_drop_oldest_evicts_oldest_once<
    N,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
>(
    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    N: crate::node::Node<IN, OUT, InP, OutP>,
{
    // The scenario needs at least one port on each side of the node.
    let has_both_ports = IN > 0 && OUT > 0;
    if !has_both_ports {
        return;
    }

    let mut node_link = make_nodelink();
    let clock = NoStdLinuxMonotonicClock::new();
    let mut telemetry = make_graph_telemetry();
    node_link
        .initialize(&clock, &mut telemetry)
        .expect("init ok");

    let (mut in_edges, mut out_edges) =
        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
    let mut in_stores: [StaticMemoryManager<InP, 16>; IN] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());
    let mut out_stores: [StaticMemoryManager<OutP, 16>; OUT] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());

    // Pre-load output edge 0 with three fillers stamped with ticks 1..=3.
    for tick in 1u64..=3 {
        let mut filler_header = MessageHeader::empty();
        filler_header.set_creation_tick(Ticks::new(tick));
        let filler = out_stores[0]
            .store(Message::new(filler_header, OutP::default()))
            .expect("store filler");
        let outcome = out_edges[0].try_push(filler, &TEST_EDGE_POLICY, &out_stores[0]);
        assert_eq!(outcome, crate::edge::EnqueueResult::Enqueued);
    }

    // Queue the single input message the step is expected to consume.
    let mut input_header = MessageHeader::empty();
    input_header.set_creation_tick(clock.now_ticks());
    let input_token = in_stores[0]
        .store(Message::new(input_header, InP::default()))
        .expect("store input");
    let input_outcome = in_edges[0].try_push(input_token, &TEST_EDGE_POLICY, &in_stores[0]);
    assert_eq!(input_outcome, crate::edge::EnqueueResult::Enqueued);

    // Step once with the drop-oldest policy applied to the output side.
    let mut step_ctx = build_step_context_with_out_policy(
        &mut in_edges,
        &mut out_edges,
        &mut in_stores,
        &mut out_stores,
        TEST_DROP_OLDEST_POLICY,
        &clock,
        &mut telemetry,
    );
    let step_result = node_link.step(&mut step_ctx).expect("step ok");
    assert_eq!(step_result, crate::node::StepResult::MadeProgress);

    let occupancy = out_edges[0].occupancy(&TEST_DROP_OLDEST_POLICY);
    assert_eq!(
        *occupancy.items(),
        3,
        "expected 3 items (exactly 1 evicted, 1 pushed); double-eviction gives 2"
    );
}
/// Contract fixture: a step that ends in `StepResult::Backpressured` must not
/// consume an output memory-manager slot or change output edge occupancy.
///
/// Scenario, in order:
/// 1. A local policy is built from `QueueCaps::new(2, 1, None, None)`,
///    `AdmissionPolicy::DropNewest` and `OverBudgetAction::Drop`.
/// 2. Output edge 0 is pre-loaded with one default-payload message
///    (creation tick 1) under that policy.
/// 3. `available()` of output manager 0 is recorded.
/// 4. One default-payload input message is enqueued on input edge 0 under
///    `TEST_EDGE_POLICY`.
/// 5. The node is stepped once with the local policy as the output policy.
///
/// The step must report `StepResult::Backpressured`; afterwards the output
/// manager's `available()` must equal the recorded value and output edge 0
/// must still hold exactly one item.
///
/// The fixture is a no-op when the node has no input or no output port.
///
/// # Panics
///
/// Panics if initialization, any store/push during setup, or the step itself
/// fails, or if any of the final assertions does not hold.
pub fn run_push_output_no_token_leak_on_backpressure<
    N,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
>(
    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    N: crate::node::Node<IN, OUT, InP, OutP>,
{
    // The scenario needs at least one port on each side of the node.
    let has_both_ports = IN > 0 && OUT > 0;
    if !has_both_ports {
        return;
    }

    let mut node_link = make_nodelink();
    let clock = NoStdLinuxMonotonicClock::new();
    let mut telemetry = make_graph_telemetry();
    node_link
        .initialize(&clock, &mut telemetry)
        .expect("init ok");

    // Output-side policy used both for the filler push and for the step.
    let tight_policy = EdgePolicy::new(
        QueueCaps::new(2, 1, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    let (mut in_edges, mut out_edges) =
        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
    let mut in_stores: [StaticMemoryManager<InP, 16>; IN] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());
    let mut out_stores: [StaticMemoryManager<OutP, 16>; OUT] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());

    // Pre-load output edge 0 with a single filler message.
    let mut filler_header = MessageHeader::empty();
    filler_header.set_creation_tick(Ticks::new(1));
    let filler = out_stores[0]
        .store(Message::new(filler_header, OutP::default()))
        .expect("store filler");
    let filler_outcome = out_edges[0].try_push(filler, &tight_policy, &out_stores[0]);
    assert_eq!(filler_outcome, crate::edge::EnqueueResult::Enqueued);

    // Baseline taken after the filler is stored and before the step runs.
    let slots_before_step = out_stores[0].available();

    // Queue the single input message for the step.
    let mut input_header = MessageHeader::empty();
    input_header.set_creation_tick(clock.now_ticks());
    let input_token = in_stores[0]
        .store(Message::new(input_header, InP::default()))
        .expect("store input");
    let input_outcome = in_edges[0].try_push(input_token, &TEST_EDGE_POLICY, &in_stores[0]);
    assert_eq!(input_outcome, crate::edge::EnqueueResult::Enqueued);

    let mut step_ctx = build_step_context_with_out_policy(
        &mut in_edges,
        &mut out_edges,
        &mut in_stores,
        &mut out_stores,
        tight_policy,
        &clock,
        &mut telemetry,
    );
    let step_result = node_link.step(&mut step_ctx).expect("step ok");
    assert_eq!(step_result, crate::node::StepResult::Backpressured);

    // The output manager must report the same availability as before the step.
    assert_eq!(
        out_stores[0].available(),
        slots_before_step,
        "manager slot leaked: push_output must free token on DropNewest backpressure"
    );

    // The filler must still be the only queued item.
    let occupancy = out_edges[0].occupancy(&tight_policy);
    assert_eq!(
        *occupancy.items(),
        1,
        "queue occupancy must not change on backpressure"
    );
}
/// Contract fixture: one `step` against an output edge holding four items
/// under `TEST_DROP_OLDEST_POLICY` must leave the occupancy at four.
///
/// Scenario, in order:
/// 1. Output edge 0 is pre-loaded with four default-payload messages
///    (creation ticks 1, 2, 3, 4), admitted under `TEST_EDGE_POLICY`.
/// 2. One default-payload input message is enqueued on input edge 0.
/// 3. The node is stepped once with a context built via
///    `build_step_context_with_out_policy(.., TEST_DROP_OLDEST_POLICY, ..)`.
///
/// The step must report `StepResult::MadeProgress`, and output edge 0 must
/// report exactly four items afterwards. Per the assertion message, a count
/// of three would indicate a second, redundant eviction.
///
/// The fixture is a no-op when the node has no input or no output port.
///
/// # Panics
///
/// Panics if initialization, any store/push during setup, or the step itself
/// fails, or if either of the final assertions does not hold.
pub fn run_push_output_evict_until_below_hard_no_double_eviction<
    N,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
>(
    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
) where
    InP: crate::message::payload::Payload + Default + Clone,
    OutP: crate::message::payload::Payload + Default + Clone,
    N: crate::node::Node<IN, OUT, InP, OutP>,
{
    // The scenario needs at least one port on each side of the node.
    let has_both_ports = IN > 0 && OUT > 0;
    if !has_both_ports {
        return;
    }

    let mut node_link = make_nodelink();
    let clock = NoStdLinuxMonotonicClock::new();
    let mut telemetry = make_graph_telemetry();
    node_link
        .initialize(&clock, &mut telemetry)
        .expect("init ok");

    let (mut in_edges, mut out_edges) =
        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
    let mut in_stores: [StaticMemoryManager<InP, 16>; IN] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());
    let mut out_stores: [StaticMemoryManager<OutP, 16>; OUT] =
        core::array::from_fn(|_slot| StaticMemoryManager::new());

    // Pre-load output edge 0 with four fillers stamped with ticks 1..=4.
    for tick in 1u64..=4 {
        let mut filler_header = MessageHeader::empty();
        filler_header.set_creation_tick(Ticks::new(tick));
        let filler = out_stores[0]
            .store(Message::new(filler_header, OutP::default()))
            .expect("store filler");
        let outcome = out_edges[0].try_push(filler, &TEST_EDGE_POLICY, &out_stores[0]);
        assert_eq!(outcome, crate::edge::EnqueueResult::Enqueued);
    }

    // Queue the single input message the step is expected to consume.
    let mut input_header = MessageHeader::empty();
    input_header.set_creation_tick(clock.now_ticks());
    let input_token = in_stores[0]
        .store(Message::new(input_header, InP::default()))
        .expect("store input");
    let input_outcome = in_edges[0].try_push(input_token, &TEST_EDGE_POLICY, &in_stores[0]);
    assert_eq!(input_outcome, crate::edge::EnqueueResult::Enqueued);

    // Step once with the drop-oldest policy applied to the output side.
    let mut step_ctx = build_step_context_with_out_policy(
        &mut in_edges,
        &mut out_edges,
        &mut in_stores,
        &mut out_stores,
        TEST_DROP_OLDEST_POLICY,
        &clock,
        &mut telemetry,
    );
    let step_result = node_link.step(&mut step_ctx).expect("step ok");
    assert_eq!(step_result, crate::node::StepResult::MadeProgress);

    let occupancy = out_edges[0].occupancy(&TEST_DROP_OLDEST_POLICY);
    assert_eq!(
        *occupancy.items(),
        4,
        "expected 4 items (1 pre-evicted, 1 pushed, net stable); \
         double-eviction (old try_push Evict branch) gives 3"
    );
}