// limen_core/node/bench.rs

1//! (Work)bench [test] Node implementations.
2
3use super::*;
4
5use crate::compute::{BackendCapabilities, ComputeBackend, ComputeModel, ModelMetadata};
6use crate::errors::{InferenceError, InferenceErrorKind};
7use crate::memory::{MemoryClass, PlacementAcceptance};
8use crate::message::Message;
9use crate::message::{MessageFlags, MessageHeader};
10use crate::node::model::InferenceModel;
11use crate::node::sink::Sink;
12use crate::node::source::Source;
13use crate::prelude::{create_test_tensor_from_array, TestTensor, TEST_TENSOR_BYTE_COUNT};
14use crate::types::{DeadlineNs, QoSClass, SequenceNumber, TraceId};
15
16#[cfg(feature = "std")]
17use crate::node::source::probe::{SourceIngressProbe, SourceIngressUpdater};
18
19use core::fmt::Write;
20
/// Busy-waits for a pseudo random duration of up to `max_delay_microseconds`
/// microseconds (a rough wall-clock estimate, not a precise timer).
///
/// # Input
/// * `random_state`: mutable XorShift32 state; any `u32` is accepted. A zero
///   state is treated as one, since all-zero is a fixed point of XorShift32.
/// * `max_delay_microseconds`: upper bound on the delay; zero means "return
///   immediately without touching `random_state`".
fn random_test_node_delay(random_state: &mut u32, max_delay_microseconds: u32) {
    // No delay requested: return without stepping the generator.
    if max_delay_microseconds == 0 {
        return;
    }

    // Advance the XorShift32 generator, coercing the forbidden all-zero state.
    let mut x = if *random_state == 0 { 1 } else { *random_state };
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    *random_state = x;

    // Random delay in "microseconds": 1..=max_delay_microseconds.
    let delay_microseconds = x % max_delay_microseconds + 1;

    // Crude calibration for a laptop-class CPU: ~2 GHz with ~8 cycles per
    // spin iteration gives (2_000 cycles/us) / 8 = 250 iterations per us.
    const ASSUMED_CPU_FREQUENCY_HERTZ: u32 = 2_000_000_000;
    const ESTIMATED_CPU_CYCLES_PER_ITERATION: u32 = 8;
    let cycles_per_microsecond = ASSUMED_CPU_FREQUENCY_HERTZ / 1_000_000;
    let iterations_per_microsecond =
        (cycles_per_microsecond / ESTIMATED_CPU_CYCLES_PER_ITERATION).max(1);

    let total_iterations = delay_microseconds.saturating_mul(iterations_per_microsecond);
    for _ in 0..total_iterations {
        core::hint::spin_loop();
    }
}
66
67// -----------------------------------------------------------------------------
68// Test source node: 0 inputs, 1 output (TestTensor payload), emits
69// incrementing counter values encoded into a 3x3 tensor.
70// -----------------------------------------------------------------------------
71
72/// Encode a monotonically increasing counter into the shared `TestTensor`.
73///
74/// The counter is formatted as a zero-padded 9-digit decimal string
75/// (`counter % 1_000_000_000`) and then mapped row-major into the 3×3 tensor.
76///
77/// Examples:
78/// - `19`    -> `000000019` -> `[[0,0,0],[0,0,0],[0,1,9]]`
79/// - `10119` -> `000010119` -> `[[0,0,0],[0,1,0],[1,1,9]]`
80#[inline]
81fn create_test_tensor_from_counter(counter: u32) -> TestTensor {
82    let counter_modulo_nine_digits = counter % 1_000_000_000;
83
84    let digit_0 = (counter_modulo_nine_digits / 100_000_000) % 10;
85    let digit_1 = (counter_modulo_nine_digits / 10_000_000) % 10;
86    let digit_2 = (counter_modulo_nine_digits / 1_000_000) % 10;
87    let digit_3 = (counter_modulo_nine_digits / 100_000) % 10;
88    let digit_4 = (counter_modulo_nine_digits / 10_000) % 10;
89    let digit_5 = (counter_modulo_nine_digits / 1_000) % 10;
90    let digit_6 = (counter_modulo_nine_digits / 100) % 10;
91    let digit_7 = (counter_modulo_nine_digits / 10) % 10;
92    let digit_8 = counter_modulo_nine_digits % 10;
93
94    create_test_tensor_from_array([
95        [digit_0, digit_1, digit_2],
96        [digit_3, digit_4, digit_5],
97        [digit_6, digit_7, digit_8],
98    ])
99}
100
/// A test source that:
/// - Produces an incrementing counter encoded into a `TestTensor` on each `try_produce()`.
/// - Exposes *ingress* pressure via either an internal backlog or a std probe.
pub struct TestCounterSourceTensor<Clock, const BACKLOG_CAP: usize>
where
    Clock: PlatformClock,
{
    /// Monotonic platform clock used to stamp creation ticks.
    clock: Clock,

    // Next counter value to encode and emit; wraps on u32 overflow.
    next_counter_value_to_emit: u32,

    // Header template fields, copied into every emitted `MessageHeader`
    // (only `next_sequence` advances between messages):
    trace_id: TraceId,
    next_sequence: SequenceNumber,
    deadline_ns: Option<DeadlineNs>,
    qos: QoSClass,
    flags: MessageFlags,

    // Static properties reported through the `Source` trait:
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    output_placement_acceptance: [PlacementAcceptance; 1],
    ingress_policy: EdgePolicy,

    // ---- Upstream pressure modelling ----
    // Layout: circular buffer with head index (oldest) and len (number of items);
    // the next free slot is (head + len) % BACKLOG_CAP.
    // Capacity is small and fixed for tests.
    backlog: [Option<Message<TestTensor>>; BACKLOG_CAP],
    backlog_head: usize,
    backlog_len: usize,
    // Kept equal to backlog_len * TEST_TENSOR_BYTE_COUNT by the mutators.
    backlog_bytes: usize,

    // std optional shared probe (if present, it is authoritative for occupancy).
    #[cfg(feature = "std")]
    ingress_probe: Option<SourceIngressProbe>,
    #[cfg(feature = "std")]
    ingress_updater: Option<SourceIngressUpdater>,
}
141
142impl<Clock, const BACKLOG_CAP: usize> TestCounterSourceTensor<Clock, BACKLOG_CAP>
143where
144    Clock: PlatformClock,
145{
146    /// Create a new tensor-producing test source.
147    #[allow(clippy::too_many_arguments)]
148    pub const fn new(
149        clock: Clock,
150        starting_value_inclusive: u32,
151        trace_id: TraceId,
152        starting_sequence: SequenceNumber,
153        deadline_ns: Option<DeadlineNs>,
154        qos: QoSClass,
155        flags: MessageFlags,
156        node_capabilities: NodeCapabilities,
157        node_policy: NodePolicy,
158        output_placement_acceptance: [PlacementAcceptance; 1],
159        ingress_policy: EdgePolicy,
160    ) -> Self {
161        // Hard check: backlog capacity must be at least the ingress edge max_items.
162        // Use a plain `panic!` with a string literal so this is `const`-friendly.
163        if BACKLOG_CAP < ingress_policy.caps.max_items {
164            panic!(
165                "TestCounterSourceTensor: backlog capacity must be >= ingress_policy.caps.max_items"
166            );
167        }
168
169        Self {
170            clock,
171            next_counter_value_to_emit: starting_value_inclusive,
172            trace_id,
173            next_sequence: starting_sequence,
174            deadline_ns,
175            qos,
176            flags,
177            node_capabilities,
178            node_policy,
179            output_placement_acceptance,
180            ingress_policy,
181            backlog: [None; BACKLOG_CAP],
182            backlog_head: 0usize,
183            backlog_len: 0usize,
184            backlog_bytes: 0usize,
185            #[cfg(feature = "std")]
186            ingress_probe: None,
187            #[cfg(feature = "std")]
188            ingress_updater: None,
189        }
190    }
191
192    /// Attach a std ingress probe + updater (authoritative for occupancy when present).
193    #[cfg(feature = "std")]
194    pub fn with_probe(mut self, probe: SourceIngressProbe, updater: SourceIngressUpdater) -> Self {
195        self.ingress_probe = Some(probe);
196        self.ingress_updater = Some(updater);
197        self
198    }
199
200    #[inline]
201    fn make_message(&self) -> Message<TestTensor> {
202        Message::new(
203            MessageHeader::new(
204                self.trace_id,
205                self.next_sequence,
206                self.clock.now_ticks(),
207                self.deadline_ns,
208                self.qos,
209                TEST_TENSOR_BYTE_COUNT,
210                self.flags,
211                MemoryClass::Host,
212            ),
213            create_test_tensor_from_counter(self.next_counter_value_to_emit),
214        )
215    }
216
217    /// Set a synthetic upstream backlog (items).
218    #[inline]
219    pub fn produce_n_items_in_backlog(&mut self, n: usize) {
220        // Append up to `n` synthetic items into the ring backlog without
221        // clearing existing items. If capacity reached, stop appending.
222        let mut to_add = n;
223        while to_add > 0 && self.backlog_len < BACKLOG_CAP {
224            let tail = (self.backlog_head + self.backlog_len) % BACKLOG_CAP;
225            self.backlog[tail] = Some(self.make_message());
226
227            self.backlog_len += 1;
228            to_add = to_add.saturating_sub(1);
229            self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
230
231            self.next_counter_value_to_emit = self.next_counter_value_to_emit.wrapping_add(1);
232            self.next_sequence = SequenceNumber::new(self.next_sequence.as_u64().wrapping_add(1));
233        }
234    }
235
236    /// Pop the oldest message from the software backlog (ring), if any.
237    ///
238    /// Returns `None` if the backlog is empty. This is destructive: it removes the
239    /// oldest item and advances the ring head.
240    #[inline]
241    fn try_pop_from_backlog(&mut self) -> Option<Message<TestTensor>> {
242        if self.backlog_len == 0 {
243            return None;
244        }
245
246        let head_index = self.backlog_head;
247
248        // Remove the oldest entry.
249        let message = self.backlog[head_index].take();
250
251        // Advance head and shrink length.
252        self.backlog_head = (self.backlog_head + 1) % BACKLOG_CAP;
253        self.backlog_len = self.backlog_len.saturating_sub(1);
254
255        // Keep bytes consistent with counters.
256        self.backlog_bytes = self.backlog_len * TEST_TENSOR_BYTE_COUNT;
257
258        message
259    }
260
261    #[inline]
262    fn random_backlog_add_count(&self) -> usize {
263        // Use the platform clock tick parity as a cheap jitter source.
264        // Even -> add 1, odd -> add 2.
265        let now_ticks_u64 = *self.clock.now_ticks().as_u64();
266        if (now_ticks_u64 & 1) == 0 {
267            1
268        } else {
269            2
270        }
271    }
272}
273
274impl<Clock, const BACKLOG_CAP: usize> Source<TestTensor, 1>
275    for TestCounterSourceTensor<Clock, BACKLOG_CAP>
276where
277    Clock: PlatformClock,
278{
279    type Error = core::convert::Infallible;
280
281    #[inline]
282    fn open(&mut self) -> Result<(), Self::Error> {
283        Ok(())
284    }
285
286    #[inline]
287    fn try_produce(&mut self) -> Option<(usize, Message<TestTensor>)> {
288        // Random test delay.
289        #[cfg(feature = "std")]
290        let mut random_seed: u32 = {
291            let now = std::time::SystemTime::now()
292                .duration_since(std::time::UNIX_EPOCH)
293                .unwrap_or_else(|e| e.duration());
294            (now.as_nanos() & 0xFFFF_FFFF) as u32
295        };
296        #[cfg(not(feature = "std"))]
297        let mut random_seed = 1;
298        random_test_node_delay(&mut random_seed, 250);
299
300        // Random ingress pressure update.
301        self.produce_n_items_in_backlog(self.random_backlog_add_count());
302
303        // Pop and send message.
304        self.try_pop_from_backlog().map(|message| (0, message))
305    }
306
307    #[inline]
308    fn ingress_occupancy(&self) -> EdgeOccupancy {
309        #[cfg(feature = "std")]
310        if let Some(probe) = &self.ingress_probe {
311            return probe.occupancy(&self.ingress_policy());
312        }
313
314        // Fallback to software backlog counters.
315        let items = self.backlog_len;
316        let bytes = self.backlog_bytes;
317        EdgeOccupancy::new(items, bytes, self.ingress_policy.watermark(items, bytes))
318    }
319
320    #[inline]
321    fn output_acceptance(&self) -> [PlacementAcceptance; 1] {
322        self.output_placement_acceptance
323    }
324
325    #[inline]
326    fn capabilities(&self) -> NodeCapabilities {
327        self.node_capabilities
328    }
329
330    #[inline]
331    fn policy(&self) -> NodePolicy {
332        self.node_policy
333    }
334
335    fn ingress_policy(&self) -> EdgePolicy {
336        self.ingress_policy
337    }
338
339    /// Peek the creation tick of the `item_index`'th ingress item (0 = oldest).
340    /// Non-blocking and non-destructive. Returns `None` if metadata is not
341    /// available (no backlog) or `item_index` is out of range.
342    #[inline]
343    fn peek_ingress_creation_tick(&self, item_index: usize) -> Option<u64> {
344        // If no backlog, nothing to peek.
345        if (self.backlog_len == 0) || (item_index >= self.backlog_len) {
346            return None;
347        }
348
349        Some(
350            *self.backlog[item_index]
351                .unwrap()
352                .header()
353                .creation_tick()
354                .as_u64(),
355        )
356    }
357}
358
359// -----------------------------------------------------------------------------
360// Test backend + model for TestTensor -> TestTensor identity, no alloc, no dyn, no unsafe.
361// -----------------------------------------------------------------------------
362
/// Identity test model: its `infer_*` methods copy `TestTensor` inputs to
/// outputs unchanged, after a short pseudo random busy-wait simulating work.
pub struct TestTensorModel;
365
366impl ComputeModel<TestTensor, TestTensor> for TestTensorModel {
367    #[inline]
368    fn init(&mut self) -> Result<(), InferenceError> {
369        Ok(())
370    }
371
372    #[inline]
373    fn infer_one(&mut self, inp: &TestTensor, out: &mut TestTensor) -> Result<(), InferenceError> {
374        #[cfg(feature = "std")]
375        let mut random_seed: u32 = {
376            let now = std::time::SystemTime::now()
377                .duration_since(std::time::UNIX_EPOCH)
378                .unwrap_or_else(|e| e.duration());
379            (now.as_nanos() & 0xFFFF_FFFF) as u32
380        };
381        #[cfg(not(feature = "std"))]
382        let mut random_seed = 1;
383        random_test_node_delay(&mut random_seed, 500);
384
385        *out = *inp;
386        Ok(())
387    }
388
389    #[inline]
390    fn infer_batch(
391        &mut self,
392        inputs: crate::message::batch::Batch<'_, TestTensor>,
393        outputs: &mut [TestTensor],
394    ) -> Result<(), InferenceError> {
395        #[cfg(feature = "std")]
396        let mut random_seed: u32 = {
397            let now = std::time::SystemTime::now()
398                .duration_since(std::time::UNIX_EPOCH)
399                .unwrap_or_else(|e| e.duration());
400            (now.as_nanos() & 0xFFFF_FFFF) as u32
401        };
402        #[cfg(not(feature = "std"))]
403        let mut random_seed = 1;
404        random_test_node_delay(&mut random_seed, 1000);
405
406        let in_msgs = inputs.messages();
407        let in_len = in_msgs.len();
408
409        if outputs.len() < in_len {
410            return Err(InferenceError::new(InferenceErrorKind::ExecutionFailed, 0));
411        }
412
413        // copy inputs → outputs (identity) using references into Batch's messages
414        for (o, m) in outputs.iter_mut().zip(in_msgs.iter()) {
415            *o = *m.payload();
416        }
417
418        Ok(())
419    }
420
421    #[inline]
422    fn drain(&mut self) -> Result<(), InferenceError> {
423        Ok(())
424    }
425
426    #[inline]
427    fn reset(&mut self) -> Result<(), InferenceError> {
428        Ok(())
429    }
430
431    #[inline]
432    fn metadata(&self) -> ModelMetadata {
433        ModelMetadata::new(MemoryClass::Host, MemoryClass::Host, None, None)
434    }
435}
436
/// Test backend that "loads" the zero-state identity model (`TestTensorModel`)
/// without any artifact; stateless, hence `Copy` + `Default`.
#[derive(Clone, Copy, Debug, Default)]
pub struct TestTensorBackend;
440
impl ComputeBackend<TestTensor, TestTensor> for TestTensorBackend {
    type Model = TestTensorModel;
    type Error = InferenceError;

    // Unit descriptor: no artifact needed for this test model.
    type ModelDescriptor<'d> = ();

    /// Static backend capabilities.
    // Arguments presumably mean (native batching, max batch size, queue depth)
    // — TODO confirm against `BackendCapabilities::new`'s signature.
    #[inline]
    fn capabilities(&self) -> BackendCapabilities {
        BackendCapabilities::new(false, Some(usize::MAX), 0)
    }

    /// "Load" the identity model; infallible since there is no artifact to parse.
    #[inline]
    fn load_model<'d>(&self, _desc: Self::ModelDescriptor<'d>) -> Result<Self::Model, Self::Error> {
        Ok(TestTensorModel)
    }
}
458
/// Identity model node using the shared `TestTensor` payload: an
/// `InferenceModel` wired to `TestTensorBackend` with `MAX_BATCH` batching.
pub type TestIdentityModelNodeTensor<const MAX_BATCH: usize> =
    InferenceModel<TestTensorBackend, TestTensor, TestTensor, MAX_BATCH>;
462
463impl<const MAX_BATCH: usize> TestIdentityModelNodeTensor<MAX_BATCH> {
464    /// Construct the identity test node with your policy/capability params.
465    #[inline]
466    pub fn new_identity(
467        node_capabilities: NodeCapabilities,
468        node_policy: NodePolicy,
469        input_placement_acceptance: [PlacementAcceptance; 1],
470        output_placement_acceptance: [PlacementAcceptance; 1],
471    ) -> Result<Self, InferenceError> {
472        let backend = TestTensorBackend;
473        InferenceModel::new(
474            backend,
475            (),
476            node_policy,
477            node_capabilities,
478            input_placement_acceptance,
479            output_placement_acceptance,
480        )
481    }
482
483    /// Handy constant for tests.
484    #[inline]
485    pub fn kind() -> NodeKind {
486        NodeKind::Model
487    }
488}
489
490// -----------------------------------------------------------------------------
491// Test sink node: 1 input, 0 outputs, logs full Message<TestTensor>
492// Implements `sink::Sink<TestTensor, 1>` so it can be used via `SinkNode<_, TestTensor, 1>`.
493// -----------------------------------------------------------------------------
494
/// Test sink: debug-formats each consumed `Message<TestTensor>` into a fixed
/// stack buffer, hands the text to a caller-supplied printer callback, and
/// counts how many messages it has consumed.
pub struct TestSinkNodeTensor {
    node_capabilities: NodeCapabilities,
    node_policy: NodePolicy,
    input_placement_acceptance: [PlacementAcceptance; 1],
    // Callback receiving the debug-formatted text of each consumed message.
    printer: fn(&str),
    // Count of messages consumed so far.
    processed: u32,
}
503
impl TestSinkNodeTensor {
    /// Construct a sink from its static capabilities, policy and placement
    /// acceptance, plus the printer callback that receives each message's
    /// debug text. `const`-friendly so it can back static test fixtures.
    pub const fn new(
        node_capabilities: NodeCapabilities,
        node_policy: NodePolicy,
        input_placement_acceptance: [PlacementAcceptance; 1],
        printer: fn(&str),
    ) -> Self {
        Self {
            node_capabilities,
            node_policy,
            input_placement_acceptance,
            printer,
            processed: 0,
        }
    }

    /// Returns the number of messages this sink has pushed out of the graph.
    pub fn processed(&self) -> &u32 {
        &self.processed
    }
}
526
// Simple fixed-size stack buffer implementing `core::fmt::Write`.
//
// Writes that overflow the buffer are truncated at the last complete UTF-8
// character boundary; overflow is silently ignored so formatting can finish.
struct FixedBuf<const N: usize> {
    // Backing storage; only `buf[..len]` is meaningful (and valid UTF-8).
    buf: [u8; N],
    // Number of valid bytes currently stored.
    len: usize,
}

impl<const N: usize> FixedBuf<N> {
    /// Create an empty buffer.
    #[inline]
    const fn new() -> Self {
        Self {
            buf: [0; N],
            len: 0,
        }
    }

    /// View the written bytes as `&str`.
    ///
    /// `write_str` only ever appends whole UTF-8 characters, so the stored
    /// bytes are always valid; the fallback to `""` is purely defensive.
    #[inline]
    fn as_str(&self) -> &str {
        core::str::from_utf8(&self.buf[..self.len]).unwrap_or_default()
    }
}

impl<const N: usize> core::fmt::Write for FixedBuf<N> {
    fn write_str(&mut self, s: &str) -> core::fmt::Result {
        let bytes = s.as_bytes();

        // How many bytes can still fit.
        let remaining = N.saturating_sub(self.len);
        let mut to_copy = core::cmp::min(bytes.len(), remaining);

        // BUGFIX: back up to the last *complete* character boundary of `s`
        // BEFORE copying. The previous implementation copied first and then
        // stripped trailing continuation bytes (0b10xxxxxx), which:
        //   1. left a dangling lead byte in the buffer when truncation split a
        //      multi-byte character right after its lead byte, and
        //   2. wrongly stripped the final character even when the whole string
        //      fit (any write ending in a multi-byte char, e.g. "é").
        // Either way the buffer stopped being valid UTF-8 and `as_str()`
        // collapsed to "". `str::is_char_boundary` handles both cases.
        while to_copy > 0 && !s.is_char_boundary(to_copy) {
            to_copy -= 1;
        }

        if to_copy == 0 {
            // Nothing fits as a whole character — silently accept (so
            // formatting can continue); the buffer remains as-is.
            return Ok(());
        }

        // Copy the valid prefix into the buffer (`copy_from_slice` is core,
        // so this stays no_std-friendly).
        self.buf[self.len..self.len + to_copy].copy_from_slice(&bytes[..to_copy]);

        // Count only the whole characters we copied.
        self.len += to_copy;
        Ok(())
    }
}
598
599impl Sink<TestTensor, 1> for TestSinkNodeTensor {
600    type Error = core::convert::Infallible;
601
602    #[inline]
603    fn open(&mut self) -> Result<(), Self::Error> {
604        Ok(())
605    }
606
607    #[inline]
608    fn consume(&mut self, msg: &Message<TestTensor>) -> Result<(), Self::Error> {
609        #[cfg(feature = "std")]
610        let mut random_seed: u32 = {
611            let now = std::time::SystemTime::now()
612                .duration_since(std::time::UNIX_EPOCH)
613                .unwrap_or_else(|e| e.duration());
614            (now.as_nanos() & 0xFFFF_FFFF) as u32
615        };
616        #[cfg(not(feature = "std"))]
617        let mut random_seed = 1;
618        random_test_node_delay(&mut random_seed, 100);
619
620        let mut buf: FixedBuf<1024> = FixedBuf::new();
621        let _ = core::write!(&mut buf, "{:?}", msg);
622        (self.printer)(buf.as_str());
623
624        self.processed += 1;
625
626        Ok(())
627    }
628
629    #[inline]
630    fn input_acceptance(&self) -> [PlacementAcceptance; 1] {
631        self.input_placement_acceptance
632    }
633
634    #[inline]
635    fn capabilities(&self) -> NodeCapabilities {
636        self.node_capabilities
637    }
638
639    #[inline]
640    fn policy(&self) -> NodePolicy {
641        self.node_policy
642    }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageFlags;
    use crate::policy::{AdmissionPolicy, OverBudgetAction, QueueCaps};
    use crate::prelude::NoStdLinuxMonotonicClock;
    use crate::types::{NodeIndex, SequenceNumber, TraceId};

    // Shared ingress edge policy for the source contract test.
    // NOTE(review): arguments presumably mean (max_items = 16, watermark = 14,
    // no byte caps) — confirm against `QueueCaps::new`.
    const TEST_INGRESS_POLICY: EdgePolicy = EdgePolicy::new(
        QueueCaps::new(16, 14, None, None),
        AdmissionPolicy::DropNewest,
        OverBudgetAction::Drop,
    );

    // ------------- 1) Source node ----------------
    //
    // Runs the node contract tests for TestCounterSourceTensor
    crate::run_node_contract_tests!(test_counter_source_contract, {
        make_nodelink: || {
            // Build a clock instance to pass into the constructor.
            let clock = NoStdLinuxMonotonicClock::new();

            // Static node params (tweak if you want different behaviour)
            let start_value = 0u32;
            let trace_id = TraceId::new(1);
            let seq = SequenceNumber::new(1);
            let deadline = None;
            let qos = crate::types::QoSClass::BestEffort;
            let flags = MessageFlags::empty();
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let ingress_policy = TEST_INGRESS_POLICY;

            // Make the source instance. BACKLOG_CAP is 16: it must be at least
            // TEST_INGRESS_POLICY's max_items (16), or `new` panics.
            let src: TestCounterSourceTensor<_, 16> = TestCounterSourceTensor::new(
                clock,
                start_value,
                trace_id,
                seq,
                deadline,
                qos,
                flags,
                node_caps,
                node_policy,
                output_accept,
                ingress_policy,
            );

            // Convert Source -> SourceNode using the convenience.
            // into_sourcenode consumes `src` and returns SourceNode<_, TestTensor, 1>.
            let src_node = src.into_sourcenode(crate::policy::NodePolicy::default());

            // Construct a NodeLink owning the SourceNode. Use NodeIndex::new(0) and a static name.
            crate::node::link::NodeLink::new(src_node, NodeIndex::new(0), Some("test-counter-source"))
        }
    });

    // ------------- 2) Model node ----------------
    //
    // Runs the node contract tests for the identity inference model node.
    crate::run_node_contract_tests!(test_identity_model_contract, {
        make_nodelink: || {
            // Build node params
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];
            let output_accept = [crate::memory::PlacementAcceptance::default(); 1];

            // Construct the model-backed inference node (MAX_BATCH choose a reasonable constant)
            let node = TestIdentityModelNodeTensor::<8>::new_identity(
                node_caps,
                node_policy,
                input_accept,
                output_accept,
            )
            .expect("create identity model node");

            // Wrap as a NodeLink with NodeIndex::new(0) (each contract test
            // builds an isolated single-node link, so index 0 is correct).
            crate::node::link::NodeLink::new(node, NodeIndex::new(0), Some("test-identity-model"))
        }
    });

    // ------------- 3) Sink node ----------------
    //
    // Runs the node contract tests for TestSinkNodeTensor wrapped in SinkNode.
    crate::run_node_contract_tests!(test_sink_node_contract, {
        make_nodelink: || {
            let node_caps = crate::node::NodeCapabilities::default();
            let node_policy = crate::policy::NodePolicy::default();
            let input_accept = [crate::memory::PlacementAcceptance::default(); 1];

            // Create sink instance; provide a simple printer that ignores the string in tests.
            let sink = TestSinkNodeTensor::new(node_caps, node_policy, input_accept, |_s| {});

            // Wrap into SinkNode via From::from (or SinkNode::new)
            let sink_node = crate::node::sink::SinkNode::from(sink);

            // Construct a NodeLink owning the SinkNode with NodeIndex::new(0)
            // (isolated single-node link, same as above).
            crate::node::link::NodeLink::new(sink_node, NodeIndex::new(0), Some("test-sink"))
        }
    });
}