1use super::*;
80use crate::{
81 message::MessageHeader,
82 policy::{AdmissionPolicy, OverBudgetAction, QueueCaps},
83 prelude::{
84 fixed_buffer_line_writer, EdgeLink, FixedBuffer, FmtLineWriter, GraphTelemetry,
85 NoStdLinuxMonotonicClock, NodeLink, StaticMemoryManager, TestSpscRingBuf,
86 },
87 types::{EdgeIndex, NodeIndex, PortId, PortIndex, Ticks},
88};
89
90use heapless::Vec;
91
/// First const argument of the `GraphTelemetry` type built by `make_graph_telemetry`
/// (presumably the number of node slots — confirm against `GraphTelemetry`).
const TELE_NODES: usize = 8;
/// Second const argument of the `GraphTelemetry` type built by `make_graph_telemetry`
/// (presumably the number of edge slots — confirm against `GraphTelemetry`).
const TELE_EDGES: usize = 16;
/// Size parameter of the `FixedBuffer` behind the telemetry line writer
/// (presumably bytes, going by the name).
const TELE_BUF_BYTES: usize = 1024;

/// Default edge policy used by the fixtures in this file: `DropNewest` admission with
/// the `Drop` over-budget action.
// NOTE(review): the `QueueCaps::new` arguments look like (hard item cap, soft item cap,
// optional byte caps) — confirm against `QueueCaps::new`.
const TEST_EDGE_POLICY: EdgePolicy = EdgePolicy::new(
    QueueCaps::new(16, 14, None, None),
    AdmissionPolicy::DropNewest,
    OverBudgetAction::Drop,
);

/// Alternative edge policy with smaller caps and `DropOldest` admission.
// NOTE(review): not referenced in the visible part of this file; presumably used by the
// drop-oldest fixtures further down — verify.
const TEST_DROP_OLDEST_POLICY: EdgePolicy = EdgePolicy::new(
    QueueCaps::new(4, 2, None, None),
    AdmissionPolicy::DropOldest,
    OverBudgetAction::Drop,
);
110
/// Generates a `#[cfg(test)]` module named `$mod_name` holding one `#[test]` per node
/// contract fixture.
///
/// Each generated test calls the matching `run_*` function from
/// `$crate::node::contract_tests` and hands it a closure that evaluates
/// `$make_nodelink()`. `$make_nodelink` must therefore be an expression that can be
/// called with no arguments and yields the `NodeLink` under test. The generated module
/// starts with `use super::*;`, so names visible at the invocation site can be used
/// inside the expression.
///
/// ```ignore
/// run_node_contract_tests!(my_node_contract, {
///     make_nodelink: || build_my_nodelink()
/// });
/// ```
#[macro_export]
macro_rules! run_node_contract_tests {
    ($mod_name:ident, {
        make_nodelink: $make_nodelink:expr
    }) => {
        #[cfg(test)]
        mod $mod_name {
            use super::*;
            use $crate::node::contract_tests as fixtures;

            // Lifecycle: initialize / start / watchdog / stop.
            #[test]
            fn initialize_start_stop_roundtrip() {
                fixtures::run_initialize_start_stop_roundtrip(|| $make_nodelink());
            }

            // Single-message stepping.
            #[test]
            fn process_message_enqueues_and_made_progress() {
                fixtures::run_process_message_enqueues_and_made_progress(|| $make_nodelink());
            }

            #[test]
            fn step_on_empty_returns_noinput() {
                fixtures::run_step_on_empty_returns_noinput(|| $make_nodelink());
            }

            #[test]
            fn step_pops_and_calls_process_message() {
                fixtures::run_step_pops_and_calls_process_message(|| $make_nodelink());
            }

            // Batched stepping under different window kinds.
            #[test]
            fn step_batch_respects_fixed_n_disjoint() {
                fixtures::run_step_batch_fixed_n_disjoint(|| $make_nodelink());
            }

            #[test]
            fn step_batch_respects_sliding_window() {
                fixtures::run_step_batch_sliding_window(|| $make_nodelink());
            }

            #[test]
            fn step_maps_backpressure_and_errors() {
                fixtures::run_step_maps_backpressure_and_errors(|| $make_nodelink());
            }

            // Node-kind specific fixtures; each one returns early for other kinds.
            #[test]
            fn source_specific_behaviour() {
                fixtures::run_source_specific_tests(|| $make_nodelink());
            }

            #[test]
            fn sink_specific_behaviour() {
                fixtures::run_sink_specific_tests(|| $make_nodelink());
            }

            #[test]
            fn model_specific_batching_behaviour() {
                fixtures::run_model_batching_tests(|| $make_nodelink());
            }

            #[test]
            fn fixed_n_with_max_delta_t_behaviour() {
                fixtures::run_step_batch_fixed_n_max_delta_t_tests(|| $make_nodelink());
            }

            // Output-push fixtures.
            #[test]
            fn push_output_drop_oldest_evicts_oldest_once() {
                fixtures::run_push_output_drop_oldest_evicts_oldest_once(|| $make_nodelink());
            }

            #[test]
            fn push_output_no_token_leak_on_backpressure() {
                fixtures::run_push_output_no_token_leak_on_backpressure(|| $make_nodelink());
            }

            #[test]
            fn push_output_evict_until_below_hard_no_double_eviction() {
                fixtures::run_push_output_evict_until_below_hard_no_double_eviction(|| {
                    $make_nodelink()
                });
            }
        }
    };
}
249
250fn make_graph_telemetry(
260) -> GraphTelemetry<TELE_NODES, TELE_EDGES, FmtLineWriter<FixedBuffer<TELE_BUF_BYTES>>> {
261 GraphTelemetry::new(0u32, true, fixed_buffer_line_writer::<TELE_BUF_BYTES>())
262}
263
264#[allow(clippy::type_complexity)]
276fn make_edge_links_for_node<const IN: usize, const OUT: usize>(
277 base_upstream_node: NodeIndex,
278 base_downstream_node: NodeIndex,
279) -> (
280 [EdgeLink<TestSpscRingBuf<16>>; IN],
281 [EdgeLink<TestSpscRingBuf<16>>; OUT],
282) {
283 let inputs = core::array::from_fn(|i| {
284 let queue = TestSpscRingBuf::<16>::new();
285 let id = EdgeIndex::new(i + 1);
286 let upstream_port = PortId::new(base_upstream_node, PortIndex::new(i));
287 let downstream_port = PortId::new(base_downstream_node, PortIndex::new(i));
288 EdgeLink::new(
289 queue,
290 id,
291 upstream_port,
292 downstream_port,
293 TEST_EDGE_POLICY,
294 Some("in"),
295 )
296 });
297
298 let outputs = core::array::from_fn(|o| {
299 let queue = TestSpscRingBuf::<16>::new();
300 let id = EdgeIndex::new(o + 1);
301 let upstream_port = PortId::new(base_upstream_node, PortIndex::new(o));
302 let downstream_port = PortId::new(base_downstream_node, PortIndex::new(o));
303 EdgeLink::new(
304 queue,
305 id,
306 upstream_port,
307 downstream_port,
308 TEST_EDGE_POLICY,
309 Some("out"),
310 )
311 });
312
313 (inputs, outputs)
314}
315
316#[allow(clippy::type_complexity)]
328fn build_step_context<
329 'graph,
330 'telemetry,
331 'clock,
332 const IN: usize,
333 const OUT: usize,
334 InP,
335 OutP,
336 C,
337 T,
338>(
339 inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
340 outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
341 in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
342 out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
343 clock: &'clock C,
344 telemetry: &'telemetry mut T,
345) -> crate::node::StepContext<
346 'graph,
347 'telemetry,
348 'clock,
349 IN,
350 OUT,
351 InP,
352 OutP,
353 EdgeLink<TestSpscRingBuf<16>>,
354 EdgeLink<TestSpscRingBuf<16>>,
355 StaticMemoryManager<InP, 16>,
356 StaticMemoryManager<OutP, 16>,
357 C,
358 T,
359>
360where
361 InP: crate::message::payload::Payload + Default + Clone,
362 OutP: crate::message::payload::Payload + Default + Clone,
363 C: PlatformClock + Sized,
364 T: Telemetry + Sized,
365{
366 let in_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
367 let out_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
368
369 let mut inputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, IN> = Vec::new();
370 for elem in inputs.iter_mut() {
371 assert!(inputs_ref_vec.push(elem).is_ok(), "inputs_ref_vec overflow");
372 }
373
374 let mut outputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, OUT> = Vec::new();
375 for elem in outputs.iter_mut() {
376 assert!(
377 outputs_ref_vec.push(elem).is_ok(),
378 "outputs_ref_vec overflow"
379 );
380 }
381
382 let mut in_mgrs_ref_vec: Vec<&mut StaticMemoryManager<InP, 16>, IN> = Vec::new();
383 for elem in in_managers.iter_mut() {
384 assert!(
385 in_mgrs_ref_vec.push(elem).is_ok(),
386 "in_mgrs_ref_vec overflow"
387 );
388 }
389
390 let mut out_mgrs_ref_vec: Vec<&mut StaticMemoryManager<OutP, 16>, OUT> = Vec::new();
391 for elem in out_managers.iter_mut() {
392 assert!(
393 out_mgrs_ref_vec.push(elem).is_ok(),
394 "out_mgrs_ref_vec overflow"
395 );
396 }
397
398 let inputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; IN] = match inputs_ref_vec.into_array() {
399 Ok(arr) => arr,
400 Err(_) => panic!("inputs_ref_vec length mismatch"),
401 };
402
403 let outputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; OUT] = match outputs_ref_vec.into_array()
404 {
405 Ok(arr) => arr,
406 Err(_) => panic!("outputs_ref_vec length mismatch"),
407 };
408
409 let in_mgrs_ref: [&mut StaticMemoryManager<InP, 16>; IN] = match in_mgrs_ref_vec.into_array() {
410 Ok(arr) => arr,
411 Err(_) => panic!("in_mgrs_ref_vec length mismatch"),
412 };
413
414 let out_mgrs_ref: [&mut StaticMemoryManager<OutP, 16>; OUT] =
415 match out_mgrs_ref_vec.into_array() {
416 Ok(arr) => arr,
417 Err(_) => panic!("out_mgrs_ref_vec length mismatch"),
418 };
419
420 let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
421 let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);
422
423 crate::node::StepContext::new(
424 inputs_ref,
425 outputs_ref,
426 in_mgrs_ref,
427 out_mgrs_ref,
428 in_policies,
429 out_policies,
430 0u32,
431 in_edge_ids,
432 out_edge_ids,
433 clock,
434 telemetry,
435 )
436}
437
438#[allow(clippy::type_complexity)]
442fn build_step_context_with_out_policy<
443 'graph,
444 'telemetry,
445 'clock,
446 const IN: usize,
447 const OUT: usize,
448 InP,
449 OutP,
450 C,
451 T,
452>(
453 inputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; IN],
454 outputs: &'graph mut [EdgeLink<TestSpscRingBuf<16>>; OUT],
455 in_managers: &'graph mut [StaticMemoryManager<InP, 16>; IN],
456 out_managers: &'graph mut [StaticMemoryManager<OutP, 16>; OUT],
457 out_policy: EdgePolicy,
458 clock: &'clock C,
459 telemetry: &'telemetry mut T,
460) -> crate::node::StepContext<
461 'graph,
462 'telemetry,
463 'clock,
464 IN,
465 OUT,
466 InP,
467 OutP,
468 EdgeLink<TestSpscRingBuf<16>>,
469 EdgeLink<TestSpscRingBuf<16>>,
470 StaticMemoryManager<InP, 16>,
471 StaticMemoryManager<OutP, 16>,
472 C,
473 T,
474>
475where
476 InP: crate::message::payload::Payload + Default + Clone,
477 OutP: crate::message::payload::Payload + Default + Clone,
478 C: PlatformClock + Sized,
479 T: Telemetry + Sized,
480{
481 let in_policies = core::array::from_fn(|_| TEST_EDGE_POLICY);
482 let out_policies = core::array::from_fn(|_| out_policy); let mut inputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, IN> = Vec::new();
485 for elem in inputs.iter_mut() {
486 assert!(inputs_ref_vec.push(elem).is_ok(), "inputs_ref_vec overflow");
487 }
488 let mut outputs_ref_vec: Vec<&mut EdgeLink<TestSpscRingBuf<16>>, OUT> = Vec::new();
489 for elem in outputs.iter_mut() {
490 assert!(
491 outputs_ref_vec.push(elem).is_ok(),
492 "outputs_ref_vec overflow"
493 );
494 }
495 let mut in_mgrs_ref_vec: Vec<&mut StaticMemoryManager<InP, 16>, IN> = Vec::new();
496 for elem in in_managers.iter_mut() {
497 assert!(
498 in_mgrs_ref_vec.push(elem).is_ok(),
499 "in_mgrs_ref_vec overflow"
500 );
501 }
502 let mut out_mgrs_ref_vec: Vec<&mut StaticMemoryManager<OutP, 16>, OUT> = Vec::new();
503 for elem in out_managers.iter_mut() {
504 assert!(
505 out_mgrs_ref_vec.push(elem).is_ok(),
506 "out_mgrs_ref_vec overflow"
507 );
508 }
509
510 let inputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; IN] = match inputs_ref_vec.into_array() {
511 Ok(arr) => arr,
512 Err(_) => panic!("inputs_ref_vec length mismatch"),
513 };
514 let outputs_ref: [&mut EdgeLink<TestSpscRingBuf<16>>; OUT] = match outputs_ref_vec.into_array()
515 {
516 Ok(arr) => arr,
517 Err(_) => panic!("outputs_ref_vec length mismatch"),
518 };
519 let in_mgrs_ref: [&mut StaticMemoryManager<InP, 16>; IN] = match in_mgrs_ref_vec.into_array() {
520 Ok(arr) => arr,
521 Err(_) => panic!("in_mgrs_ref_vec length mismatch"),
522 };
523 let out_mgrs_ref: [&mut StaticMemoryManager<OutP, 16>; OUT] =
524 match out_mgrs_ref_vec.into_array() {
525 Ok(arr) => arr,
526 Err(_) => panic!("out_mgrs_ref_vec length mismatch"),
527 };
528
529 let in_edge_ids = core::array::from_fn(|i| *inputs_ref[i].id().as_usize() as u32);
530 let out_edge_ids = core::array::from_fn(|o| *outputs_ref[o].id().as_usize() as u32);
531
532 crate::node::StepContext::new(
533 inputs_ref,
534 outputs_ref,
535 in_mgrs_ref,
536 out_mgrs_ref,
537 in_policies,
538 out_policies,
539 0u32,
540 in_edge_ids,
541 out_edge_ids,
542 clock,
543 telemetry,
544 )
545}
546
547pub fn run_initialize_start_stop_roundtrip<N, const IN: usize, const OUT: usize, InP, OutP>(
558 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
559) where
560 InP: crate::message::payload::Payload + Default + Clone,
561 OutP: crate::message::payload::Payload + Default + Clone,
562 N: crate::node::Node<IN, OUT, InP, OutP>,
563{
564 let mut nlink = make_nodelink();
565 let clock = NoStdLinuxMonotonicClock::new();
566 let mut tele = make_graph_telemetry();
567
568 nlink.initialize(&clock, &mut tele).expect("init ok");
569 nlink.start(&clock, &mut tele).expect("start ok");
570
571 let _ = nlink
572 .on_watchdog_timeout(&clock, &mut tele)
573 .expect("watchdog ok");
574
575 nlink.stop(&clock, &mut tele).expect("stop ok");
576}
577
578pub fn run_process_message_enqueues_and_made_progress<
589 N,
590 const IN: usize,
591 const OUT: usize,
592 InP,
593 OutP,
594>(
595 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
596) where
597 InP: crate::message::payload::Payload + Default + Clone,
598 OutP: crate::message::payload::Payload + Default + Clone,
599 N: crate::node::Node<IN, OUT, InP, OutP>,
600{
601 let mut nlink = make_nodelink();
602 let clock = NoStdLinuxMonotonicClock::new();
603 let mut tele = make_graph_telemetry();
604
605 nlink.initialize(&clock, &mut tele).expect("init ok");
606
607 let (mut in_links, mut out_links) =
608 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
609 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
610 core::array::from_fn(|_| StaticMemoryManager::new());
611 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
612 core::array::from_fn(|_| StaticMemoryManager::new());
613
614 if IN == 0 {
615 return;
616 }
617
618 let mut hdr = MessageHeader::empty();
619 hdr.set_creation_tick(clock.now_ticks());
620 let msg = Message::new(hdr, InP::default());
621
622 let in_policy = TEST_EDGE_POLICY;
623 let token = in_mgrs[0].store(msg).expect("store ok");
624 assert_eq!(
625 in_links[0].try_push(token, &in_policy, &in_mgrs[0]),
626 crate::edge::EnqueueResult::Enqueued
627 );
628
629 let mut ctx = build_step_context(
630 &mut in_links,
631 &mut out_links,
632 &mut in_mgrs,
633 &mut out_mgrs,
634 &clock,
635 &mut tele,
636 );
637
638 let res = nlink.step(&mut ctx).expect("step ok");
639 assert!(res != crate::node::StepResult::NoInput);
640
641 if OUT > 0 {
642 let mut pushed = 0usize;
643 loop {
644 match out_links[0].try_pop(&out_mgrs[0]) {
645 Ok(_token) => pushed += 1,
646 Err(QueueError::Empty) => break,
647 Err(e) => panic!("unexpected queue error: {:?}", e),
648 }
649 }
650 assert!(
651 pushed > 0,
652 "expected node to push at least one message on output 0"
653 );
654 }
655
656 let metrics = tele.metrics();
657 let processed = metrics.nodes()[0].processed();
658 assert!(
659 *processed >= 1u64,
660 "expected processed >= 1, got {}",
661 processed
662 );
663}
664
665pub fn run_step_on_empty_returns_noinput<N, const IN: usize, const OUT: usize, InP, OutP>(
671 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
672) where
673 InP: crate::message::payload::Payload + Default + Clone,
674 OutP: crate::message::payload::Payload + Default + Clone,
675 N: crate::node::Node<IN, OUT, InP, OutP>,
676{
677 let mut nlink = make_nodelink();
678 let clock = NoStdLinuxMonotonicClock::new();
679 let mut tele = make_graph_telemetry();
680
681 let (mut in_links, mut out_links) =
682 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
683 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
684 core::array::from_fn(|_| StaticMemoryManager::new());
685 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
686 core::array::from_fn(|_| StaticMemoryManager::new());
687
688 let mut ctx = build_step_context(
689 &mut in_links,
690 &mut out_links,
691 &mut in_mgrs,
692 &mut out_mgrs,
693 &clock,
694 &mut tele,
695 );
696
697 let res = nlink.step(&mut ctx).expect("step ok");
698
699 if IN == 0 {
700 assert!(
701 res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
702 "expected NoInput or MadeProgress for zero-input node, got {:?}",
703 res
704 );
705 } else {
706 assert_eq!(res, crate::node::StepResult::NoInput);
707 }
708}
709
710pub fn run_step_pops_and_calls_process_message<N, const IN: usize, const OUT: usize, InP, OutP>(
719 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
720) where
721 InP: crate::message::payload::Payload + Default + Clone,
722 OutP: crate::message::payload::Payload + Default + Clone,
723 N: crate::node::Node<IN, OUT, InP, OutP>,
724{
725 let mut nlink = make_nodelink();
726 let clock = NoStdLinuxMonotonicClock::new();
727 let mut tele = make_graph_telemetry();
728 nlink.initialize(&clock, &mut tele).expect("init ok");
729
730 let (mut in_links, mut out_links) =
731 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
732 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
733 core::array::from_fn(|_| StaticMemoryManager::new());
734 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
735 core::array::from_fn(|_| StaticMemoryManager::new());
736
737 if IN == 0 {
738 return;
739 }
740
741 let mut hdr = MessageHeader::empty();
742 hdr.set_creation_tick(clock.now_ticks());
743 let msg = Message::new(hdr, InP::default());
744
745 let policy = TEST_EDGE_POLICY;
746 let token = in_mgrs[0].store(msg).expect("store ok");
747 assert_eq!(
748 in_links[0].try_push(token, &policy, &in_mgrs[0]),
749 crate::edge::EnqueueResult::Enqueued
750 );
751
752 let mut ctx = build_step_context(
753 &mut in_links,
754 &mut out_links,
755 &mut in_mgrs,
756 &mut out_mgrs,
757 &clock,
758 &mut tele,
759 );
760
761 let res = nlink.step(&mut ctx).expect("step ok");
762 assert!(res != crate::node::StepResult::NoInput);
763
764 if OUT > 0 {
765 let mut popped = 0usize;
766 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
767 popped += 1;
768 }
769 assert!(popped > 0, "expected output items");
770 }
771
772 let metrics = tele.metrics();
773 assert!(*metrics.nodes()[0].processed() >= 1u64);
774}
775
776pub fn run_step_batch_fixed_n_disjoint<N, const IN: usize, const OUT: usize, InP, OutP>(
786 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
787) where
788 InP: crate::message::payload::Payload + Default + Clone,
789 OutP: crate::message::payload::Payload + Default + Clone,
790 N: crate::node::Node<IN, OUT, InP, OutP>,
791{
792 let mut nlink = make_nodelink();
793 const TEST_FIXED_N: usize = 3;
794
795 let base_policy = nlink.node().policy();
796 let batching = crate::policy::BatchingPolicy::with_window(
797 Some(TEST_FIXED_N),
798 None,
799 crate::policy::WindowKind::Disjoint,
800 );
801 let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
802 nlink.set_policy(new_policy);
803
804 let clock = NoStdLinuxMonotonicClock::new();
805 let mut tele = make_graph_telemetry();
806 nlink.initialize(&clock, &mut tele).expect("init ok");
807
808 let (mut in_links, mut out_links) =
809 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
810 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
811 core::array::from_fn(|_| StaticMemoryManager::new());
812 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
813 core::array::from_fn(|_| StaticMemoryManager::new());
814
815 if IN == 0 {
816 return;
817 }
818
819 let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
820
821 let policy = TEST_EDGE_POLICY;
822 for t in 1u64..=(fixed_n as u64 + 1) {
823 let mut hdr = MessageHeader::empty();
824 hdr.set_creation_tick(Ticks::new(t));
825 let m = Message::new(hdr, InP::default());
826 let token = in_mgrs[0].store(m).expect("store ok");
827 assert_eq!(
828 in_links[0].try_push(token, &policy, &in_mgrs[0]),
829 crate::edge::EnqueueResult::Enqueued
830 );
831 }
832
833 let in_before = *in_links[0].occupancy(&policy).items();
834
835 let mut ctx = build_step_context(
836 &mut in_links,
837 &mut out_links,
838 &mut in_mgrs,
839 &mut out_mgrs,
840 &clock,
841 &mut tele,
842 );
843
844 let res = nlink.step(&mut ctx).expect("step_batch ok");
845 assert!(res != crate::node::StepResult::NoInput);
846
847 let in_after = *ctx.in_occupancy(0).items();
848 assert_eq!(
849 in_before.saturating_sub(in_after),
850 fixed_n,
851 "expected fixed_n items popped from input"
852 );
853
854 if OUT > 0 {
855 let mut out_count = 0usize;
856 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
857 out_count += 1;
858 }
859 if fixed_n > 0 {
860 assert_eq!(
861 out_count, fixed_n,
862 "expected out_count == fixed_n (got {}, fixed_n={})",
863 out_count, fixed_n
864 );
865 } else {
866 assert!(out_count >= 1, "expected at least one output");
867 }
868 }
869
870 let metrics = tele.metrics();
871 if fixed_n > 1 {
872 assert_eq!(
873 *metrics.nodes()[0].processed(),
874 fixed_n as u64,
875 "expected processed == fixed_n for batched step"
876 );
877 } else {
878 assert!(*metrics.nodes()[0].processed() >= 1u64);
879 }
880}
881
882pub fn run_step_batch_sliding_window<N, const IN: usize, const OUT: usize, InP, OutP>(
893 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
894) where
895 InP: crate::message::payload::Payload + Default + Clone,
896 OutP: crate::message::payload::Payload + Default + Clone,
897 N: crate::node::Node<IN, OUT, InP, OutP>,
898{
899 let mut nlink = make_nodelink();
900
901 const TEST_FIXED_N: usize = 4;
902 const TEST_STRIDE: usize = 2;
903
904 let base_policy = nlink.node().policy();
905 let batching = crate::policy::BatchingPolicy::with_window(
906 Some(TEST_FIXED_N),
907 None,
908 crate::policy::WindowKind::Sliding(crate::policy::SlidingWindow::new(TEST_STRIDE)),
909 );
910 let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
911 nlink.set_policy(new_policy);
912
913 let clock = NoStdLinuxMonotonicClock::new();
914 let mut tele = make_graph_telemetry();
915 nlink.initialize(&clock, &mut tele).expect("init ok");
916
917 let (mut in_links, mut out_links) =
918 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
919 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
920 core::array::from_fn(|_| StaticMemoryManager::new());
921 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
922 core::array::from_fn(|_| StaticMemoryManager::new());
923
924 if IN == 0 {
925 return;
926 }
927
928 let policy = TEST_EDGE_POLICY;
929 for t in 1u64..=6u64 {
930 let mut hdr = MessageHeader::empty();
931 hdr.set_creation_tick(Ticks::new(t));
932 let m = Message::new(hdr, InP::default());
933 let token = in_mgrs[0].store(m).expect("store ok");
934 assert_eq!(
935 in_links[0].try_push(token, &policy, &in_mgrs[0]),
936 crate::edge::EnqueueResult::Enqueued
937 );
938 }
939
940 let in_before = *in_links[0].occupancy(&policy).items();
941
942 let mut ctx = build_step_context(
943 &mut in_links,
944 &mut out_links,
945 &mut in_mgrs,
946 &mut out_mgrs,
947 &clock,
948 &mut tele,
949 );
950
951 let res = nlink.step(&mut ctx).expect("step_batch ok");
952 assert!(res != crate::node::StepResult::NoInput);
953
954 let in_after = *ctx.in_occupancy(0).items();
955
956 let stride_to_pop = core::cmp::min(TEST_STRIDE, in_before);
957 let removed = in_before.saturating_sub(in_after);
958
959 assert_eq!(
960 removed, stride_to_pop,
961 "unexpected number popped: removed={}, expected stride {}",
962 removed, stride_to_pop
963 );
964
965 let fixed_n = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
966 let expected_present = core::cmp::min(in_before, fixed_n);
967
968 if OUT > 0 {
969 let mut out_count = 0usize;
970 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
971 out_count += 1;
972 }
973 assert_eq!(
974 out_count, expected_present,
975 "expected out_count == expected_present (got {}, expected {})",
976 out_count, expected_present
977 );
978 }
979
980 let metrics = tele.metrics();
981 if fixed_n > 1 {
982 assert_eq!(
983 *metrics.nodes()[0].processed(),
984 fixed_n as u64,
985 "expected processed == fixed_n for batched step"
986 );
987 } else {
988 assert!(*metrics.nodes()[0].processed() >= 1u64);
989 }
990}
991
992pub fn run_step_maps_backpressure_and_errors<N, const IN: usize, const OUT: usize, InP, OutP>(
1005 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1006) where
1007 InP: crate::message::payload::Payload + Default + Clone,
1008 OutP: crate::message::payload::Payload + Default + Clone,
1009 N: crate::node::Node<IN, OUT, InP, OutP>,
1010{
1011 if IN == 0 || OUT == 0 {
1012 return;
1013 }
1014
1015 let mut nlink = make_nodelink();
1016 let clock = NoStdLinuxMonotonicClock::new();
1017 let mut tele = make_graph_telemetry();
1018 nlink.initialize(&clock, &mut tele).expect("init ok");
1019
1020 let (mut in_links, mut out_links) =
1021 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1022 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1023 core::array::from_fn(|_| StaticMemoryManager::new());
1024 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1025 core::array::from_fn(|_| StaticMemoryManager::new());
1026
1027 let policy = TEST_EDGE_POLICY;
1029 loop {
1030 let dummy_out_msg = Message::new(MessageHeader::empty(), OutP::default());
1031 let token = match out_mgrs[0].store(dummy_out_msg) {
1032 Ok(t) => t,
1033 Err(_) => break, };
1035 match out_links[0].try_push(token, &policy, &out_mgrs[0]) {
1036 crate::edge::EnqueueResult::Enqueued => continue,
1037 crate::edge::EnqueueResult::DroppedNewest | crate::edge::EnqueueResult::Rejected => {
1038 break
1039 }
1040 }
1041 }
1042
1043 let mut hdr = MessageHeader::empty();
1045 hdr.set_creation_tick(clock.now_ticks());
1046 let msg = Message::new(hdr, InP::default());
1047 let token = in_mgrs[0].store(msg).expect("store ok");
1048 assert_eq!(
1049 in_links[0].try_push(token, &policy, &in_mgrs[0]),
1050 crate::edge::EnqueueResult::Enqueued
1051 );
1052
1053 let mut ctx = build_step_context(
1054 &mut in_links,
1055 &mut out_links,
1056 &mut in_mgrs,
1057 &mut out_mgrs,
1058 &clock,
1059 &mut tele,
1060 );
1061
1062 match nlink.step(&mut ctx) {
1063 Ok(res) => {
1064 assert!(res != crate::node::StepResult::NoInput);
1065 }
1066 Err(_e) => {
1067 }
1069 }
1070}
1071
1072pub fn run_source_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1087 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1088) where
1089 InP: crate::message::payload::Payload + Default + Clone,
1090 OutP: crate::message::payload::Payload + Default + Clone,
1091 N: crate::node::Node<IN, OUT, InP, OutP>,
1092{
1093 let mut nlink = make_nodelink();
1094 let kind = nlink.node().node_kind();
1095 if kind != crate::node::NodeKind::Source {
1096 return;
1097 }
1098
1099 let clock = NoStdLinuxMonotonicClock::new();
1100 let mut tele = make_graph_telemetry();
1101 nlink.initialize(&clock, &mut tele).expect("init ok");
1102
1103 let (mut in_links, mut out_links) =
1104 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1105 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1106 core::array::from_fn(|_| StaticMemoryManager::new());
1107 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1108 core::array::from_fn(|_| StaticMemoryManager::new());
1109
1110 let mut ctx = build_step_context(
1111 &mut in_links,
1112 &mut out_links,
1113 &mut in_mgrs,
1114 &mut out_mgrs,
1115 &clock,
1116 &mut tele,
1117 );
1118
1119 let res = nlink.step(&mut ctx).expect("step ok");
1120 assert!(
1121 res == crate::node::StepResult::NoInput || res == crate::node::StepResult::MadeProgress,
1122 "source.step should return NoInput or MadeProgress"
1123 );
1124
1125 let _ = nlink.step(&mut ctx);
1126}
1127
1128pub fn run_sink_specific_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1143 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1144) where
1145 InP: crate::message::payload::Payload + Default + Clone,
1146 OutP: crate::message::payload::Payload + Default + Clone,
1147 N: crate::node::Node<IN, OUT, InP, OutP>,
1148{
1149 let mut nlink = make_nodelink();
1150 let kind = nlink.node().node_kind();
1151 if kind != crate::node::NodeKind::Sink {
1152 return;
1153 }
1154
1155 let clock = NoStdLinuxMonotonicClock::new();
1156 let mut tele = make_graph_telemetry();
1157 nlink.initialize(&clock, &mut tele).expect("init ok");
1158
1159 let (mut in_links, mut out_links) =
1160 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1161 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1162 core::array::from_fn(|_| StaticMemoryManager::new());
1163 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1164 core::array::from_fn(|_| StaticMemoryManager::new());
1165
1166 if IN == 0 {
1167 return;
1168 }
1169
1170 let mut hdr = MessageHeader::empty();
1171 hdr.set_creation_tick(clock.now_ticks());
1172 let msg = Message::new(hdr, InP::default());
1173 let policy = TEST_EDGE_POLICY;
1174 let token = in_mgrs[0].store(msg).expect("store ok");
1175 assert_eq!(
1176 in_links[0].try_push(token, &policy, &in_mgrs[0]),
1177 crate::edge::EnqueueResult::Enqueued
1178 );
1179
1180 let mut ctx = build_step_context(
1181 &mut in_links,
1182 &mut out_links,
1183 &mut in_mgrs,
1184 &mut out_mgrs,
1185 &clock,
1186 &mut tele,
1187 );
1188
1189 let res = nlink.step(&mut ctx);
1190 match res {
1191 Ok(r) => {
1192 assert!(
1193 r == crate::node::StepResult::MadeProgress || r == crate::node::StepResult::NoInput,
1194 "sink.step returned unexpected StepResult"
1195 );
1196 }
1197 Err(_e) => {}
1198 }
1199}
1200
1201pub fn run_model_batching_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1217 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1218) where
1219 InP: crate::message::payload::Payload + Default + Clone,
1220 OutP: crate::message::payload::Payload + Default + Clone,
1221 N: crate::node::Node<IN, OUT, InP, OutP>,
1222{
1223 let mut nlink = make_nodelink();
1224 if nlink.node().node_kind() != crate::node::NodeKind::Model || IN != 1 || OUT != 1 {
1225 return;
1226 }
1227
1228 const TEST_FIXED_N: usize = 4;
1229 let base_policy = nlink.node().policy();
1230 let batching = crate::policy::BatchingPolicy::with_window(
1231 Some(TEST_FIXED_N),
1232 None,
1233 crate::policy::WindowKind::Disjoint,
1234 );
1235 let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
1236 nlink.set_policy(new_policy);
1237
1238 let clock = NoStdLinuxMonotonicClock::new();
1239 let mut tele = make_graph_telemetry();
1240 nlink.initialize(&clock, &mut tele).expect("init ok");
1241
1242 let (mut in_links, mut out_links) =
1243 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1244 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1245 core::array::from_fn(|_| StaticMemoryManager::new());
1246 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1247 core::array::from_fn(|_| StaticMemoryManager::new());
1248
1249 let requested_fixed = nlink.node().policy().batching().fixed_n().unwrap_or(1usize);
1250
1251 let policy = TEST_EDGE_POLICY;
1252 for t in 1u64..=(requested_fixed as u64) {
1253 let mut hdr = MessageHeader::empty();
1254 hdr.set_creation_tick(Ticks::new(t));
1255 let m = Message::new(hdr, InP::default());
1256 let token = in_mgrs[0].store(m).expect("store ok");
1257 assert_eq!(
1258 in_links[0].try_push(token, &policy, &in_mgrs[0]),
1259 crate::edge::EnqueueResult::Enqueued
1260 );
1261 }
1262
1263 let mut ctx = build_step_context(
1264 &mut in_links,
1265 &mut out_links,
1266 &mut in_mgrs,
1267 &mut out_mgrs,
1268 &clock,
1269 &mut tele,
1270 );
1271
1272 let res = nlink.step(&mut ctx).expect("step_batch ok");
1273 assert!(
1274 res != crate::node::StepResult::NoInput,
1275 "model.step_batch returned NoInput"
1276 );
1277
1278 if OUT > 0 {
1279 let mut out_count = 0usize;
1280 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1281 out_count += 1;
1282 }
1283
1284 assert!(
1285 out_count >= 1,
1286 "expected at least one output from model batching"
1287 );
1288
1289 assert!(
1290 out_count <= requested_fixed,
1291 "unexpectedly produced more outputs ({}) than requested_fixed ({})",
1292 out_count,
1293 requested_fixed
1294 );
1295 }
1296
1297 let metrics = tele.metrics();
1298 assert_eq!(
1299 *metrics.nodes()[0].processed(),
1300 requested_fixed as u64,
1301 "expected processed == requested_fixed for a model batched step"
1302 );
1303}
1304
1305pub fn run_step_batch_fixed_n_max_delta_t_tests<N, const IN: usize, const OUT: usize, InP, OutP>(
1321 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1322) where
1323 InP: crate::message::payload::Payload + Default + Clone,
1324 OutP: crate::message::payload::Payload + Default + Clone,
1325 N: crate::node::Node<IN, OUT, InP, OutP>,
1326{
1327 if IN == 0 {
1328 return;
1329 }
1330
1331 let mut nlink = make_nodelink();
1332
1333 const TEST_FIXED_N: usize = 4;
1334 const TEST_MAX_DELTA_TICKS: u64 = 5u64;
1335
1336 let base_policy = nlink.node().policy();
1337 let batching = crate::policy::BatchingPolicy::with_window(
1338 Some(TEST_FIXED_N),
1339 Some(crate::types::Ticks::new(TEST_MAX_DELTA_TICKS)),
1340 crate::policy::WindowKind::Disjoint,
1341 );
1342 let new_policy = NodePolicy::new(batching, *base_policy.budget(), *base_policy.deadline());
1343 nlink.set_policy(new_policy);
1344
1345 let policy_installed = *nlink.node().policy().batching();
1346 let fixed_opt = *policy_installed.fixed_n();
1347 let delta_opt = *policy_installed.max_delta_t();
1348 if fixed_opt.is_none() || delta_opt.is_none() {
1349 return;
1350 }
1351 let fixed_n = fixed_opt.unwrap();
1352 let max_delta = *delta_opt.unwrap().as_u64();
1353
1354 let clock = NoStdLinuxMonotonicClock::new();
1355 let mut tele = make_graph_telemetry();
1356 nlink.initialize(&clock, &mut tele).expect("init ok");
1357
1358 {
1360 let (mut in_links, mut out_links) =
1361 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1362 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1363 core::array::from_fn(|_| StaticMemoryManager::new());
1364 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1365 core::array::from_fn(|_| StaticMemoryManager::new());
1366
1367 let policy = TEST_EDGE_POLICY;
1368
1369 for i in 0..fixed_n {
1370 let tick = i as u64;
1371 let mut hdr = MessageHeader::empty();
1372 hdr.set_creation_tick(Ticks::new(tick));
1373 let m = Message::new(hdr, InP::default());
1374 let token = in_mgrs[0].store(m).expect("store ok");
1375 assert_eq!(
1376 in_links[0].try_push(token, &policy, &in_mgrs[0]),
1377 crate::edge::EnqueueResult::Enqueued
1378 );
1379 }
1380
1381 let metrics_before = tele.metrics();
1382 let processed_before = *metrics_before.nodes()[0].processed();
1383
1384 let mut ctx = build_step_context(
1385 &mut in_links,
1386 &mut out_links,
1387 &mut in_mgrs,
1388 &mut out_mgrs,
1389 &clock,
1390 &mut tele,
1391 );
1392
1393 let res = nlink.step(&mut ctx).expect("step_batch ok (valid span)");
1394 assert!(
1395 res != crate::node::StepResult::NoInput,
1396 "expected batch processed for valid span"
1397 );
1398
1399 if OUT > 0 {
1400 let mut out_count = 0usize;
1401 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1402 out_count += 1;
1403 }
1404 assert_eq!(
1405 out_count, fixed_n,
1406 "expected exactly fixed_n outputs ({}) for valid span, got {}",
1407 fixed_n, out_count
1408 );
1409 }
1410
1411 let metrics_after = tele.metrics();
1412 let processed_after = *metrics_after.nodes()[0].processed();
1413 assert_eq!(
1414 processed_after.saturating_sub(processed_before),
1415 fixed_n as u64,
1416 "expected telemetry processed to increase by fixed_n for valid span"
1417 );
1418 }
1419
1420 {
1422 let (mut in_links, mut out_links) =
1423 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1424 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1425 core::array::from_fn(|_| StaticMemoryManager::new());
1426 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1427 core::array::from_fn(|_| StaticMemoryManager::new());
1428
1429 let policy = TEST_EDGE_POLICY;
1430 for i in 0..fixed_n {
1431 let tick = (i as u64) * (max_delta + 1000u64);
1432 let mut hdr = MessageHeader::empty();
1433 hdr.set_creation_tick(Ticks::new(tick));
1434 let m = Message::new(hdr, InP::default());
1435 let token = in_mgrs[0].store(m).expect("store ok");
1436 assert_eq!(
1437 in_links[0].try_push(token, &policy, &in_mgrs[0]),
1438 crate::edge::EnqueueResult::Enqueued
1439 );
1440 }
1441
1442 let metrics_before_invalid = tele.metrics();
1443 let processed_before_invalid = *metrics_before_invalid.nodes()[0].processed();
1444
1445 let mut ctx = build_step_context(
1446 &mut in_links,
1447 &mut out_links,
1448 &mut in_mgrs,
1449 &mut out_mgrs,
1450 &clock,
1451 &mut tele,
1452 );
1453
1454 let res = nlink.step(&mut ctx).expect("step_batch ok (invalid span)");
1455
1456 if res == crate::node::StepResult::NoInput {
1457 let metrics_after_invalid = tele.metrics();
1458 let processed_after_invalid = *metrics_after_invalid.nodes()[0].processed();
1459 assert_eq!(
1460 processed_after_invalid, processed_before_invalid,
1461 "expected no telemetry change when invalid span results in NoInput"
1462 );
1463 } else {
1464 assert_eq!(
1465 res,
1466 crate::node::StepResult::MadeProgress,
1467 "unexpected StepResult for invalid span: {:?}",
1468 res
1469 );
1470
1471 if OUT > 0 {
1472 let mut out_count = 0usize;
1473 while let Ok(_token) = out_links[0].try_pop(&out_mgrs[0]) {
1474 out_count += 1;
1475 }
1476 assert!(
1477 out_count > 0 && out_count < fixed_n,
1478 "expected partial progress for invalid span (0 < out_count < fixed_n), got {}",
1479 out_count
1480 );
1481 }
1482 }
1483 }
1484}
1485
1486pub fn run_push_output_drop_oldest_evicts_oldest_once<
1496 N,
1497 const IN: usize,
1498 const OUT: usize,
1499 InP,
1500 OutP,
1501>(
1502 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1503) where
1504 InP: crate::message::payload::Payload + Default + Clone,
1505 OutP: crate::message::payload::Payload + Default + Clone,
1506 N: crate::node::Node<IN, OUT, InP, OutP>,
1507{
1508 if IN == 0 || OUT == 0 {
1509 return;
1510 }
1511
1512 let mut nlink = make_nodelink();
1513 let clock = NoStdLinuxMonotonicClock::new();
1514 let mut tele = make_graph_telemetry();
1515 nlink.initialize(&clock, &mut tele).expect("init ok");
1516
1517 let (mut in_links, mut out_links) =
1518 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1519 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1520 core::array::from_fn(|_| StaticMemoryManager::new());
1521 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1522 core::array::from_fn(|_| StaticMemoryManager::new());
1523
1524 for i in 0u64..3 {
1528 let mut hdr = MessageHeader::empty();
1529 hdr.set_creation_tick(Ticks::new(i + 1));
1530 let tok = out_mgrs[0]
1531 .store(Message::new(hdr, OutP::default()))
1532 .expect("store filler");
1533 assert_eq!(
1534 out_links[0].try_push(tok, &TEST_EDGE_POLICY, &out_mgrs[0]),
1535 crate::edge::EnqueueResult::Enqueued,
1536 );
1537 }
1538
1539 let in_tok = {
1541 let mut hdr = MessageHeader::empty();
1542 hdr.set_creation_tick(clock.now_ticks());
1543 in_mgrs[0]
1544 .store(Message::new(hdr, InP::default()))
1545 .expect("store input")
1546 };
1547 assert_eq!(
1548 in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1549 crate::edge::EnqueueResult::Enqueued,
1550 );
1551
1552 let mut ctx = build_step_context_with_out_policy(
1555 &mut in_links,
1556 &mut out_links,
1557 &mut in_mgrs,
1558 &mut out_mgrs,
1559 TEST_DROP_OLDEST_POLICY,
1560 &clock,
1561 &mut tele,
1562 );
1563
1564 let res = nlink.step(&mut ctx).expect("step ok");
1565 assert_eq!(res, crate::node::StepResult::MadeProgress);
1566
1567 let occ = out_links[0].occupancy(&TEST_DROP_OLDEST_POLICY);
1569 assert_eq!(
1570 *occ.items(),
1571 3,
1572 "expected 3 items (exactly 1 evicted, 1 pushed); double-eviction gives 2"
1573 );
1574}
1575
1576pub fn run_push_output_no_token_leak_on_backpressure<
1580 N,
1581 const IN: usize,
1582 const OUT: usize,
1583 InP,
1584 OutP,
1585>(
1586 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1587) where
1588 InP: crate::message::payload::Payload + Default + Clone,
1589 OutP: crate::message::payload::Payload + Default + Clone,
1590 N: crate::node::Node<IN, OUT, InP, OutP>,
1591{
1592 if IN == 0 || OUT == 0 {
1593 return;
1594 }
1595
1596 let mut nlink = make_nodelink();
1597 let clock = NoStdLinuxMonotonicClock::new();
1598 let mut tele = make_graph_telemetry();
1599 nlink.initialize(&clock, &mut tele).expect("init ok");
1600
1601 let tight_drop_newest = EdgePolicy::new(
1604 QueueCaps::new(2, 1, None, None),
1605 AdmissionPolicy::DropNewest,
1606 OverBudgetAction::Drop,
1607 );
1608
1609 let (mut in_links, mut out_links) =
1610 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1611 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1612 core::array::from_fn(|_| StaticMemoryManager::new());
1613 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1614 core::array::from_fn(|_| StaticMemoryManager::new());
1615
1616 {
1619 let mut hdr = MessageHeader::empty();
1620 hdr.set_creation_tick(Ticks::new(1));
1621 let tok = out_mgrs[0]
1622 .store(Message::new(hdr, OutP::default()))
1623 .expect("store filler");
1624 assert_eq!(
1625 out_links[0].try_push(tok, &tight_drop_newest, &out_mgrs[0]),
1626 crate::edge::EnqueueResult::Enqueued,
1627 );
1628 }
1629
1630 let available_before = out_mgrs[0].available();
1633
1634 let in_tok = {
1636 let mut hdr = MessageHeader::empty();
1637 hdr.set_creation_tick(clock.now_ticks());
1638 in_mgrs[0]
1639 .store(Message::new(hdr, InP::default()))
1640 .expect("store input")
1641 };
1642 assert_eq!(
1643 in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1644 crate::edge::EnqueueResult::Enqueued,
1645 );
1646
1647 let mut ctx = build_step_context_with_out_policy(
1648 &mut in_links,
1649 &mut out_links,
1650 &mut in_mgrs,
1651 &mut out_mgrs,
1652 tight_drop_newest,
1653 &clock,
1654 &mut tele,
1655 );
1656
1657 let res = nlink.step(&mut ctx).expect("step ok");
1660 assert_eq!(res, crate::node::StepResult::Backpressured);
1661
1662 assert_eq!(
1664 out_mgrs[0].available(),
1665 available_before,
1666 "manager slot leaked: push_output must free token on DropNewest backpressure"
1667 );
1668
1669 let occ = out_links[0].occupancy(&tight_drop_newest);
1671 assert_eq!(
1672 *occ.items(),
1673 1,
1674 "queue occupancy must not change on backpressure"
1675 );
1676}
1677
1678pub fn run_push_output_evict_until_below_hard_no_double_eviction<
1687 N,
1688 const IN: usize,
1689 const OUT: usize,
1690 InP,
1691 OutP,
1692>(
1693 mut make_nodelink: impl FnMut() -> NodeLink<N, IN, OUT, InP, OutP>,
1694) where
1695 InP: crate::message::payload::Payload + Default + Clone,
1696 OutP: crate::message::payload::Payload + Default + Clone,
1697 N: crate::node::Node<IN, OUT, InP, OutP>,
1698{
1699 if IN == 0 || OUT == 0 {
1700 return;
1701 }
1702
1703 let mut nlink = make_nodelink();
1704 let clock = NoStdLinuxMonotonicClock::new();
1705 let mut tele = make_graph_telemetry();
1706 nlink.initialize(&clock, &mut tele).expect("init ok");
1707
1708 let (mut in_links, mut out_links) =
1709 make_edge_links_for_node::<IN, OUT>(NodeIndex::new(0), NodeIndex::new(1));
1710 let mut in_mgrs: [StaticMemoryManager<InP, 16>; IN] =
1711 core::array::from_fn(|_| StaticMemoryManager::new());
1712 let mut out_mgrs: [StaticMemoryManager<OutP, 16>; OUT] =
1713 core::array::from_fn(|_| StaticMemoryManager::new());
1714
1715 for i in 0u64..4 {
1718 let mut hdr = MessageHeader::empty();
1719 hdr.set_creation_tick(Ticks::new(i + 1));
1720 let tok = out_mgrs[0]
1721 .store(Message::new(hdr, OutP::default()))
1722 .expect("store filler");
1723 assert_eq!(
1724 out_links[0].try_push(tok, &TEST_EDGE_POLICY, &out_mgrs[0]),
1725 crate::edge::EnqueueResult::Enqueued,
1726 );
1727 }
1728
1729 let in_tok = {
1731 let mut hdr = MessageHeader::empty();
1732 hdr.set_creation_tick(clock.now_ticks());
1733 in_mgrs[0]
1734 .store(Message::new(hdr, InP::default()))
1735 .expect("store input")
1736 };
1737 assert_eq!(
1738 in_links[0].try_push(in_tok, &TEST_EDGE_POLICY, &in_mgrs[0]),
1739 crate::edge::EnqueueResult::Enqueued,
1740 );
1741
1742 let mut ctx = build_step_context_with_out_policy(
1745 &mut in_links,
1746 &mut out_links,
1747 &mut in_mgrs,
1748 &mut out_mgrs,
1749 TEST_DROP_OLDEST_POLICY,
1750 &clock,
1751 &mut tele,
1752 );
1753
1754 let res = nlink.step(&mut ctx).expect("step ok");
1755 assert_eq!(res, crate::node::StepResult::MadeProgress);
1756
1757 let occ = out_links[0].occupancy(&TEST_DROP_OLDEST_POLICY);
1760 assert_eq!(
1761 *occ.items(),
1762 4,
1763 "expected 4 items (1 pre-evicted, 1 pushed, net stable); \
1764 double-eviction (old try_push Evict branch) gives 3"
1765 );
1766}