Skip to main content

limen_core/node/
contract_tests.rs

1//! Node contract test fixtures and helpers.
2//!
3//! This module contains a reusable, *single* contract test-suite that validates
4//! the `Node` trait behaviour expected by Limen runtimes.  The tests exercise:
5//!
6//! - Node lifecycle: `initialize`, `start`, `on_watchdog_timeout`, `stop`.
7//! - Per-message behaviour: `process_message` and the `step()` path that pops a
8//!   single input and delegates to `process_message`.
9//! - Batched behaviour: `step_batch()` semantics for fixed-size, sliding and
10//!   fixed+max_delta_t windowing, including span validation based on
11//!   `MessageHeader::creation_tick`.
12//! - Error mapping: `QueueError` → `StepResult` / `NodeError` rules and how
13//!   enqueue results map to progress/backpressure semantics.
14//! - Telemetry and occupancy hooks exercised via `EdgeLink` + `GraphTelemetry`.
15//!
16//! Usage
17//! -----
18//! Implementors should provide:
19//! - a `NodeLink` factory: `|| -> NodeLink<N, IN, OUT, InP, OutP>` that returns
20//!   a fresh `NodeLink` owning the concrete node under test, and
21//! - ensure payload types implement `Default + Clone` so tests can construct
22//!   `Message::<P>::new(Header, P::default())` values.
23//!
24//! Then call the test macro from your crate tests:
25//
26//! ```text
27//! run_node_contract_tests!(my_node_contracts, { make_nodelink: || create_mynode_link() });
28//! ```
29//!
30//! The suite is *adaptive*: it skips or changes assertions for `IN == 0`
31//! (sources), `OUT == 0` (sinks) or when `node.node_kind()` indicates a
32//! wrapper-specific implementation (`Source`, `Sink`, `Model`). The tests only
33//! assert node-level semantics (counts/StepResult/telemetry) and do **not**
34//! inspect or require node-specific payload contents.
35//!
36//! ---------------------------------------------------------------------------
37//! Planned node/context tests — NOT YET IMPLEMENTED
38//!
39//! C2 (N→M node arity — planned/C2.md):
40//!   When C2 lands (multiple input ports, multiple output ports, optional
41//!   inputs):
42//!   - run_fan_in_two_inputs_both_available: step() with two filled input ports
43//!     triggers the node and both tokens are consumed.
44//!   - run_fan_in_one_input_absent_blocks: with an optional-absent policy,
45//!     step() can still fire on one input.
46//!   - run_fan_out_two_outputs: a single step produces messages on two
47//!     distinct output ports; both queues gain one item.
48//!   - run_fan_out_one_backpressured_maps_to_backpressure: if output port 1
49//!     is at capacity and output port 0 succeeds, the step returns Backpressured.
50//!
51//! R1 (urgency ordering — planned/R1.md):
52//!   - run_step_pop_respects_urgency_order: fill input queue with messages of
53//!     mixed QoSClass; step() must process the highest-urgency item first.
54//!
55//! R2 (freshness / expiry — planned/R2.md):
56//!   - run_step_skips_expired_inputs: push a message whose deadline is already
57//!     elapsed; step() must not pass it to process_message (or must decrement
58//!     a stale counter and skip).
59//!
60//! R4 (mailbox semantics — planned/R4.md):
61//!   - run_step_mailbox_overwrites_previous: rapid pushes to a mailbox-mode
62//!     input leave only the most recent; step() sees only the latest value.
63//!
64//! R5/R6 (liveness policies — planned/R5.md, R6.md):
65//!   - run_step_liveness_check_fires_on_timeout: if no input arrives within
66//!     the configured liveness interval, the node must emit a watchdog event
67//!     or return a liveness-timeout StepResult.
68//!
69//! P1 (PlatformBackend / controllable clock — planned/P1.md):
70//!  - run_batch_delta_t_with_controllable_clock: inject a fake clock that
71//!    advances in discrete steps; verify delta_t batching respects injected
72//!    timestamps rather than wall time.
73//!
74//! RS1 (runtime lifecycle — planned/RS1.md):
75//!   - run_stop_during_inflight_batch: call stop() while a batch is partially
76//!     consumed; verify all tokens are freed and no manager slots leak.
77//! ---------------------------------------------------------------------------
78
79use super::*;
80use crate::{
81    message::MessageHeader,
82    policy::{AdmissionPolicy, OverBudgetAction, QueueCaps},
83    prelude::{
84        fixed_buffer_line_writer, EdgeLink, FixedBuffer, FmtLineWriter, GraphTelemetry,
85        NoStdLinuxMonotonicClock, NodeLink, StaticMemoryManager, TestSpscRingBuf,
86    },
87    types::{EdgeIndex, NodeIndex, PortId, PortIndex, Ticks},
88};
89
90use heapless::Vec;
91
92// Fixed buffer sizes for test telemetry
93const TELE_NODES: usize = 8;
94const TELE_EDGES: usize = 16;
95const TELE_BUF_BYTES: usize = 1024;
96
97const TEST_EDGE_POLICY: EdgePolicy = EdgePolicy::new(
98    QueueCaps::new(16, 14, None, None),
99    AdmissionPolicy::DropNewest,
100    OverBudgetAction::Drop,
101);
102
103/// Edge policy for tests that exercise DropOldest / pre-eviction paths.
104/// caps: max_items=4, soft_items=2 — narrow enough to trigger eviction quickly.
105const TEST_DROP_OLDEST_POLICY: EdgePolicy = EdgePolicy::new(
106    QueueCaps::new(4, 2, None, None),
107    AdmissionPolicy::DropOldest,
108    OverBudgetAction::Drop,
109);
110
111/// Expand a canonical suite of node contract tests for a `NodeLink` factory.
112///
113/// # Purpose
114/// Generate a focused `#[test]` module that runs the standard Node contract
115/// fixtures (lifecycle, single-message, and batched behaviour, error/backpressure
116/// mapping, and wrapper-specific checks for `Source`, `Sink`, and `Model`).
117///
118/// # Required argument
119/// - `make_nodelink: || -> NodeLink<N, IN, OUT, InP, OutP>`
120///   A zero-argument closure that returns a **fresh** `NodeLink` owning the
121///   concrete node under test. The closure is invoked **once per generated test**
122///   (i.e., the factory must return a new, independent `NodeLink` each time).
123///
124/// # Type requirements
125/// - The node produced by your `NodeLink` must implement `Node<IN, OUT, InP, OutP>`.
126/// - Payload types `InP` and `OutP` must implement `Payload + Default + Clone` so
127///   the tests can synthesize `Message::new(header, P::default())` values.
128///
129/// # Behaviour
130/// The macro expands to a `mod $mod_name { ... }` containing these tests:
131/// - `initialize_start_stop_roundtrip`
132/// - `process_message_enqueues_and_made_progress`
133/// - `step_on_empty_returns_noinput`
134/// - `step_pops_and_calls_process_message`
135/// - `step_batch_respects_fixed_n_disjoint`
136/// - `step_batch_respects_sliding_window`
137/// - `step_maps_backpressure_and_errors`
138/// - `source_specific_behaviour`
139/// - `sink_specific_behaviour`
140/// - `model_specific_batching_behaviour`
141/// - `fixed_n_with_max_delta_t_behaviour`
142///
143/// Each generated test delegates to a fixture in `node::contract_tests`. The
144/// suite is *adaptive*: fixtures will skip or adjust assertions for `IN == 0`
145/// (sources), `OUT == 0` (sinks), or when the node reports `NodeKind::Source`,
146/// `NodeKind::Sink`, or `NodeKind::Model`.
147///
148/// # Usage
149/// ```rust
150/// use limen_core::run_node_contract_tests;
151///
152/// run_node_contract_tests!(my_node_contracts, {
153///     make_nodelink: || create_my_node_link()
154/// });
155/// ```
156///
157/// Put the macro invocation in your crate's tests (or `#[cfg(test)]` module).
158/// Keep the `make_nodelink` factory cheap and deterministic so the per-test
159/// instances are reliable.
160///
161/// # Notes
162/// - Tests assert node-level semantics (counts, `StepResult`, telemetry) and do
163///   **not** inspect payload internals. Implementors should ensure payloads
164///   derive/implement `Default + Clone` for compatibility with the suite.
165#[macro_export]
166macro_rules! run_node_contract_tests {
167    ($mod_name:ident, {
168            make_nodelink: $make_nodelink:expr
169        }) => {
170        #[cfg(test)]
171        mod $mod_name {
172            use super::*;
173            use $crate::node::contract_tests as fixtures;
174
175            #[test]
176            fn initialize_start_stop_roundtrip() {
177                fixtures::run_initialize_start_stop_roundtrip(|| $make_nodelink());
178            }
179
180            #[test]
181            fn process_message_enqueues_and_made_progress() {
182                fixtures::run_process_message_enqueues_and_made_progress(|| $make_nodelink());
183            }
184
185            #[test]
186            fn step_on_empty_returns_noinput() {
187                fixtures::run_step_on_empty_returns_noinput(|| $make_nodelink());
188            }
189
190            #[test]
191            fn step_pops_and_calls_process_message() {
192                fixtures::run_step_pops_and_calls_process_message(|| $make_nodelink());
193            }
194
195            #[test]
196            fn step_batch_respects_fixed_n_disjoint() {
197                fixtures::run_step_batch_fixed_n_disjoint(|| $make_nodelink());
198            }
199
200            #[test]
201            fn step_batch_respects_sliding_window() {
202                fixtures::run_step_batch_sliding_window(|| $make_nodelink());
203            }
204
205            #[test]
206            fn step_maps_backpressure_and_errors() {
207                fixtures::run_step_maps_backpressure_and_errors(|| $make_nodelink());
208            }
209
210            #[test]
211            fn source_specific_behaviour() {
212                fixtures::run_source_specific_tests(|| $make_nodelink());
213            }
214
215            #[test]
216            fn sink_specific_behaviour() {
217                fixtures::run_sink_specific_tests(|| $make_nodelink());
218            }
219
220            #[test]
221            fn model_specific_batching_behaviour() {
222                fixtures::run_model_batching_tests(|| $make_nodelink());
223            }
224
225            #[test]
226            fn fixed_n_with_max_delta_t_behaviour() {
227                fixtures::run_step_batch_fixed_n_max_delta_t_tests(|| $make_nodelink());
228            }
229
230            #[test]
231            fn push_output_drop_oldest_evicts_oldest_once() {
232                fixtures::run_push_output_drop_oldest_evicts_oldest_once(|| $make_nodelink());
233            }
234
235            #[test]
236            fn push_output_no_token_leak_on_backpressure() {
237                fixtures::run_push_output_no_token_leak_on_backpressure(|| $make_nodelink());
238            }
239
240            #[test]
241            fn push_output_evict_until_below_hard_no_double_eviction() {
242                fixtures::run_push_output_evict_until_below_hard_no_double_eviction(|| {
243                    $make_nodelink()
244                });
245            }
246        }
247    };
248}
249
250// -----------------------
251// helpers
252// -----------------------
253
254/// Create a small `GraphTelemetry` instance used by contract tests.
255///
256/// The returned telemetry has fixed-size internal buffers suitable for unit
257/// tests and enables node telemetry paths so tests can assert `processed()`
258/// and other node/edge counters. Use this to construct a `StepContext`.
259fn make_graph_telemetry(
260) -> GraphTelemetry<TELE_NODES, TELE_EDGES, FmtLineWriter<FixedBuffer<TELE_BUF_BYTES>>> {
261    GraphTelemetry::new(0u32, true, fixed_buffer_line_writer::<TELE_BUF_BYTES>())
262}
263
264/// Construct input/output `EdgeLink` arrays backed by `TestSpscRingBuf`.
265///
266/// Returns `(inputs, outputs)` arrays of length `IN` and `OUT` respectively.
267/// Each `EdgeLink` uses `TEST_EDGE_POLICY` and deterministic `EdgeIndex` and
268/// `PortId` values so tests are reproducible.
269///
270/// # Type constraints
271/// `InP` and `OutP` must implement `Payload + Default + Clone`.
272///
273/// This helper is the canonical way to produce testable queues that implement
274/// the `Edge` contract and integrate with `StepContext`.
275#[allow(clippy::type_complexity)]
276fn make_edge_links_for_node<const IN: usize, const OUT: usize>(
277    base_upstream_node: NodeIndex,
278    base_downstream_node: NodeIndex,
279) -> (
280    [EdgeLink<TestSpscRingBuf<16>>; IN],
281    [EdgeLink<TestSpscRingBuf<16>>; OUT],
282) {
283    let inputs = core::array::from_fn(|i| {
284        let queue = TestSpscRingBuf::<16>::new();
285        let id = EdgeIndex::new(i + 1);
286        let upstream_port = PortId::new(base_upstream_node, PortIndex::new(i));
287        let downstream_port = PortId::new(base_downstream_node, PortIndex::new(i));
288        EdgeLink::new(
289            queue,
290            id,
291            upstream_port,
292            downstream_port,
293            TEST_EDGE_POLICY,
294            Some("in"),
295        )
296    });
297
298    let outputs = core::array::from_fn(|o| {
299        let queue = TestSpscRingBuf::<16>::new();
300        let id = EdgeIndex::new(o + 1);
301        let upstream_port = PortId::new(base_upstream_node, PortIndex::new(o));
302        let downstream_port = PortId::new(base_downstream_node, PortIndex::new(o));
303        EdgeLink::new(
304            queue,
305            id,
306            upstream_port,
307            downstream_port,
308            TEST_EDGE_POLICY,
309            Some("out"),
310        )
311    });
312
313    (inputs, outputs)
314}
315
316/// Build a `StepContext` from the provided `EdgeLink` arrays, clock and telemetry.
317///
318/// The returned `StepContext` wraps the given input/output `EdgeLink` arrays,
319/// populates per-port `EdgePolicy` arrays with `TEST_EDGE_POLICY`, and provides
320/// `node_id`, `in_edge_ids`, and `out_edge_ids` derived from the `EdgeLink`s.
321///
322/// # Notes
323/// - `inputs` and `outputs` must be arrays of exactly `IN` and `OUT` length.
324/// - `InP` / `OutP` must implement `Default + Clone` so tests can craft messages.
325/// - Use this helper to produce the context passed to `Node::step` /
326///   `Node::step_batch` in fixtures.
327#[allow(clippy::type_complexity)]
328fn build_step_context<
329    'graph,
330    'telemetry,
331    'clock,
332    const IN: usize,
333    const OUT: usize,
334    InP,
335    OutP,
336    C,
337    T,
338>(
339    inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
340    outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
341    in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
342    out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
343    clock: &'clock C,
344    telemetry: &'telemetry mut T,
345) -> crate::node::StepContext<
346    'graph,
347    'telemetry,
348    'clock,
349    IN,
350    OUT,
351    InP,
352    OutP,
353    EdgeLink<TestSpscRingBuf<16>>,
354    EdgeLink<TestSpscRingBuf<16>>,
355    StaticMemoryManager<InP, 16>,
356    StaticMemoryManager<OutP, 16>,
357    C,
358    T,
359>
360where
361    InP: crate::message::payload::Payload + Default + Clone,
362    OutP: crate::message::payload::Payload + Default + Clone,
363    C: PlatformClock + Sized,
364    T: Telemetry + Sized,
365{
366    let in_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
367    let out_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
368
369    let mut inputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, IN> = Vec::new();
370    for elem in inputs.iter_mut() {
371        assert!(inputs_ref_vec.push(elem).is_ok(), "inputs_ref_vec overflow");
372    }
373
374    let mut outputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, OUT> = Vec::new();
375    for elem in outputs.iter_mut() {
376        assert!(
377            outputs_ref_vec.push(elem).is_ok(),
378            "outputs_ref_vec overflow"
379        );
380    }
381
382    let mut in_mgrs_ref_vec: Vec<&mut StaticMemoryManager<InP, 16>, IN> = Vec::new();
383    for elem in in_managers.iter_mut() {
384        assert!(
385            in_mgrs_ref_vec.push(elem).is_ok(),
386            "in_mgrs_ref_vec overflow"
387        );
388    }
389
390    let mut out_mgrs_ref_vec: Vec<&mut StaticMemoryManager<OutP, 16>, OUT> = Vec::new();
391    for elem in out_managers.iter_mut() {
392        assert!(
393            out_mgrs_ref_vec.push(elem).is_ok(),
394            "out_mgrs_ref_vec overflow"
395        );
396    }
397
398    let inputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; IN] = match inputs_ref_vec.into_array() {
399        Ok(arr) => arr,
400        Err(_) => panic!("inputs_ref_vec length mismatch"),
401    };
402
403    let outputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; OUT] = match outputs_ref_vec.into_array()
404    {
405        Ok(arr) => arr,
406        Err(_) => panic!("outputs_ref_vec length mismatch"),
407    };
408
409    let in_mgrs_ref: [&mut StaticMemoryManager<InP, 16>; IN] = match in_mgrs_ref_vec.into_array() {
410        Ok(arr) => arr,
411        Err(_) => panic!("in_mgrs_ref_vec length mismatch"),
412    };
413
414    let out_mgrs_ref: [&mut StaticMemoryManager<OutP, 16>; OUT] =
415        match out_mgrs_ref_vec.into_array() {
416            Ok(arr) => arr,
417            Err(_) => panic!("out_mgrs_ref_vec length mismatch"),
418        };
419
420    let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
421    let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);
422
423    crate::node::StepContext::new(
424        inputs_ref,
425        outputs_ref,
426        in_mgrs_ref,
427        out_mgrs_ref,
428        in_policies,
429        out_policies,
430        0u32,
431        in_edge_ids,
432        out_edge_ids,
433        clock,
434        telemetry,
435    )
436}
437
438/// Like `build_step_context` but applies `out_policy` to every output port.
439/// Use this when a test needs to exercise eviction or backpressure paths that
440/// differ from the default `TEST_EDGE_POLICY`.
441#[allow(clippy::type_complexity)]
442fn build_step_context_with_out_policy<
443    'graph,
444    'telemetry,
445    'clock,
446    const IN: usize,
447    const OUT: usize,
448    InP,
449    OutP,
450    C,
451    T,
452>(
453    inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
454    outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
455    in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
456    out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
457    out_policy: EdgePolicy,
458    clock: &'clock C,
459    telemetry: &'telemetry mut T,
460) -> crate::node::StepContext<
461    'graph,
462    'telemetry,
463    'clock,
464    IN,
465    OUT,
466    InP,
467    OutP,
468    EdgeLink<TestSpscRingBuf<16>>,
469    EdgeLink<TestSpscRingBuf<16>>,
470    StaticMemoryManager<InP, 16>,
471    StaticMemoryManager<OutP, 16>,
472    C,
473    T,
474>
475where
476    InP: crate::message::payload::Payload + Default + Clone,
477    OutP: crate::message::payload::Payload + Default + Clone,
478    C: PlatformClock + Sized,
479    T: Telemetry + Sized,
480{
481    let in_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
482    let out_policies = core::array::from_fn(|_| out_policy); // <-- only difference
483
484    let mut inputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, IN> = Vec::new();
485    for elem in inputs.iter_mut() {
486        assert!(inputs_ref_vec.push(elem).is_ok(), "inputs_ref_vec overflow");
487    }
488    let mut outputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, OUT> = Vec::new();
489    for elem in outputs.iter_mut() {
490        assert!(
491            outputs_ref_vec.push(elem).is_ok(),
492            "outputs_ref_vec overflow"
493        );
494    }
495    let mut in_mgrs_ref_vec: Vec<&mut StaticMemoryManager<InP, 16>, IN> = Vec::new();
496    for elem in in_managers.iter_mut() {
497        assert!(
498            in_mgrs_ref_vec.push(elem).is_ok(),
499            "in_mgrs_ref_vec overflow"
500        );
501    }
502    let mut out_mgrs_ref_vec: Vec<&mut StaticMemoryManager<OutP, 16>, OUT> = Vec::new();
503    for elem in out_managers.iter_mut() {
504        assert!(
505            out_mgrs_ref_vec.push(elem).is_ok(),
506            "out_mgrs_ref_vec overflow"
507        );
508    }
509
510    let inputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; IN] = match inputs_ref_vec.into_array() {
511        Ok(arr) => arr,
512        Err(_) => panic!("inputs_ref_vec length mismatch"),
513    };
514    let outputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; OUT] = match outputs_ref_vec.into_array()
515    {
516        Ok(arr) => arr,
517        Err(_) => panic!("outputs_ref_vec length mismatch"),
518    };
519    let in_mgrs_ref: [&mut StaticMemoryManager<InP, 16>; IN] = match in_mgrs_ref_vec.into_array() {
520        Ok(arr) => arr,
521        Err(_) => panic!("in_mgrs_ref_vec length mismatch"),
522    };
523    let out_mgrs_ref: [&mut StaticMemoryManager<OutP, 16>; OUT] =
524        match out_mgrs_ref_vec.into_array() {
525            Ok(arr) => arr,
526            Err(_) => panic!("out_mgrs_ref_vec length mismatch"),
527        };
528
529    let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
530    let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);
531
532    crate::node::StepContext::new(
533        inputs_ref,
534        outputs_ref,
535        in_mgrs_ref,
536        out_mgrs_ref,
537        in_policies,
538        out_policies,
539        0u32,
540        in_edge_ids,
541        out_edge_ids,
542        clock,
543        telemetry,
544    )
545}
546
547// -----------------------
548// Fixtures
549// -----------------------
550
551/// Lifecycle: `initialize` → `start` → `on_watchdog_timeout` → `stop`.
552///
553/// Verifies that a fresh `NodeLink` can be initialized and started without
554/// error, that calling `on_watchdog_timeout` returns a valid `StepResult`, and
555/// that `stop` returns `Ok(())`. This test asserts only success paths and is
556/// intended as a basic lifecycle smoke-test.
557pub fn run_initialize_start_stop_roundtrip<N, const IN: usize, const OUT: usize, InP, OutP>(
558    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
559) where
560    InP: crate::message::payload::Payload + Default + Clone,
561    OutP: crate::message::payload::Payload + Default + Clone,
562    N: crate::node::Node<IN, OUT, InP, OutP>,
563{
564    let mut nlink = make_nodelink();
565    let clock = NoStdLinuxMonotonicClock::new();
566    let mut tele = make_graph_telemetry();
567
568    nlink.initialize(&clock, &mut tele).expect("init ok");
569    nlink.start(&clock, &mut tele).expect("start ok");
570
571    let _ = nlink
572        .on_watchdog_timeout(&clock, &mut tele)
573        .expect("watchdog ok");
574
575    nlink.stop(&clock, &mut tele).expect("stop ok");
576}
577
578/// Single-message `process_message` path and basic egress semantics.
579///
580/// - Pushes one `Message<InP>` (header creation tick sourced from the clock)
581///   into input port 0 (skipped when `IN == 0`).
582/// - Calls `step()` and asserts the result is not `NoInput`.
583/// - If `OUT > 0`, asserts at least one message was enqueued on output 0.
584/// - Asserts the node telemetry `processed()` counter increased.
585///
586/// This fixture tests the canonical single-message processing path without
587/// asserting message payload contents.
588pub fn run_process_message_enqueues_and_made_progress<
589    N,
590    const IN: usize,
591    const OUT: usize,
592    InP,
593    OutP,
594>(
595    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
596) where
597    InP: crate::message::payload::Payload + Default + Clone,
598    OutP: crate::message::payload::Payload + Default + Clone,
599    N: crate::node::Node<IN, OUT, InP, OutP>,
600{
601    let mut nlink = make_nodelink();
602    let clock = NoStdLinuxMonotonicClock::new();
603    let mut tele = make_graph_telemetry();
604
605    nlink.initialize(&clock, &mut tele).expect("init ok");
606
607    let (mut in_links, mut out_links) =
608        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
609    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
610        core::array::from_fn(|_| StaticMemoryManager::new());
611    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
612        core::array::from_fn(|_| StaticMemoryManager::new());
613
614    if IN == 0 {
615        return;
616    }
617
618    let mut hdr = MessageHeader::empty();
619    hdr.set_creation_tick(clock.now_ticks());
620    let msg = Message::new(hdr, InP::default());
621
622    let in_policy = TEST_EDGE_POLICY;
623    let token = in_mgrs[0].store(msg).expect("store ok");
624    assert_eq!(
625        in_links[0].try_push(token, &in_policy, &in_mgrs[0]),
626        crate::edge::EnqueueResult::Enqueued
627    );
628
629    let mut ctx = build_step_context(
630        &mut in_links,
631        &mut out_links,
632        &mut in_mgrs,
633        &mut out_mgrs,
634        &clock,
635        &mut tele,
636    );
637
638    let res = nlink.step(&mut ctx).expect("step ok");
639    assert!(res != crate::node::StepResult::NoInput);
640
641    if OUT > 0 {
642        let mut pushed = 0usize;
643        loop {
644            match out_links[0].try_pop(&out_mgrs[0]) {
645                Ok(_token) => pushed += 1,
646                Err(QueueError::Empty) => break,
647                Err(e) => panic!("unexpected queue error: {:?}", e),
648            }
649        }
650        assert!(
651            pushed > 0,
652            "expected node to push at least one message on output 0"
653        );
654    }
655
656    let metrics = tele.metrics();
657    let processed = metrics.nodes()[0].processed();
658    assert!(
659        *processed >= 1u64,
660        "expected processed >= 1, got {}",
661        processed
662    );
663}
664
665/// `step()` on empty inputs must return `StepResult::NoInput`.
666///
667/// Builds empty input queues and confirms `step()` returns `NoInput`. This
668/// verifies scheduler-readiness predicates and the node's empty-input fast
669/// path.
670pub fn run_step_on_empty_returns_noinput<N, const IN: usize, const OUT: usize, InP, OutP>(
671    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
672) where
673    InP: crate::message::payload::Payload + Default + Clone,
674    OutP: crate::message::payload::Payload + Default + Clone,
675    N: crate::node::Node<IN, OUT, InP, OutP>,
676{
677    let mut nlink = make_nodelink();
678    let clock = NoStdLinuxMonotonicClock::new();
679    let mut tele = make_graph_telemetry();
680
681    let (mut in_links, mut out_links) =
682        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
683    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
684        core::array::from_fn(|_| StaticMemoryManager::new());
685    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
686        core::array::from_fn(|_| StaticMemoryManager::new());
687
688    let mut ctx = build_step_context(
689        &mut in_links,
690        &mut out_links,
691        &mut in_mgrs,
692        &mut out_mgrs,
693        &clock,
694        &mut tele,
695    );
696
697    let res = nlink.step(&mut ctx).expect("step ok");
698
699    if IN == 0 {
700        assert!(
701            res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
702            "expected NoInput or MadeProgress for zero-input node, got {:?}",
703            res
704        );
705    } else {
706        assert_eq!(res, crate::node::StepResult::NoInput);
707    }
708}
709
710/// `step()` pops one message and delegates to `process_message`.
711///
712/// - Pushes a single message into input port 0 (skipped when `IN == 0`).
713/// - Calls `step()` and asserts the node made progress (not `NoInput`).
714/// - If `OUT > 0`, asserts at least one output item was produced.
715/// - Asserts telemetry processed counter incremented.
716///
717/// Ensures the node honors `step()` semantics and emits telemetry as expected.
718pub fn run_step_pops_and_calls_process_message<N, const IN: usize, const OUT: usize, InP, OutP>(
719    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
720) where
721    InP: crate::message::payload::Payload + Default + Clone,
722    OutP: crate::message::payload::Payload + Default + Clone,
723    N: crate::node::Node<IN, OUT, InP, OutP>,
724{
725    let mut nlink = make_nodelink();
726    let clock = NoStdLinuxMonotonicClock::new();
727    let mut tele = make_graph_telemetry();
728    nlink.initialize(&clock, &mut tele).expect("init ok");
729
730    let (mut in_links, mut out_links) =
731        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
732    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
733        core::array::from_fn(|_| StaticMemoryManager::new());
734    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
735        core::array::from_fn(|_| StaticMemoryManager::new());
736
737    if IN == 0 {
738        return;
739    }
740
741    let mut hdr = MessageHeader::empty();
742    hdr.set_creation_tick(clock.now_ticks());
743    let msg = Message::new(hdr, InP::default());
744
745    let policy = TEST_EDGE_POLICY;
746    let token = in_mgrs[0].store(msg).expect("store ok");
747    assert_eq!(
748        in_links[0].try_push(token, &policy, &in_mgrs[0]),
749        crate::edge::EnqueueResult::Enqueued
750    );
751
752    let mut ctx = build_step_context(
753        &mut in_links,
754        &mut out_links,
755        &mut in_mgrs,
756        &mut out_mgrs,
757        &clock,
758        &mut tele,
759    );
760
761    let res = nlink.step(&mut ctx).expect("step ok");
762    assert!(res != crate::node::StepResult::NoInput);
763
764    if OUT > 0 {
765        let mut popped = 0usize;
766        while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
767            popped += 1;
768        }
769        assert!(popped > 0, "expected output items");
770    }
771
772    let metrics = tele.metrics();
773    assert!(*metrics.nodes()[0].processed() >= 1u64);
774}
775
776/// `step_batch()` with fixed-N disjoint semantics.
777///
778/// - Pushes `fixed_n + 1` messages into input port 0 (skipped when `IN == 0`).
779/// - Calls `step_batch()` and asserts it returned a progress result.
780/// - If `fixed_n > 0` and `OUT > 0`, asserts the node produced at least
781///   `fixed_n` outputs (i.e., full fixed-size batch processed).
782///
783/// This checks the default fixed-size batch behaviour and that the node
784/// consumes and emits the expected number of items under disjoint semantics.
785pub fn run_step_batch_fixed_n_disjoint<N, const IN: usize, const OUT: usize, InP, OutP>(
786    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
787) where
788    InP: crate::message::payload::Payload + Default + Clone,
789    OutP: crate::message::payload::Payload + Default + Clone,
790    N: crate::node::Node<IN, OUT, InP, OutP>,
791{
792    let mut nlink = make_nodelink();
793    const TEST_FIXED_N: usize = 3;
794
795    let base_policy = nlink.node().policy();
796    let batching = crate::policy::BatchingPolicy::with_window(
797        Some(TEST_FIXED_N),
798        None,
799        crate::policy::WindowKind::Disjoint,
800    );
801    let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
802    nlink.set_policy(new_policy);
803
804    let clock = NoStdLinuxMonotonicClock::new();
805    let mut tele = make_graph_telemetry();
806    nlink.initialize(&clock, &mut tele).expect("init ok");
807
808    let (mut in_links, mut out_links) =
809        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
810    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
811        core::array::from_fn(|_| StaticMemoryManager::new());
812    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
813        core::array::from_fn(|_| StaticMemoryManager::new());
814
815    if IN == 0 {
816        return;
817    }
818
819    let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
820
821    let policy = TEST_EDGE_POLICY;
822    for t in 1u64..=(fixed_n as u64 + 1) {
823        let mut hdr = MessageHeader::empty();
824        hdr.set_creation_tick(Ticks::new(t));
825        let m = Message::new(hdr, InP::default());
826        let token = in_mgrs[0].store(m).expect("store ok");
827        assert_eq!(
828            in_links[0].try_push(token, &policy, &in_mgrs[0]),
829            crate::edge::EnqueueResult::Enqueued
830        );
831    }
832
833    let in_before = *in_links[0].occupancy(&policy).items();
834
835    let mut ctx = build_step_context(
836        &mut in_links,
837        &mut out_links,
838        &mut in_mgrs,
839        &mut out_mgrs,
840        &clock,
841        &mut tele,
842    );
843
844    let res = nlink.step(&mut ctx).expect("step_batch ok");
845    assert!(res != crate::node::StepResult::NoInput);
846
847    let in_after = *ctx.in_occupancy(0).items();
848    assert_eq!(
849        in_before.saturating_sub(in_after),
850        fixed_n,
851        "expected fixed_n items popped from input"
852    );
853
854    if OUT > 0 {
855        let mut out_count = 0usize;
856        while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
857            out_count += 1;
858        }
859        if fixed_n > 0 {
860            assert_eq!(
861                out_count, fixed_n,
862                "expected out_count == fixed_n (got {}, fixed_n={})",
863                out_count, fixed_n
864            );
865        } else {
866            assert!(out_count >= 1, "expected at least one output");
867        }
868    }
869
870    let metrics = tele.metrics();
871    if fixed_n > 1 {
872        assert_eq!(
873            *metrics.nodes()[0].processed(),
874            fixed_n as u64,
875            "expected processed == fixed_n for batched step"
876        );
877    } else {
878        assert!(*metrics.nodes()[0].processed() >= 1u64);
879    }
880}
881
882/// `step_batch()` with sliding-window semantics (smoke test).
883///
884/// - Pushes several messages into input port 0 to exercise sliding-window
885///   batch processing and stride semantics.
886/// - Calls `step_batch()` and asserts the node returned progress.
887/// - Verifies telemetry processed counter incremented.
888///
889/// This is a behavioural smoke-test rather than a precise numerical check of
890/// popped/produced counts because sliding windows and backpressure can affect
891/// exact numbers.
892pub fn run_step_batch_sliding_window<N, const IN: usize, const OUT: usize, InP, OutP>(
893    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
894) where
895    InP: crate::message::payload::Payload + Default + Clone,
896    OutP: crate::message::payload::Payload + Default + Clone,
897    N: crate::node::Node<IN, OUT, InP, OutP>,
898{
899    let mut nlink = make_nodelink();
900
901    const TEST_FIXED_N: usize = 4;
902    const TEST_STRIDE: usize = 2;
903
904    let base_policy = nlink.node().policy();
905    let batching = crate::policy::BatchingPolicy::with_window(
906        Some(TEST_FIXED_N),
907        None,
908        crate::policy::WindowKind::Sliding(crate::policy::SlidingWindow::new(TEST_STRIDE)),
909    );
910    let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
911    nlink.set_policy(new_policy);
912
913    let clock = NoStdLinuxMonotonicClock::new();
914    let mut tele = make_graph_telemetry();
915    nlink.initialize(&clock, &mut tele).expect("init ok");
916
917    let (mut in_links, mut out_links) =
918        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
919    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
920        core::array::from_fn(|_| StaticMemoryManager::new());
921    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
922        core::array::from_fn(|_| StaticMemoryManager::new());
923
924    if IN == 0 {
925        return;
926    }
927
928    let policy = TEST_EDGE_POLICY;
929    for t in 1u64..=6u64 {
930        let mut hdr = MessageHeader::empty();
931        hdr.set_creation_tick(Ticks::new(t));
932        let m = Message::new(hdr, InP::default());
933        let token = in_mgrs[0].store(m).expect("store ok");
934        assert_eq!(
935            in_links[0].try_push(token, &policy, &in_mgrs[0]),
936            crate::edge::EnqueueResult::Enqueued
937        );
938    }
939
940    let in_before = *in_links[0].occupancy(&policy).items();
941
942    let mut ctx = build_step_context(
943        &mut in_links,
944        &mut out_links,
945        &mut in_mgrs,
946        &mut out_mgrs,
947        &clock,
948        &mut tele,
949    );
950
951    let res = nlink.step(&mut ctx).expect("step_batch ok");
952    assert!(res != crate::node::StepResult::NoInput);
953
954    let in_after = *ctx.in_occupancy(0).items();
955
956    let stride_to_pop = core::cmp::min(TEST_STRIDE, in_before);
957    let removed = in_before.saturating_sub(in_after);
958
959    assert_eq!(
960        removed, stride_to_pop,
961        "unexpected number popped: removed={}, expected stride {}",
962        removed, stride_to_pop
963    );
964
965    let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
966    let expected_present = core::cmp::min(in_before, fixed_n);
967
968    if OUT > 0 {
969        let mut out_count = 0usize;
970        while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
971            out_count += 1;
972        }
973        assert_eq!(
974            out_count, expected_present,
975            "expected out_count == expected_present (got {}, expected {})",
976            out_count, expected_present
977        );
978    }
979
980    let metrics = tele.metrics();
981    if fixed_n > 1 {
982        assert_eq!(
983            *metrics.nodes()[0].processed(),
984            fixed_n as u64,
985            "expected processed == fixed_n for batched step"
986        );
987    } else {
988        assert!(*metrics.nodes()[0].processed() >= 1u64);
989    }
990}
991
992/// Mapping of output backpressure and queue errors to node-level results.
993///
994/// - Prefills an output queue to cause admission/backpressure conditions.
995/// - Pushes a single input and calls `step()`.
996/// - Accepts either:
997///     - `Ok(StepResult::Backpressured|MadeProgress|… )`, or
998///     - `Err(NodeError::backpressured())` / `Err(NodeError::execution_failed())`,
999///       depending on whether the implementation surfaces backpressure as a
1000///       `StepResult` or an error.
1001///
1002/// Ensures the node maps queue/enqueue failures into the documented contract
1003/// (progress vs. backpressure vs. execution failure).
1004pub fn run_step_maps_backpressure_and_errors<N, const IN: usize, const OUT: usize, InP, OutP>(
1005    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1006) where
1007    InP: crate::message::payload::Payload + Default + Clone,
1008    OutP: crate::message::payload::Payload + Default + Clone,
1009    N: crate::node::Node<IN, OUT, InP, OutP>,
1010{
1011    if IN == 0 || OUT == 0 {
1012        return;
1013    }
1014
1015    let mut nlink = make_nodelink();
1016    let clock = NoStdLinuxMonotonicClock::new();
1017    let mut tele = make_graph_telemetry();
1018    nlink.initialize(&clock, &mut tele).expect("init ok");
1019
1020    let (mut in_links, mut out_links) =
1021        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1022    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1023        core::array::from_fn(|_| StaticMemoryManager::new());
1024    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1025        core::array::from_fn(|_| StaticMemoryManager::new());
1026
1027    // prefill output 0 until it rejects or drops
1028    let policy = TEST_EDGE_POLICY;
1029    loop {
1030        let dummy_out_msg = Message::new(MessageHeader::empty(), OutP::default());
1031        let token = match out_mgrs[0].store(dummy_out_msg) {
1032            Ok(t) => t,
1033            Err(_) => break, // manager full
1034        };
1035        match out_links[0].try_push(token, &policy, &out_mgrs[0]) {
1036            crate::edge::EnqueueResult::Enqueued => continue,
1037            crate::edge::EnqueueResult::DroppedNewest | crate::edge::EnqueueResult::Rejected => {
1038                break
1039            }
1040        }
1041    }
1042
1043    // push a single input so step will attempt to push to the full output
1044    let mut hdr = MessageHeader::empty();
1045    hdr.set_creation_tick(clock.now_ticks());
1046    let msg = Message::new(hdr, InP::default());
1047    let token = in_mgrs[0].store(msg).expect("store ok");
1048    assert_eq!(
1049        in_links[0].try_push(token, &policy, &in_mgrs[0]),
1050        crate::edge::EnqueueResult::Enqueued
1051    );
1052
1053    let mut ctx = build_step_context(
1054        &mut in_links,
1055        &mut out_links,
1056        &mut in_mgrs,
1057        &mut out_mgrs,
1058        &clock,
1059        &mut tele,
1060    );
1061
1062    match nlink.step(&mut ctx) {
1063        Ok(res) => {
1064            assert!(res != crate::node::StepResult::NoInput);
1065        }
1066        Err(_e) => {
1067            // Error is acceptable
1068        }
1069    }
1070}
1071
1072// -----------------------
1073// Source-specific Fixtures
1074// -----------------------
1075
1076/// Source-node specific checks.
1077///
1078/// Applicable only when `node.node_kind() == NodeKind::Source` (and `IN == 0`).
1079/// - Calls `step()` and asserts `NoInput` or `MadeProgress` (sources can
1080///   produce at most one item per `step()`).
1081/// - Calls `step_batch()` to ensure it is callable and does not panic; this
1082///   also exercises ingress occupancy/peek semantics indirectly.
1083///
1084/// Does not assert source payload contents — only node-level readiness and
1085/// non-panicking behaviour.
1086pub fn run_source_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1087    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1088) where
1089    InP: crate::message::payload::Payload + Default + Clone,
1090    OutP: crate::message::payload::Payload + Default + Clone,
1091    N: crate::node::Node<IN, OUT, InP, OutP>,
1092{
1093    let mut nlink = make_nodelink();
1094    let kind = nlink.node().node_kind();
1095    if kind != crate::node::NodeKind::Source {
1096        return;
1097    }
1098
1099    let clock = NoStdLinuxMonotonicClock::new();
1100    let mut tele = make_graph_telemetry();
1101    nlink.initialize(&clock, &mut tele).expect("init ok");
1102
1103    let (mut in_links, mut out_links) =
1104        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1105    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1106        core::array::from_fn(|_| StaticMemoryManager::new());
1107    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1108        core::array::from_fn(|_| StaticMemoryManager::new());
1109
1110    let mut ctx = build_step_context(
1111        &mut in_links,
1112        &mut out_links,
1113        &mut in_mgrs,
1114        &mut out_mgrs,
1115        &clock,
1116        &mut tele,
1117    );
1118
1119    let res = nlink.step(&mut ctx).expect("step ok");
1120    assert!(
1121        res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
1122        "source.step should return NoInput or MadeProgress"
1123    );
1124
1125    let _ = nlink.step(&mut ctx);
1126}
1127
1128// -----------------------
1129// Sink-specific Fixtures
1130// -----------------------
1131
1132/// Sink-node specific checks.
1133///
1134/// Applicable only when `node.node_kind() == NodeKind::Sink`.
1135/// - Pushes a message into input port 0 and calls `step()`.
1136/// - Asserts the sink either returns `MadeProgress` (consumed) or `NoInput`,
1137///   or returns an execution error to indicate failure of the underlying
1138///   sink implementation.
1139///
1140/// This fixture verifies the adapter `SinkNode` invokes `Sink::consume` and
1141/// maps sink errors to `NodeError::execution_failed()` as appropriate.
1142pub fn run_sink_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1143    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1144) where
1145    InP: crate::message::payload::Payload + Default + Clone,
1146    OutP: crate::message::payload::Payload + Default + Clone,
1147    N: crate::node::Node<IN, OUT, InP, OutP>,
1148{
1149    let mut nlink = make_nodelink();
1150    let kind = nlink.node().node_kind();
1151    if kind != crate::node::NodeKind::Sink {
1152        return;
1153    }
1154
1155    let clock = NoStdLinuxMonotonicClock::new();
1156    let mut tele = make_graph_telemetry();
1157    nlink.initialize(&clock, &mut tele).expect("init ok");
1158
1159    let (mut in_links, mut out_links) =
1160        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1161    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1162        core::array::from_fn(|_| StaticMemoryManager::new());
1163    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1164        core::array::from_fn(|_| StaticMemoryManager::new());
1165
1166    if IN == 0 {
1167        return;
1168    }
1169
1170    let mut hdr = MessageHeader::empty();
1171    hdr.set_creation_tick(clock.now_ticks());
1172    let msg = Message::new(hdr, InP::default());
1173    let policy = TEST_EDGE_POLICY;
1174    let token = in_mgrs[0].store(msg).expect("store ok");
1175    assert_eq!(
1176        in_links[0].try_push(token, &policy, &in_mgrs[0]),
1177        crate::edge::EnqueueResult::Enqueued
1178    );
1179
1180    let mut ctx = build_step_context(
1181        &mut in_links,
1182        &mut out_links,
1183        &mut in_mgrs,
1184        &mut out_mgrs,
1185        &clock,
1186        &mut tele,
1187    );
1188
1189    let res = nlink.step(&mut ctx);
1190    match res {
1191        Ok(r) => {
1192            assert!(
1193                r == crate::node::StepResult::MadeProgress || r == crate::node::StepResult::NoInput,
1194                "sink.step returned unexpected StepResult"
1195            );
1196        }
1197        Err(_e) => {}
1198    }
1199}
1200
1201// -----------------------
1202// Model-specific Fixtures
1203// -----------------------
1204
1205/// Model-node batching smoke-test.
1206///
1207/// Applicable when `node.node_kind() == NodeKind::Model` and the node is `1×1`.
1208/// - Pushes `fixed_n` messages (from the node policy) and calls `step_batch()`.
1209/// - Asserts progress and that at least one output is produced, and not more
1210///   than the requested `fixed_n`.
1211///
1212/// This validates that `InferenceModel`-style nodes honor the node's fixed
1213/// batching hints and produce a reasonable number of outputs. It intentionally
1214/// does not assert payload contents or exact clamping by backend caps (those
1215/// are implementation-specific).
1216pub fn run_model_batching_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1217    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1218) where
1219    InP: crate::message::payload::Payload + Default + Clone,
1220    OutP: crate::message::payload::Payload + Default + Clone,
1221    N: crate::node::Node<IN, OUT, InP, OutP>,
1222{
1223    let mut nlink = make_nodelink();
1224    if nlink.node().node_kind() != crate::node::NodeKind::Model || IN != 1 || OUT != 1 {
1225        return;
1226    }
1227
1228    const TEST_FIXED_N: usize = 4;
1229    let base_policy = nlink.node().policy();
1230    let batching = crate::policy::BatchingPolicy::with_window(
1231        Some(TEST_FIXED_N),
1232        None,
1233        crate::policy::WindowKind::Disjoint,
1234    );
1235    let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
1236    nlink.set_policy(new_policy);
1237
1238    let clock = NoStdLinuxMonotonicClock::new();
1239    let mut tele = make_graph_telemetry();
1240    nlink.initialize(&clock, &mut tele).expect("init ok");
1241
1242    let (mut in_links, mut out_links) =
1243        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1244    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1245        core::array::from_fn(|_| StaticMemoryManager::new());
1246    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1247        core::array::from_fn(|_| StaticMemoryManager::new());
1248
1249    let requested_fixed = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
1250
1251    let policy = TEST_EDGE_POLICY;
1252    for t in 1u64..=(requested_fixed as u64) {
1253        let mut hdr = MessageHeader::empty();
1254        hdr.set_creation_tick(Ticks::new(t));
1255        let m = Message::new(hdr, InP::default());
1256        let token = in_mgrs[0].store(m).expect("store ok");
1257        assert_eq!(
1258            in_links[0].try_push(token, &policy, &in_mgrs[0]),
1259            crate::edge::EnqueueResult::Enqueued
1260        );
1261    }
1262
1263    let mut ctx = build_step_context(
1264        &mut in_links,
1265        &mut out_links,
1266        &mut in_mgrs,
1267        &mut out_mgrs,
1268        &clock,
1269        &mut tele,
1270    );
1271
1272    let res = nlink.step(&mut ctx).expect("step_batch ok");
1273    assert!(
1274        res != crate::node::StepResult::NoInput,
1275        "model.step_batch returned NoInput"
1276    );
1277
1278    if OUT > 0 {
1279        let mut out_count = 0usize;
1280        while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1281            out_count += 1;
1282        }
1283
1284        assert!(
1285            out_count >= 1,
1286            "expected at least one output from model batching"
1287        );
1288
1289        assert!(
1290            out_count <= requested_fixed,
1291            "unexpectedly produced more outputs ({}) than requested_fixed ({})",
1292            out_count,
1293            requested_fixed
1294        );
1295    }
1296
1297    let metrics = tele.metrics();
1298    assert_eq!(
1299        *metrics.nodes()[0].processed(),
1300        requested_fixed as u64,
1301        "expected processed == requested_fixed for a model batched step"
1302    );
1303}
1304
1305// -----------------------
1306// Fixed-N + max_delta_t Fixtures
1307// -----------------------
1308
1309/// Tests for `fixed_n + max_delta_t` span validation.
1310///
1311/// - **Valid span**: pushes `fixed_n` messages whose `creation_tick` values
1312///   lie within `max_delta_t` and asserts `step_batch()` processes a full
1313///   batch (exactly `fixed_n` outputs if `OUT > 0`).
1314/// - **Invalid span**: pushes `fixed_n` messages with creation ticks spaced
1315///   farther apart than `max_delta_t` and asserts `step_batch()` either
1316///   returns `NoInput` (preferred) or makes partial progress (allowed).
1317///
1318/// This fixture verifies the scheduler readiness predicate and peek-based
1319/// span validation used by batched nodes.
1320pub fn run_step_batch_fixed_n_max_delta_t_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1321    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1322) where
1323    InP: crate::message::payload::Payload + Default + Clone,
1324    OutP: crate::message::payload::Payload + Default + Clone,
1325    N: crate::node::Node<IN, OUT, InP, OutP>,
1326{
1327    if IN == 0 {
1328        return;
1329    }
1330
1331    let mut nlink = make_nodelink();
1332
1333    const TEST_FIXED_N: usize = 4;
1334    const TEST_MAX_DELTA_TICKS: u64 = 5u64;
1335
1336    let base_policy = nlink.node().policy();
1337    let batching = crate::policy::BatchingPolicy::with_window(
1338        Some(TEST_FIXED_N),
1339        Some(crate::types::Ticks::new(TEST_MAX_DELTA_TICKS)),
1340        crate::policy::WindowKind::Disjoint,
1341    );
1342    let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
1343    nlink.set_policy(new_policy);
1344
1345    let policy_installed = *nlink.node().policy().batching();
1346    let fixed_opt = *policy_installed.fixed_n();
1347    let delta_opt = *policy_installed.max_delta_t();
1348    if fixed_opt.is_none() || delta_opt.is_none() {
1349        return;
1350    }
1351    let fixed_n = fixed_opt.unwrap();
1352    let max_delta = *delta_opt.unwrap().as_u64();
1353
1354    let clock = NoStdLinuxMonotonicClock::new();
1355    let mut tele = make_graph_telemetry();
1356    nlink.initialize(&clock, &mut tele).expect("init ok");
1357
1358    // 1) VALID SPAN
1359    {
1360        let (mut in_links, mut out_links) =
1361            make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1362        let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1363            core::array::from_fn(|_| StaticMemoryManager::new());
1364        let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1365            core::array::from_fn(|_| StaticMemoryManager::new());
1366
1367        let policy = TEST_EDGE_POLICY;
1368
1369        for i in 0..fixed_n {
1370            let tick = i as u64;
1371            let mut hdr = MessageHeader::empty();
1372            hdr.set_creation_tick(Ticks::new(tick));
1373            let m = Message::new(hdr, InP::default());
1374            let token = in_mgrs[0].store(m).expect("store ok");
1375            assert_eq!(
1376                in_links[0].try_push(token, &policy, &in_mgrs[0]),
1377                crate::edge::EnqueueResult::Enqueued
1378            );
1379        }
1380
1381        let metrics_before = tele.metrics();
1382        let processed_before = *metrics_before.nodes()[0].processed();
1383
1384        let mut ctx = build_step_context(
1385            &mut in_links,
1386            &mut out_links,
1387            &mut in_mgrs,
1388            &mut out_mgrs,
1389            &clock,
1390            &mut tele,
1391        );
1392
1393        let res = nlink.step(&mut ctx).expect("step_batch ok (valid span)");
1394        assert!(
1395            res != crate::node::StepResult::NoInput,
1396            "expected batch processed for valid span"
1397        );
1398
1399        if OUT > 0 {
1400            let mut out_count = 0usize;
1401            while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1402                out_count += 1;
1403            }
1404            assert_eq!(
1405                out_count, fixed_n,
1406                "expected exactly fixed_n outputs ({}) for valid span, got {}",
1407                fixed_n, out_count
1408            );
1409        }
1410
1411        let metrics_after = tele.metrics();
1412        let processed_after = *metrics_after.nodes()[0].processed();
1413        assert_eq!(
1414            processed_after.saturating_sub(processed_before),
1415            fixed_n as u64,
1416            "expected telemetry processed to increase by fixed_n for valid span"
1417        );
1418    }
1419
1420    // 2) INVALID SPAN
1421    {
1422        let (mut in_links, mut out_links) =
1423            make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1424        let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1425            core::array::from_fn(|_| StaticMemoryManager::new());
1426        let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1427            core::array::from_fn(|_| StaticMemoryManager::new());
1428
1429        let policy = TEST_EDGE_POLICY;
1430        for i in 0..fixed_n {
1431            let tick = (i as u64) * (max_delta + 1000u64);
1432            let mut hdr = MessageHeader::empty();
1433            hdr.set_creation_tick(Ticks::new(tick));
1434            let m = Message::new(hdr, InP::default());
1435            let token = in_mgrs[0].store(m).expect("store ok");
1436            assert_eq!(
1437                in_links[0].try_push(token, &policy, &in_mgrs[0]),
1438                crate::edge::EnqueueResult::Enqueued
1439            );
1440        }
1441
1442        let metrics_before_invalid = tele.metrics();
1443        let processed_before_invalid = *metrics_before_invalid.nodes()[0].processed();
1444
1445        let mut ctx = build_step_context(
1446            &mut in_links,
1447            &mut out_links,
1448            &mut in_mgrs,
1449            &mut out_mgrs,
1450            &clock,
1451            &mut tele,
1452        );
1453
1454        let res = nlink.step(&mut ctx).expect("step_batch ok (invalid span)");
1455
1456        if res == crate::node::StepResult::NoInput {
1457            let metrics_after_invalid = tele.metrics();
1458            let processed_after_invalid = *metrics_after_invalid.nodes()[0].processed();
1459            assert_eq!(
1460                processed_after_invalid, processed_before_invalid,
1461                "expected no telemetry change when invalid span results in NoInput"
1462            );
1463        } else {
1464            assert_eq!(
1465                res,
1466                crate::node::StepResult::MadeProgress,
1467                "unexpected StepResult for invalid span: {:?}",
1468                res
1469            );
1470
1471            if OUT > 0 {
1472                let mut out_count = 0usize;
1473                while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1474                    out_count += 1;
1475                }
1476                assert!(
1477                    out_count > 0 && out_count < fixed_n,
1478                    "expected partial progress for invalid span (0 < out_count < fixed_n), got {}",
1479                    out_count
1480                );
1481            }
1482        }
1483    }
1484}
1485
1486/// Regression: push_output with DropOldest must evict exactly once when the output
1487/// queue is between soft and hard cap (Evict(1) decision), not twice.
1488///
1489/// Pre-fills output to 3 items (soft=2, hard=4 → BetweenSoftAndHard → Evict(1)).
1490/// push_output pre-evicts 1 (queue: 3→2), then calls try_push.
1491/// OLD try_push: called get_admission_decision again, saw 2 items ≥ soft=2, evicted
1492/// a second time (2→1), then pushed → 2 items. Double-eviction.
1493/// NEW try_push: Evict branch does not pop. Checks at_or_above_hard(2,_) = false,
1494/// enqueues → 3 items. Correct.
1495pub fn run_push_output_drop_oldest_evicts_oldest_once<
1496    N,
1497    const IN: usize,
1498    const OUT: usize,
1499    InP,
1500    OutP,
1501>(
1502    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1503) where
1504    InP: crate::message::payload::Payload + Default + Clone,
1505    OutP: crate::message::payload::Payload + Default + Clone,
1506    N: crate::node::Node<IN, OUT, InP, OutP>,
1507{
1508    if IN == 0 || OUT == 0 {
1509        return;
1510    }
1511
1512    let mut nlink = make_nodelink();
1513    let clock = NoStdLinuxMonotonicClock::new();
1514    let mut tele = make_graph_telemetry();
1515    nlink.initialize(&clock, &mut tele).expect("init ok");
1516
1517    let (mut in_links, mut out_links) =
1518        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1519    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1520        core::array::from_fn(|_| StaticMemoryManager::new());
1521    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1522        core::array::from_fn(|_| StaticMemoryManager::new());
1523
1524    // Pre-fill output port 0 with 3 items (between soft=2 and hard=4 for
1525    // TEST_DROP_OLDEST_POLICY). Use TEST_EDGE_POLICY for the push — its caps
1526    // (max=16, soft=14) admit all items unconditionally at this fill level.
1527    for i in 0u64..3 {
1528        let mut hdr = MessageHeader::empty();
1529        hdr.set_creation_tick(Ticks::new(i + 1));
1530        let tok = out_mgrs[0]
1531            .store(Message::new(hdr, OutP::default()))
1532            .expect("store filler");
1533        assert_eq!(
1534            out_links[0].try_push(tok, &TEST_EDGE_POLICY, &out_mgrs[0]),
1535            crate::edge::EnqueueResult::Enqueued,
1536        );
1537    }
1538
1539    // Push one input so step() fires process_message → Output → push_output.
1540    let in_tok = {
1541        let mut hdr = MessageHeader::empty();
1542        hdr.set_creation_tick(clock.now_ticks());
1543        in_mgrs[0]
1544            .store(Message::new(hdr, InP::default()))
1545            .expect("store input")
1546    };
1547    assert_eq!(
1548        in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1549        crate::edge::EnqueueResult::Enqueued,
1550    );
1551
1552    // Output policy is TEST_DROP_OLDEST_POLICY (max=4, soft=2, DropOldest).
1553    // With 3 items pre-filled, push_output sees Evict(1).
1554    let mut ctx = build_step_context_with_out_policy(
1555        &mut in_links,
1556        &mut out_links,
1557        &mut in_mgrs,
1558        &mut out_mgrs,
1559        TEST_DROP_OLDEST_POLICY,
1560        &clock,
1561        &mut tele,
1562    );
1563
1564    let res = nlink.step(&mut ctx).expect("step ok");
1565    assert_eq!(res, crate::node::StepResult::MadeProgress);
1566
1567    // 3 pre-filled − 1 evicted + 1 pushed = 3. Double-eviction gives 2.
1568    let occ = out_links[0].occupancy(&TEST_DROP_OLDEST_POLICY);
1569    assert_eq!(
1570        *occ.items(),
1571        3,
1572        "expected 3 items (exactly 1 evicted, 1 pushed); double-eviction gives 2"
1573    );
1574}
1575
1576/// push_output must free the manager slot when output is backpressured (DropNewest).
1577/// A leaked token would exhaust manager capacity and eventually panic on store.
1578/// Uses MemoryManager::available() before and after to verify the invariant.
1579pub fn run_push_output_no_token_leak_on_backpressure<
1580    N,
1581    const IN: usize,
1582    const OUT: usize,
1583    InP,
1584    OutP,
1585>(
1586    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1587) where
1588    InP: crate::message::payload::Payload + Default + Clone,
1589    OutP: crate::message::payload::Payload + Default + Clone,
1590    N: crate::node::Node<IN, OUT, InP, OutP>,
1591{
1592    if IN == 0 || OUT == 0 {
1593        return;
1594    }
1595
1596    let mut nlink = make_nodelink();
1597    let clock = NoStdLinuxMonotonicClock::new();
1598    let mut tele = make_graph_telemetry();
1599    nlink.initialize(&clock, &mut tele).expect("init ok");
1600
1601    // Tight DropNewest: max=2, soft=1. One item in queue puts it at soft,
1602    // so the next push triggers DropNewest immediately.
1603    let tight_drop_newest = EdgePolicy::new(
1604        QueueCaps::new(2, 1, None, None),
1605        AdmissionPolicy::DropNewest,
1606        OverBudgetAction::Drop,
1607    );
1608
1609    let (mut in_links, mut out_links) =
1610        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1611    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1612        core::array::from_fn(|_| StaticMemoryManager::new());
1613    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1614        core::array::from_fn(|_| StaticMemoryManager::new());
1615
1616    // Pre-fill output port 0 with 1 item, using tight_drop_newest to confirm
1617    // the first push is admitted (queue is empty → below soft).
1618    {
1619        let mut hdr = MessageHeader::empty();
1620        hdr.set_creation_tick(Ticks::new(1));
1621        let tok = out_mgrs[0]
1622            .store(Message::new(hdr, OutP::default()))
1623            .expect("store filler");
1624        assert_eq!(
1625            out_links[0].try_push(tok, &tight_drop_newest, &out_mgrs[0]),
1626            crate::edge::EnqueueResult::Enqueued,
1627        );
1628    }
1629
1630    // Record manager availability after pre-fill. After the backpressured step
1631    // this must be identical: push_output stores 1 token then frees it on DropNewest.
1632    let available_before = out_mgrs[0].available();
1633
1634    // Push one input so step() fires.
1635    let in_tok = {
1636        let mut hdr = MessageHeader::empty();
1637        hdr.set_creation_tick(clock.now_ticks());
1638        in_mgrs[0]
1639            .store(Message::new(hdr, InP::default()))
1640            .expect("store input")
1641    };
1642    assert_eq!(
1643        in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1644        crate::edge::EnqueueResult::Enqueued,
1645    );
1646
1647    let mut ctx = build_step_context_with_out_policy(
1648        &mut in_links,
1649        &mut out_links,
1650        &mut in_mgrs,
1651        &mut out_mgrs,
1652        tight_drop_newest,
1653        &clock,
1654        &mut tele,
1655    );
1656
1657    // step() → push_output: stores new token (1 slot used), hits DropNewest
1658    // (queue already at soft=1), must free the stored token, return Backpressured.
1659    let res = nlink.step(&mut ctx).expect("step ok");
1660    assert_eq!(res, crate::node::StepResult::Backpressured);
1661
1662    // Slot must have been freed: available is unchanged.
1663    assert_eq!(
1664        out_mgrs[0].available(),
1665        available_before,
1666        "manager slot leaked: push_output must free token on DropNewest backpressure"
1667    );
1668
1669    // Queue occupancy is also unchanged (the dropped item was never enqueued).
1670    let occ = out_links[0].occupancy(&tight_drop_newest);
1671    assert_eq!(
1672        *occ.items(),
1673        1,
1674        "queue occupancy must not change on backpressure"
1675    );
1676}
1677
1678/// Regression: push_output with DropOldest at hard cap (EvictUntilBelowHard) must
1679/// not double-evict when the caller's pre-eviction leaves the queue above soft.
1680///
1681/// Pre-fills output to 4 items (max=4 for TEST_DROP_OLDEST_POLICY → AtOrAboveHard
1682/// → EvictUntilBelowHard). push_output drains until below hard (4→3), then calls
1683/// try_push. 3 items > soft=2 → try_push decision is Evict(1).
1684/// OLD try_push: popped again (3→2), then pushed → 3 items. Double-eviction.
1685/// NEW try_push: Evict branch checks at_or_above_hard(3,_) = false, enqueues → 4.
1686pub fn run_push_output_evict_until_below_hard_no_double_eviction<
1687    N,
1688    const IN: usize,
1689    const OUT: usize,
1690    InP,
1691    OutP,
1692>(
1693    mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1694) where
1695    InP: crate::message::payload::Payload + Default + Clone,
1696    OutP: crate::message::payload::Payload + Default + Clone,
1697    N: crate::node::Node<IN, OUT, InP, OutP>,
1698{
1699    if IN == 0 || OUT == 0 {
1700        return;
1701    }
1702
1703    let mut nlink = make_nodelink();
1704    let clock = NoStdLinuxMonotonicClock::new();
1705    let mut tele = make_graph_telemetry();
1706    nlink.initialize(&clock, &mut tele).expect("init ok");
1707
1708    let (mut in_links, mut out_links) =
1709        make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1710    let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1711        core::array::from_fn(|_| StaticMemoryManager::new());
1712    let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1713        core::array::from_fn(|_| StaticMemoryManager::new());
1714
1715    // Pre-fill output port 0 to hard cap (4 items = max for TEST_DROP_OLDEST_POLICY).
1716    // Use TEST_EDGE_POLICY for the push (large caps → always admits at this level).
1717    for i in 0u64..4 {
1718        let mut hdr = MessageHeader::empty();
1719        hdr.set_creation_tick(Ticks::new(i + 1));
1720        let tok = out_mgrs[0]
1721            .store(Message::new(hdr, OutP::default()))
1722            .expect("store filler");
1723        assert_eq!(
1724            out_links[0].try_push(tok, &TEST_EDGE_POLICY, &out_mgrs[0]),
1725            crate::edge::EnqueueResult::Enqueued,
1726        );
1727    }
1728
1729    // Push one input so step() fires.
1730    let in_tok = {
1731        let mut hdr = MessageHeader::empty();
1732        hdr.set_creation_tick(clock.now_ticks());
1733        in_mgrs[0]
1734            .store(Message::new(hdr, InP::default()))
1735            .expect("store input")
1736    };
1737    assert_eq!(
1738        in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1739        crate::edge::EnqueueResult::Enqueued,
1740    );
1741
1742    // Output policy is TEST_DROP_OLDEST_POLICY (max=4, soft=2, DropOldest).
1743    // With 4 items pre-filled, push_output sees EvictUntilBelowHard.
1744    let mut ctx = build_step_context_with_out_policy(
1745        &mut in_links,
1746        &mut out_links,
1747        &mut in_mgrs,
1748        &mut out_mgrs,
1749        TEST_DROP_OLDEST_POLICY,
1750        &clock,
1751        &mut tele,
1752    );
1753
1754    let res = nlink.step(&mut ctx).expect("step ok");
1755    assert_eq!(res, crate::node::StepResult::MadeProgress);
1756
1757    // 4 pre-filled − 1 evicted (EvictUntilBelowHard stops at 3 < hard=4) + 1 pushed = 4.
1758    // Double-eviction (old try_push): evicts 1 more on Evict(1) branch → 3.
1759    let occ = out_links[0].occupancy(&TEST_DROP_OLDEST_POLICY);
1760    assert_eq!(
1761        *occ.items(),
1762        4,
1763        "expected 4 items (1 pre-evicted, 1 pushed, net stable); \
1764           double-eviction (old try_push Evict branch) gives 3"
1765    );
1766}