Skip to main content

limen_core/graph/
bench.rs

1//! (Work)bench [test] Graph implementations.
2//!
3//! This module contains two hand-written graph implementations that exactly
4//! mirror what `limen-codegen` / `limen-build` produce for the following DSL
5//! input. They exist so that the codegen output is always verifiable against a
6//! known-correct, reviewed reference.
7//!
8//! ## Two separate codegen invocations
9//!
10//! `TestPipeline` (no-`std`, single-threaded) and `TestPipelineStd` (std,
11//! concurrent) are produced by **two independent codegen invocations** of the
12//! same logical pipeline. In a real project using the builder API this looks
13//! like:
14//!
15//! ```text
16//! // Invocation 1 — no_std graph (single-threaded runtime):
17//! GraphBuilder::new()
18//!     .vis(pub)
19//!     .name("TestPipeline")
20//!     .node(Node::new(0).ty("TestCounterSourceTensor<C, 32>").in_ports(0).out_ports(1)
21//!           .in_payload("()").out_payload("TestTensor").name("src").ingress_policy("Q_32_POLICY"))
22//!     .node(Node::new(1).ty("TestIdentityModelNodeTensor<32>").in_ports(1).out_ports(1)
23//!           .in_payload("TestTensor").out_payload("TestTensor").name("map"))
24//!     .node(Node::new(2).ty("TestSinkNodeTensor").in_ports(1).out_ports(0)
25//!           .in_payload("TestTensor").out_payload("()").name("snk"))
//!     .edge(Edge::new(0).ty("Q32").payload("TestTensor").manager("StaticMemoryManager<TestTensor, 35>")
//!           .from(0, 0).to(1, 0).policy("Q_32_POLICY").name("e0"))
//!     .edge(Edge::new(1).ty("Q32").payload("TestTensor").manager("StaticMemoryManager<TestTensor, 35>")
29//!           .from(1, 0).to(2, 0).policy("Q_32_POLICY").name("e1"))
30//!     .concurrent(false)          // ← no ScopedGraphApi emitted
31//!     .finish()
32//!
33//! // Invocation 2 — std graph (concurrent runtime, ScopedGraphApi):
34//! GraphBuilder::new()
35//!     // ... same nodes ...
36//!     .edge(Edge::new(0).ty("ConcurrentEdge").payload("TestTensor").manager("ConcurrentMemoryManager<TestTensor>")
37//!           .from(0, 0).to(1, 0).policy("Q_32_POLICY").name("e0"))
38//!     .edge(Edge::new(1).ty("ConcurrentEdge").payload("TestTensor").manager("ConcurrentMemoryManager<TestTensor>")
39//!           .from(1, 0).to(2, 0).policy("Q_32_POLICY").name("e1"))
40//!     .concurrent(true)           // ← emits ScopedGraphApi + run_scoped impl
41//!     .finish()
42//! ```
43//!
44//! When using the proc-macro (`limen-build`), the second invocation is written
45//! with the `concurrent;` keyword in the DSL block. The edge types must be
46//! changed to `ConcurrentEdge`/`ConcurrentMemoryManager` manually (or via a
47//! separate `define_graph!` block) since the DSL does not auto-promote queue
48//! types.
49//!
50//! ## Why two invocations instead of one?
51//!
52//! The graph struct is monomorphized over its concrete edge and manager types.
53//! A no-`std` graph uses `StaticRing<N>` / `StaticMemoryManager<P, N>`;
54//! a concurrent graph uses `ConcurrentEdge` / `ConcurrentMemoryManager<P>`.
55//! These are structurally different types — they cannot be unified in a single
56//! struct. The codegen therefore requires a dedicated invocation per target
57//! flavor, each producing a distinct named struct.
58
59use crate::{
60    edge::{Edge as _, EdgeOccupancy, NoQueue},
61    errors::{GraphError, NodeError},
62    graph::{GraphApi, GraphEdgeAccess, GraphNodeAccess, GraphNodeContextBuilder, GraphNodeTypes},
63    node::{
64        bench::{TestCounterSourceTensor, TestIdentityModelNodeTensor, TestSinkNodeTensor},
65        sink::SinkNode,
66        source::{Source as _, SourceNode, EXTERNAL_INGRESS_NODE},
67        Node as _, StepContext, StepResult,
68    },
69    policy::{AdmissionPolicy, EdgePolicy, NodePolicy, OverBudgetAction},
70    prelude::{
71        EdgeDescriptor, EdgeLink, NodeDescriptor, NodeLink, PlatformClock, StaticMemoryManager,
72        Telemetry, TestTensor,
73    },
74    types::{EdgeIndex, NodeIndex, PortId, PortIndex},
75};
76
77// Test edge types.
78type Q32 = crate::edge::bench::TestSpscRingBuf<32>;
79const Q_32_POLICY: EdgePolicy = EdgePolicy {
80    caps: crate::policy::QueueCaps {
81        max_items: 32,
82        soft_items: 32,
83        max_bytes: None,
84        soft_bytes: None,
85    },
86    over_budget: OverBudgetAction::Drop,
87    admission: AdmissionPolicy::DropOldest,
88};
89
90// Test memory manager type (one per real edge).
91type Mgr32 = StaticMemoryManager<TestTensor, 35>;
92
93// Test source node types.
94#[allow(type_alias_bounds)]
95type SrcNode<SrcClk: PlatformClock> =
96    SourceNode<TestCounterSourceTensor<SrcClk, 32>, TestTensor, 1>;
97const INGRESS_POLICY: EdgePolicy = Q_32_POLICY;
98
99// Test model node types.
100const TEST_MAX_BATCH: usize = 32;
101type MapNode = TestIdentityModelNodeTensor<TEST_MAX_BATCH>;
102
103// Test sink node types.
104type SnkNode = SinkNode<TestSinkNodeTensor, TestTensor, 1>;
105
106/// concrete graph implementation used for testing.
107#[allow(clippy::complexity)]
108pub struct TestPipeline<SrcClk: PlatformClock> {
109    /// Nodes held in the graph.
110    nodes: (
111        NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>,
112        NodeLink<MapNode, 1, 1, TestTensor, TestTensor>,
113        NodeLink<SnkNode, 1, 0, TestTensor, ()>,
114    ),
115    /// Edges held in the graph.
116    edges: (EdgeLink<Q32>, EdgeLink<Q32>),
117    /// Memory managers for all *real* edges in declaration order.
118    managers: (Mgr32, Mgr32),
119}
120
121impl<SrcClk: PlatformClock> TestPipeline<SrcClk> {
122    /// Returns a TestPipeline graph given the nodes and edges.
123    #[inline]
124    pub fn new(
125        node_0: impl Into<SrcNode<SrcClk>>,
126        node_1: MapNode,
127        node_2: impl Into<SnkNode>,
128        q_0: Q32,
129        q_1: Q32,
130        mgr_0: Mgr32,
131        mgr_1: Mgr32,
132    ) -> Self {
133        let node_0: SrcNode<SrcClk> = node_0.into();
134        let node_2: SnkNode = node_2.into();
135
136        let nodes = (
137            NodeLink::<SrcNode<SrcClk>, 0, 1, (), TestTensor>::new(
138                node_0,
139                NodeIndex::from(0usize),
140                Some("src"),
141            ),
142            NodeLink::<MapNode, 1, 1, TestTensor, TestTensor>::new(
143                node_1,
144                NodeIndex::from(1usize),
145                Some("map"),
146            ),
147            NodeLink::<SnkNode, 1, 0, TestTensor, ()>::new(
148                node_2,
149                NodeIndex::from(2usize),
150                Some("snk"),
151            ),
152        );
153
154        let edges = (
155            EdgeLink::<Q32>::new(
156                q_0,
157                EdgeIndex::from(1usize),
158                PortId::new(NodeIndex::from(0usize), PortIndex::from(0usize)),
159                PortId::new(NodeIndex::from(1usize), PortIndex::from(0usize)),
160                Q_32_POLICY,
161                Some("e0"),
162            ),
163            EdgeLink::<Q32>::new(
164                q_1,
165                EdgeIndex::from(2usize),
166                PortId::new(NodeIndex::from(1usize), PortIndex::from(0usize)),
167                PortId::new(NodeIndex::from(2usize), PortIndex::from(0usize)),
168                Q_32_POLICY,
169                Some("e1"),
170            ),
171        );
172
173        let managers = (mgr_0, mgr_1);
174
175        Self {
176            nodes,
177            edges,
178            managers,
179        }
180    }
181}
182
183// ===== GraphApi<3,3> =====
184impl<SrcClk: PlatformClock> GraphApi<3, 3> for TestPipeline<SrcClk> {
185    #[inline]
186    fn get_node_descriptors(&self) -> [NodeDescriptor; 3] {
187        [
188            self.nodes.0.descriptor(),
189            self.nodes.1.descriptor(),
190            self.nodes.2.descriptor(),
191        ]
192    }
193    #[inline]
194    fn get_edge_descriptors(&self) -> [EdgeDescriptor; 3] {
195        [
196            EdgeDescriptor::new(
197                EdgeIndex::from(0usize),
198                PortId::new(EXTERNAL_INGRESS_NODE, PortIndex::from(0usize)),
199                PortId::new(NodeIndex::from(0usize), PortIndex::from(0usize)),
200                Some("ingress0"),
201            ),
202            self.edges.0.descriptor(),
203            self.edges.1.descriptor(),
204        ]
205    }
206
207    #[inline]
208    fn get_node_policies(&self) -> [NodePolicy; 3] {
209        [
210            self.nodes.0.policy(),
211            self.nodes.1.policy(),
212            self.nodes.2.policy(),
213        ]
214    }
215
216    #[inline]
217    fn get_edge_policies(&self) -> [EdgePolicy; 3] {
218        [
219            INGRESS_POLICY,
220            *self.edges.0.policy(),
221            *self.edges.1.policy(),
222        ]
223    }
224
225    #[inline]
226    fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError> {
227        let occ = match E {
228            0 => {
229                let src = self.nodes.0.node().source_ref();
230                src.ingress_occupancy()
231            }
232            1 => {
233                let e = &self.edges.0;
234                e.occupancy(e.policy())
235            }
236            2 => {
237                let e = &self.edges.1;
238                e.occupancy(e.policy())
239            }
240            _ => return Err(GraphError::InvalidEdgeIndex), // use your variant
241        };
242        Ok(occ)
243    }
244
245    #[inline]
246    fn write_all_edge_occupancies(&self, out: &mut [EdgeOccupancy; 3]) -> Result<(), GraphError> {
247        out[0] = self.edge_occupancy_for::<0>()?;
248        out[1] = self.edge_occupancy_for::<1>()?;
249        out[2] = self.edge_occupancy_for::<2>()?;
250        Ok(())
251    }
252
253    #[inline]
254    fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
255        &self,
256        out: &mut [EdgeOccupancy; 3],
257    ) -> Result<(), GraphError> {
258        let node_idx = NodeIndex::from(I);
259        // Iterate *all* edges; update those where this node is upstream OR downstream.
260        for ed in self.get_edge_descriptors().iter() {
261            if ed.upstream().node() == &node_idx || ed.downstream().node() == &node_idx {
262                let ei = (ed.id()).as_usize();
263                match ei {
264                    0 => {
265                        out[0] = self.edge_occupancy_for::<0>()?;
266                    }
267                    1 => {
268                        out[1] = self.edge_occupancy_for::<1>()?;
269                    }
270                    2 => {
271                        out[2] = self.edge_occupancy_for::<2>()?;
272                    }
273                    _ => return Err(GraphError::InvalidEdgeIndex),
274                }
275            }
276        }
277        Ok(())
278    }
279
280    #[inline]
281    fn step_node_by_index<C, T>(
282        &mut self,
283        index: usize,
284        clock: &C,
285        telemetry: &mut T,
286    ) -> Result<StepResult, NodeError>
287    where
288        EdgePolicy: Copy,
289        C: PlatformClock + Sized,
290        T: Telemetry + Sized,
291    {
292        match index {
293            0 => <Self as GraphNodeContextBuilder<0, 0, 1>>::with_node_and_step_context::<
294                C,
295                T,
296                StepResult,
297                NodeError,
298            >(self, clock, telemetry, |node, ctx| node.step(ctx)),
299
300            1 => <Self as GraphNodeContextBuilder<1, 1, 1>>::with_node_and_step_context::<
301                C,
302                T,
303                StepResult,
304                NodeError,
305            >(self, clock, telemetry, |node, ctx| node.step(ctx)),
306
307            2 => <Self as GraphNodeContextBuilder<2, 1, 0>>::with_node_and_step_context::<
308                C,
309                T,
310                StepResult,
311                NodeError,
312            >(self, clock, telemetry, |node, ctx| node.step(ctx)),
313
314            _ => unreachable!("invalid node index"),
315        }
316    }
317}
318
319// ===== GraphNodeAccess<I> =====
320impl<SrcClk: PlatformClock> GraphNodeAccess<0> for TestPipeline<SrcClk> {
321    type Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>;
322    #[inline]
323    fn node_ref(&self) -> &Self::Node {
324        &self.nodes.0
325    }
326    #[inline]
327    fn node_mut(&mut self) -> &mut Self::Node {
328        &mut self.nodes.0
329    }
330}
331impl<SrcClk: PlatformClock> GraphNodeAccess<1> for TestPipeline<SrcClk> {
332    type Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>;
333    #[inline]
334    fn node_ref(&self) -> &Self::Node {
335        &self.nodes.1
336    }
337    #[inline]
338    fn node_mut(&mut self) -> &mut Self::Node {
339        &mut self.nodes.1
340    }
341}
342impl<SrcClk: PlatformClock> GraphNodeAccess<2> for TestPipeline<SrcClk> {
343    type Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>;
344    #[inline]
345    fn node_ref(&self) -> &Self::Node {
346        &self.nodes.2
347    }
348    #[inline]
349    fn node_mut(&mut self) -> &mut Self::Node {
350        &mut self.nodes.2
351    }
352}
353
354// ===== GraphEdgeAccess<E> =====
355impl<SrcClk: PlatformClock> GraphEdgeAccess<1> for TestPipeline<SrcClk> {
356    type Edge = EdgeLink<Q32>;
357    #[inline]
358    fn edge_ref(&self) -> &Self::Edge {
359        &self.edges.0
360    }
361    #[inline]
362    fn edge_mut(&mut self) -> &mut Self::Edge {
363        &mut self.edges.0
364    }
365}
366impl<SrcClk: PlatformClock> GraphEdgeAccess<2> for TestPipeline<SrcClk> {
367    type Edge = EdgeLink<Q32>;
368    #[inline]
369    fn edge_ref(&self) -> &Self::Edge {
370        &self.edges.1
371    }
372    #[inline]
373    fn edge_mut(&mut self) -> &mut Self::Edge {
374        &mut self.edges.1
375    }
376}
377
378// ===== GraphNodeTypes<I, IN, OUT> =====
379// node 0: IN=0, OUT=1
380impl<SrcClk: PlatformClock> GraphNodeTypes<0, 0, 1> for TestPipeline<SrcClk> {
381    type InP = ();
382    type OutP = TestTensor;
383    type InQ = NoQueue;
384    type OutQ = Q32;
385    type InM = StaticMemoryManager<(), 1>;
386    type OutM = Mgr32;
387}
388// node 1: IN=1, OUT=1
389impl<SrcClk: PlatformClock> GraphNodeTypes<1, 1, 1> for TestPipeline<SrcClk> {
390    type InP = TestTensor;
391    type OutP = TestTensor;
392    type InQ = Q32;
393    type OutQ = Q32;
394    type InM = Mgr32;
395    type OutM = Mgr32;
396}
397// node 2: IN=1, OUT=0
398impl<SrcClk: PlatformClock> GraphNodeTypes<2, 1, 0> for TestPipeline<SrcClk> {
399    type InP = TestTensor;
400    type OutP = ();
401    type InQ = Q32;
402    type OutQ = NoQueue;
403    type InM = Mgr32;
404    type OutM = StaticMemoryManager<(), 1>;
405}
406
407// ===== GraphNodeContextBuilder<I, IN, OUT> =====
408// node 0: in=[], out=[edge id 1]
409impl<SrcClk: PlatformClock> GraphNodeContextBuilder<0, 0, 1> for TestPipeline<SrcClk>
410where
411    Self: GraphNodeAccess<0, Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>>,
412{
413    #[inline]
414    fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
415        &'graph mut self,
416        clock: &'clock C,
417        telemetry: &'telemetry mut T,
418    ) -> StepContext<
419        'graph,
420        'telemetry,
421        'clock,
422        0,
423        1,
424        <Self as GraphNodeTypes<0, 0, 1>>::InP,
425        <Self as GraphNodeTypes<0, 0, 1>>::OutP,
426        <Self as GraphNodeTypes<0, 0, 1>>::InQ,
427        <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
428        <Self as GraphNodeTypes<0, 0, 1>>::InM,
429        <Self as GraphNodeTypes<0, 0, 1>>::OutM,
430        C,
431        T,
432    >
433    where
434        EdgePolicy: Copy,
435        C: PlatformClock + Sized,
436        T: Telemetry + Sized,
437    {
438        let out0_policy = *self.edges.0.policy();
439
440        let inputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InQ; 0] = [/* empty */];
441        let outputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutQ; 1] = [self.edges.0.queue_mut()];
442
443        let in_managers: [&'graph mut <Self as GraphNodeTypes<0, 0, 1>>::InM; 0] = [];
444        let out_managers: [&'graph mut <Self as GraphNodeTypes<0, 0, 1>>::OutM; 1] =
445            [&mut self.managers.0];
446
447        let in_policies: [EdgePolicy; 0] = [/* empty */];
448        let out_policies: [EdgePolicy; 1] = [out0_policy];
449
450        let node_id: u32 = 0;
451        let in_edge_ids: [u32; 0] = [/* empty */];
452        let out_edge_ids: [u32; 1] = [1];
453
454        StepContext::<
455            'graph,
456            'telemetry,
457            'clock,
458            0,
459            1,
460            <Self as GraphNodeTypes<0, 0, 1>>::InP,
461            <Self as GraphNodeTypes<0, 0, 1>>::OutP,
462            <Self as GraphNodeTypes<0, 0, 1>>::InQ,
463            <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
464            <Self as GraphNodeTypes<0, 0, 1>>::InM,
465            <Self as GraphNodeTypes<0, 0, 1>>::OutM,
466            C,
467            T,
468        >::new(
469            inputs,
470            outputs,
471            in_managers,
472            out_managers,
473            in_policies,
474            out_policies,
475            node_id,
476            in_edge_ids,
477            out_edge_ids,
478            clock,
479            telemetry,
480        )
481    }
482
483    #[inline]
484    fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
485        &mut self,
486        clock: &'clock C,
487        telemetry: &'telemetry mut T,
488        f: impl FnOnce(
489            &mut <Self as GraphNodeAccess<0>>::Node,
490            &mut StepContext<
491                '_,
492                'telemetry,
493                'clock,
494                0,
495                1,
496                <Self as GraphNodeTypes<0, 0, 1>>::InP,
497                <Self as GraphNodeTypes<0, 0, 1>>::OutP,
498                <Self as GraphNodeTypes<0, 0, 1>>::InQ,
499                <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
500                <Self as GraphNodeTypes<0, 0, 1>>::InM,
501                <Self as GraphNodeTypes<0, 0, 1>>::OutM,
502                C,
503                T,
504            >,
505        ) -> Result<R, E>,
506    ) -> Result<R, E>
507    where
508        Self: GraphNodeAccess<0>,
509        EdgePolicy: Copy,
510        C: PlatformClock + Sized,
511        T: Telemetry + Sized,
512    {
513        let node = &mut self.nodes.0;
514
515        let out0_policy = *self.edges.0.policy();
516
517        let inputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InQ; 0] = [/* empty */];
518        let outputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutQ; 1] = [self.edges.0.queue_mut()];
519
520        let in_managers: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InM; 0] = [/* empty */];
521        let out_managers: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutM; 1] =
522            [&mut self.managers.0];
523
524        let in_policies: [EdgePolicy; 0] = [/* empty */];
525        let out_policies: [EdgePolicy; 1] = [out0_policy];
526
527        let node_id: u32 = 0;
528        let in_edge_ids: [u32; 0] = [/* empty */];
529        let out_edge_ids: [u32; 1] = [1];
530
531        let mut ctx = StepContext::new(
532            inputs,
533            outputs,
534            in_managers,
535            out_managers,
536            in_policies,
537            out_policies,
538            node_id,
539            in_edge_ids,
540            out_edge_ids,
541            clock,
542            telemetry,
543        );
544        f(node, &mut ctx)
545    }
546}
547
548// node 1: in=[edge id 1], out=[edge id 2]
549impl<SrcClk: PlatformClock> GraphNodeContextBuilder<1, 1, 1> for TestPipeline<SrcClk>
550where
551    Self: GraphNodeAccess<1, Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>>,
552{
553    #[inline]
554    fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
555        &'graph mut self,
556        clock: &'clock C,
557        telemetry: &'telemetry mut T,
558    ) -> StepContext<
559        'graph,
560        'telemetry,
561        'clock,
562        1,
563        1,
564        <Self as GraphNodeTypes<1, 1, 1>>::InP,
565        <Self as GraphNodeTypes<1, 1, 1>>::OutP,
566        <Self as GraphNodeTypes<1, 1, 1>>::InQ,
567        <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
568        <Self as GraphNodeTypes<1, 1, 1>>::InM,
569        <Self as GraphNodeTypes<1, 1, 1>>::OutM,
570        C,
571        T,
572    >
573    where
574        EdgePolicy: Copy,
575        C: PlatformClock + Sized,
576        T: Telemetry + Sized,
577    {
578        let in0_policy = *self.edges.0.policy();
579        let out1_policy = *self.edges.1.policy();
580
581        let inputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InQ; 1] = [self.edges.0.queue_mut()];
582        let outputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutQ; 1] = [self.edges.1.queue_mut()];
583
584        let in_managers: [&'graph mut <Self as GraphNodeTypes<1, 1, 1>>::InM; 1] =
585            [&mut self.managers.0];
586        let out_managers: [&'graph mut <Self as GraphNodeTypes<1, 1, 1>>::OutM; 1] =
587            [&mut self.managers.1];
588
589        let in_policies: [EdgePolicy; 1] = [in0_policy];
590        let out_policies: [EdgePolicy; 1] = [out1_policy];
591
592        let node_id: u32 = 1;
593        let in_edge_ids: [u32; 1] = [1];
594        let out_edge_ids: [u32; 1] = [2];
595
596        StepContext::<
597            'graph,
598            'telemetry,
599            'clock,
600            1,
601            1,
602            <Self as GraphNodeTypes<1, 1, 1>>::InP,
603            <Self as GraphNodeTypes<1, 1, 1>>::OutP,
604            <Self as GraphNodeTypes<1, 1, 1>>::InQ,
605            <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
606            <Self as GraphNodeTypes<1, 1, 1>>::InM,
607            <Self as GraphNodeTypes<1, 1, 1>>::OutM,
608            C,
609            T,
610        >::new(
611            inputs,
612            outputs,
613            in_managers,
614            out_managers,
615            in_policies,
616            out_policies,
617            node_id,
618            in_edge_ids,
619            out_edge_ids,
620            clock,
621            telemetry,
622        )
623    }
624
625    #[inline]
626    fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
627        &mut self,
628        clock: &'clock C,
629        telemetry: &'telemetry mut T,
630        f: impl FnOnce(
631            &mut <Self as GraphNodeAccess<1>>::Node,
632            &mut StepContext<
633                '_,
634                'telemetry,
635                'clock,
636                1,
637                1,
638                <Self as GraphNodeTypes<1, 1, 1>>::InP,
639                <Self as GraphNodeTypes<1, 1, 1>>::OutP,
640                <Self as GraphNodeTypes<1, 1, 1>>::InQ,
641                <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
642                <Self as GraphNodeTypes<1, 1, 1>>::InM,
643                <Self as GraphNodeTypes<1, 1, 1>>::OutM,
644                C,
645                T,
646            >,
647        ) -> Result<R, E>,
648    ) -> Result<R, E>
649    where
650        Self: GraphNodeAccess<1>,
651        EdgePolicy: Copy,
652        C: PlatformClock + Sized,
653        T: Telemetry + Sized,
654    {
655        let node = &mut self.nodes.1;
656
657        let in0_policy = *self.edges.0.policy();
658        let out1_policy = *self.edges.1.policy();
659
660        let inputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InQ; 1] = [self.edges.0.queue_mut()];
661        let outputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutQ; 1] = [self.edges.1.queue_mut()];
662
663        let in_managers: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InM; 1] = [&mut self.managers.0];
664        let out_managers: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutM; 1] =
665            [&mut self.managers.1];
666
667        let in_policies: [EdgePolicy; 1] = [in0_policy];
668        let out_policies: [EdgePolicy; 1] = [out1_policy];
669
670        let node_id: u32 = 1;
671        let in_edge_ids: [u32; 1] = [1];
672        let out_edge_ids: [u32; 1] = [2];
673
674        let mut ctx = StepContext::new(
675            inputs,
676            outputs,
677            in_managers,
678            out_managers,
679            in_policies,
680            out_policies,
681            node_id,
682            in_edge_ids,
683            out_edge_ids,
684            clock,
685            telemetry,
686        );
687        f(node, &mut ctx)
688    }
689}
690
691// node 2: in=[edge id 2], out=[]
692impl<SrcClk: PlatformClock> GraphNodeContextBuilder<2, 1, 0> for TestPipeline<SrcClk>
693where
694    Self: GraphNodeAccess<2, Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>>,
695{
696    #[inline]
697    fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
698        &'graph mut self,
699        clock: &'clock C,
700        telemetry: &'telemetry mut T,
701    ) -> StepContext<
702        'graph,
703        'telemetry,
704        'clock,
705        1,
706        0,
707        <Self as GraphNodeTypes<2, 1, 0>>::InP,
708        <Self as GraphNodeTypes<2, 1, 0>>::OutP,
709        <Self as GraphNodeTypes<2, 1, 0>>::InQ,
710        <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
711        <Self as GraphNodeTypes<2, 1, 0>>::InM,
712        <Self as GraphNodeTypes<2, 1, 0>>::OutM,
713        C,
714        T,
715    >
716    where
717        EdgePolicy: Copy,
718        C: PlatformClock + Sized,
719        T: Telemetry + Sized,
720    {
721        let in1_policy = *self.edges.1.policy();
722
723        let inputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InQ; 1] = [self.edges.1.queue_mut()];
724        let outputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutQ; 0] = [/* empty */];
725
726        let in_managers: [&'graph mut <Self as GraphNodeTypes<2, 1, 0>>::InM; 1] =
727            [&mut self.managers.1];
728        let out_managers: [&'graph mut <Self as GraphNodeTypes<2, 1, 0>>::OutM; 0] = [/* empty */];
729
730        let in_policies: [EdgePolicy; 1] = [in1_policy];
731        let out_policies: [EdgePolicy; 0] = [/* empty */];
732
733        let node_id: u32 = 2;
734        let in_edge_ids: [u32; 1] = [2];
735        let out_edge_ids: [u32; 0] = [/* empty */];
736
737        StepContext::<
738            'graph,
739            'telemetry,
740            'clock,
741            1,
742            0,
743            <Self as GraphNodeTypes<2, 1, 0>>::InP,
744            <Self as GraphNodeTypes<2, 1, 0>>::OutP,
745            <Self as GraphNodeTypes<2, 1, 0>>::InQ,
746            <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
747            <Self as GraphNodeTypes<2, 1, 0>>::InM,
748            <Self as GraphNodeTypes<2, 1, 0>>::OutM,
749            C,
750            T,
751        >::new(
752            inputs,
753            outputs,
754            in_managers,
755            out_managers,
756            in_policies,
757            out_policies,
758            node_id,
759            in_edge_ids,
760            out_edge_ids,
761            clock,
762            telemetry,
763        )
764    }
765
766    #[inline]
767    fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
768        &mut self,
769        clock: &'clock C,
770        telemetry: &'telemetry mut T,
771        f: impl FnOnce(
772            &mut <Self as GraphNodeAccess<2>>::Node,
773            &mut StepContext<
774                '_,
775                'telemetry,
776                'clock,
777                1,
778                0,
779                <Self as GraphNodeTypes<2, 1, 0>>::InP,
780                <Self as GraphNodeTypes<2, 1, 0>>::OutP,
781                <Self as GraphNodeTypes<2, 1, 0>>::InQ,
782                <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
783                <Self as GraphNodeTypes<2, 1, 0>>::InM,
784                <Self as GraphNodeTypes<2, 1, 0>>::OutM,
785                C,
786                T,
787            >,
788        ) -> Result<R, E>,
789    ) -> Result<R, E>
790    where
791        Self: GraphNodeAccess<2>,
792        EdgePolicy: Copy,
793        C: PlatformClock + Sized,
794        T: Telemetry + Sized,
795    {
796        let node = &mut self.nodes.2;
797
798        let in1_policy = *self.edges.1.policy();
799
800        let inputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InQ; 1] = [self.edges.1.queue_mut()];
801        let outputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutQ; 0] = [/* empty */];
802
803        let in_managers: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InM; 1] = [&mut self.managers.1];
804        let out_managers: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutM; 0] = [/* empty */];
805
806        let in_policies: [EdgePolicy; 1] = [in1_policy];
807        let out_policies: [EdgePolicy; 0] = [/* empty */];
808
809        let node_id: u32 = 2;
810        let in_edge_ids: [u32; 1] = [2];
811        let out_edge_ids: [u32; 0] = [/* empty */];
812
813        let mut ctx = StepContext::new(
814            inputs,
815            outputs,
816            in_managers,
817            out_managers,
818            in_policies,
819            out_policies,
820            node_id,
821            in_edge_ids,
822            out_edge_ids,
823            clock,
824            telemetry,
825        );
826        f(node, &mut ctx)
827    }
828}
829
830/// Concurrent (std) graph implementation — the output of a second, separate
831/// codegen invocation with `.concurrent(true)` (builder API) or `concurrent;`
832/// (DSL).
833///
834/// This module hand-mirrors the code that `limen-codegen` emits when
835/// `emit_concurrent = true`. It serves as a reference implementation and a
836/// regression guard: if this module stops compiling or its tests fail, the
837/// corresponding codegen change is broken.
838///
839/// Key structural differences from `TestPipeline` (the first invocation):
840///
841/// - All real edges use `ConcurrentEdge` (Arc-backed, `Clone + Send + Sync`).
842/// - All managers use `ConcurrentMemoryManager<u32>`.
843/// - The struct has only three fields: `nodes`, `edges`, `managers`.
844///   Ingress edges are **not** stored — their occupancy and policies are
845///   obtained from the owning source node at runtime (same as codegen output).
846/// - A `ScopedGraphApi<3, 3>` impl is emitted with `where` bounds requiring
847///   `ConcurrentEdge: ScopedEdge` and `ConcurrentMemoryManager<u32>: ScopedManager<u32>`.
848/// - `run_scoped` uses `ScopedEdge::scoped_handle` / `ScopedManager::scoped_handle`
849///   instead of `.clone()`, enabling future lock-free, non-Clone edge types.
850#[cfg(feature = "std")]
851pub mod concurrent_graph {
852    use super::*;
853
854    use crate::{
855        edge::{spsc_concurrent::ConcurrentEdge, EdgeOccupancy, NoQueue},
856        errors::{GraphError, NodeError},
857        graph::{
858            GraphApi, GraphEdgeAccess, GraphNodeAccess, GraphNodeContextBuilder, GraphNodeTypes,
859            ScopedGraphApi,
860        },
861        node::{
862            bench::{TestCounterSourceTensor, TestIdentityModelNodeTensor},
863            source::SourceNode,
864            StepContext, StepResult,
865        },
866        policy::EdgePolicy,
867        prelude::{
868            ConcurrentMemoryManager, EdgeDescriptor, EdgeLink, NodeDescriptor, NodeLink,
869            PlatformClock, StaticMemoryManager, Telemetry, WorkerDecision, WorkerScheduler,
870        },
871        types::{EdgeIndex, NodeIndex, PortId, PortIndex},
872    };
873
874    type ConcMgr32 = ConcurrentMemoryManager<TestTensor>;
875
876    #[allow(type_alias_bounds)]
877    type SrcNode<SrcClk: PlatformClock + Send + 'static> =
878        SourceNode<TestCounterSourceTensor<SrcClk, 32>, TestTensor, 1>;
879
880    const TEST_MAX_BATCH: usize = 32;
881    type MapNode = TestIdentityModelNodeTensor<TEST_MAX_BATCH>;
882    type SnkNode = SinkNode<TestSinkNodeTensor, TestTensor, 1>;
883
884    /// Concrete std graph using `ConcurrentEdge` (Arc-backed, Clone+Send+Sync).
885    ///
886    /// Mirrors the output of a `concurrent(true)` codegen invocation.
887    /// Ingress edges are not stored: occupancy and policy are obtained from the
888    /// source node at runtime, matching codegen output exactly.
889    #[allow(clippy::complexity)]
890    pub struct TestPipelineStd<SrcClk: PlatformClock + Send + 'static> {
891        nodes: (
892            NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>,
893            NodeLink<MapNode, 1, 1, TestTensor, TestTensor>,
894            NodeLink<SnkNode, 1, 0, TestTensor, ()>,
895        ),
896        edges: (EdgeLink<ConcurrentEdge>, EdgeLink<ConcurrentEdge>),
897        managers: (ConcMgr32, ConcMgr32),
898    }
899
impl<SrcClk: PlatformClock + Send + 'static> TestPipelineStd<SrcClk> {
    /// Construct a new concurrent graph instance.
    ///
    /// Mirrors the `new(..)` constructor emitted by codegen for a
    /// `concurrent(true)` graph. Ingress probe edges are not created here —
    /// ingress occupancy and policy are read directly from the source node.
    ///
    /// # Parameters
    ///
    /// - `node_0`: anything convertible into the source node; registered as
    ///   node index 0 with name `"src"`.
    /// - `node_1`: the model node; registered as node index 1 with name `"map"`.
    /// - `node_2`: anything convertible into the sink node; registered as node
    ///   index 2 with name `"snk"`.
    /// - `q_1`: queue for edge `"e0"` (node 0 port 0 → node 1 port 0), stored
    ///   under `EdgeIndex` 1.
    /// - `q_2`: queue for edge `"e1"` (node 1 port 0 → node 2 port 0), stored
    ///   under `EdgeIndex` 2.
    /// - `mgr_1` / `mgr_2`: memory managers stored alongside `q_1` / `q_2`.
    ///
    /// Both edges are configured with `Q_32_POLICY`.
    #[inline]
    pub fn new(
        node_0: impl Into<SrcNode<SrcClk>>,
        node_1: MapNode,
        node_2: impl Into<SnkNode>,
        q_1: ConcurrentEdge,
        q_2: ConcurrentEdge,
        mgr_1: ConcMgr32,
        mgr_2: ConcMgr32,
    ) -> Self {
        let node_0: SrcNode<SrcClk> = node_0.into();
        let node_2: SnkNode = node_2.into();

        let n0 = NodeLink::<SrcNode<SrcClk>, 0, 1, (), TestTensor>::new(
            node_0,
            NodeIndex::from(0usize),
            Some("src"),
        );
        let n1 = NodeLink::<MapNode, 1, 1, TestTensor, TestTensor>::new(
            node_1,
            NodeIndex::from(1usize),
            Some("map"),
        );
        let n2 = NodeLink::<SnkNode, 1, 0, TestTensor, ()>::new(
            node_2,
            NodeIndex::from(2usize),
            Some("snk"),
        );

        // Real edges are numbered from 1: `EdgeIndex` 0 is the external
        // ingress edge that `GraphApi::get_edge_descriptors` synthesizes.
        let e0 = EdgeLink::new(
            q_1,
            EdgeIndex::from(1usize),
            PortId::new(NodeIndex::from(0usize), PortIndex::from(0)),
            PortId::new(NodeIndex::from(1usize), PortIndex::from(0)),
            Q_32_POLICY,
            Some("e0"),
        );
        let e1 = EdgeLink::new(
            q_2,
            EdgeIndex::from(2usize),
            PortId::new(NodeIndex::from(1usize), PortIndex::from(0)),
            PortId::new(NodeIndex::from(2usize), PortIndex::from(0)),
            Q_32_POLICY,
            Some("e1"),
        );

        Self {
            nodes: (n0, n1, n2),
            edges: (e0, e1),
            managers: (mgr_1, mgr_2),
        }
    }

    /// Execute all nodes concurrently via `std::thread::scope`.
    ///
    /// This method mirrors the body of `run_scoped` as emitted by codegen
    /// when `emit_concurrent = true`. Structure:
    ///
    /// - Step 1: edge policy copies (before node borrows; `EdgePolicy: Copy`)
    /// - Step 2: per-node scoped edge + manager handles via `ScopedEdge` /
    ///   `ScopedManager` — future lock-free non-Clone types work transparently
    /// - Step 3: per-worker telemetry clones
    /// - Step 4: disjoint node borrows (Rust tracks tuple fields separately)
    /// - Step 5: one scoped thread per node, scheduler-driven
    ///
    /// Each worker loop exits only when `scheduler.decide` returns
    /// `WorkerDecision::Stop`, and `std::thread::scope` joins every spawned
    /// thread before returning. This method therefore returns once all three
    /// workers have been told to stop. If a worker thread panics,
    /// `std::thread::scope` re-raises the panic after joining.
    ///
    /// Step errors do not abort a worker; they are recorded in the worker's
    /// `state.last_error` (nodes 1 and 2 additionally print them to stderr).
    fn run_scoped_impl<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
    where
        C: PlatformClock + Clone + Send + Sync + 'static,
        T: Telemetry + Clone + Send + 'static,
        S: WorkerScheduler + 'static,
        SrcNode<SrcClk>: Send,
        MapNode: Send,
        SnkNode: Send,
    {
        // --- Step 1: edge policy copies ---
        let pol_0 = *self.edges.0.policy();
        let pol_1 = *self.edges.1.policy();

        // --- Step 2: per-node scoped edge handles + manager handles ---
        // Uses ScopedEdge / ScopedManager instead of Clone so future
        // lock-free, non-Clone edge and manager types work transparently.
        // Each real edge yields one Producer handle (for its upstream node)
        // and one Consumer handle (for its downstream node).
        //
        // node 0 (source): no inputs, 1 output (real edge 0)
        let out_e_0_0 = crate::edge::ScopedEdge::scoped_handle(
            self.edges.0.queue(),
            crate::edge::EdgeHandleKind::Producer,
        );
        let out_m_0_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.0);
        // node 1 (model): 1 input (real edge 0), 1 output (real edge 1)
        let in_e_1_0 = crate::edge::ScopedEdge::scoped_handle(
            self.edges.0.queue(),
            crate::edge::EdgeHandleKind::Consumer,
        );
        let in_m_1_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.0);
        let out_e_1_0 = crate::edge::ScopedEdge::scoped_handle(
            self.edges.1.queue(),
            crate::edge::EdgeHandleKind::Producer,
        );
        let out_m_1_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.1);
        // node 2 (sink): 1 input (real edge 1), no outputs
        let in_e_2_0 = crate::edge::ScopedEdge::scoped_handle(
            self.edges.1.queue(),
            crate::edge::EdgeHandleKind::Consumer,
        );
        let in_m_2_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.1);

        // --- Step 3: per-worker telemetry ---
        // Two clones plus a move of the original: one owned sink per worker.
        let telem_0 = telemetry.clone();
        let telem_1 = telemetry.clone();
        let telem_2 = telemetry;

        // --- Step 4: disjoint node borrows ---
        let n0 = &mut self.nodes.0;
        let n1 = &mut self.nodes.1;
        let n2 = &mut self.nodes.2;

        // The `move` closures below capture these references rather than the
        // owned clock/scheduler, so all three workers can share them.
        let clock_ref = &clock;
        let sched_ref = &scheduler;

        ::std::thread::scope(|scope| {
            // --- Node 0: source (batch-aware readiness from ingress) ---
            // Compile-time check that the node type is `Send`; a `&mut` borrow
            // of its link is moved into the worker thread below.
            {
                fn _assert_send<_T: core::marker::Send>() {}
                _assert_send::<SrcNode<SrcClk>>();
            }
            scope.spawn(move || {
                let mut out_e_0_0 = out_e_0_0;
                let mut out_m_0_0 = out_m_0_0;
                let mut telem = telem_0;

                // NOTE(review): arguments look like (worker index, worker
                // count, start tick) — confirm against `WorkerState::new`.
                let mut state =
                    crate::scheduling::WorkerState::new(0, 3, clock_ref.now_ticks());
                loop {
                    state.current_tick = clock_ref.now_ticks();

                    // Determine whether the ingress (external) edge can form a
                    // batch now according to the source's ingress policy.
                    let src = n0.node();
                    let any_ingress_has_batch = src.ingress_edge_has_batch();

                    // Highest watermark across output edges (only `e0` here)
                    // becomes this worker's backpressure signal.
                    let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
                    {
                        let _occ = crate::edge::Edge::occupancy(&out_e_0_0, &pol_0);
                        if *_occ.watermark() > _max_wm {
                            _max_wm = *_occ.watermark();
                        }
                    }
                    state.backpressure = _max_wm;

                    state.readiness = if any_ingress_has_batch {
                        if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
                            crate::scheduling::Readiness::ReadyUnderPressure
                        } else {
                            crate::scheduling::Readiness::Ready
                        }
                    } else {
                        crate::scheduling::Readiness::NotReady
                    };

                    match sched_ref.decide(&state) {
                        WorkerDecision::Step => {
                            // The empty input arrays carry explicit element types
                            // so the input queue/manager type parameters of
                            // `StepContext` can be inferred. The `u32` arguments
                            // appear to be the node index and output edge index
                            // (matching `EdgeIndex` 1 assigned in `new`).
                            let mut ctx = crate::node::StepContext::new(
                                [] as [&mut NoQueue; 0],
                                [&mut out_e_0_0],
                                [] as [&mut StaticMemoryManager<(), 1>; 0],
                                [&mut out_m_0_0],
                                [],
                                [pol_0],
                                0u32,
                                [],
                                [1u32],
                                clock_ref,
                                &mut telem,
                            );
                            match n0.step(&mut ctx) {
                                Ok(sr) => {
                                    state.last_step = Some(sr);
                                    state.last_error = false;
                                }
                                // NOTE(review): unlike nodes 1 and 2, source step
                                // errors are not printed — confirm this matches
                                // the codegen output.
                                Err(_e) => {
                                    state.last_step = None;
                                    state.last_error = true;
                                }
                            }
                        }
                        WorkerDecision::WaitMicros(d) => {
                            ::std::thread::sleep(::std::time::Duration::from_micros(d));
                            state.last_step = None;
                            state.last_error = false;
                        }
                        WorkerDecision::Stop => break,
                    }
                }
            });

            // --- Node 1: model (batch-aware readiness from input edge 0) ---
            // Compile-time `Send` check for the model node type.
            {
                fn _assert_send<_T: core::marker::Send>() {}
                _assert_send::<MapNode>();
            }
            scope.spawn(move || {
                let mut in_e_1_0 = in_e_1_0;
                let mut in_m_1_0 = in_m_1_0;
                let mut out_e_1_0 = out_e_1_0;
                let mut out_m_1_0 = out_m_1_0;
                let mut telem = telem_1;

                let mut state =
                    crate::scheduling::WorkerState::new(1, 3, clock_ref.now_ticks());
                loop {
                    state.current_tick = clock_ref.now_ticks();

                    // Compute max output backpressure
                    let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
                    {
                        let _occ = crate::edge::Edge::occupancy(&out_e_1_0, &pol_1);
                        if *_occ.watermark() > _max_wm {
                            _max_wm = *_occ.watermark();
                        }
                    }
                    state.backpressure = _max_wm;

                    // Cheap pre-check: do any input edges have items?
                    let mut any_input_has_items = false;
                    {
                        let _occ = crate::edge::Edge::occupancy(&in_e_1_0, &pol_0);
                        if *_occ.items() > 0 {
                            any_input_has_items = true;
                        }
                    }

                    // Probe authoritative batch semantics only if we have items.
                    // `probe_ctx` exists only to evaluate the node's batching
                    // policy. It is dropped at the end of this `if`, which frees
                    // the handle borrows before the real step context is built.
                    let mut any_input_has_batch = false;
                    if any_input_has_items {
                        let mut probe_ctx = crate::node::StepContext::new(
                            [&mut in_e_1_0],
                            [&mut out_e_1_0],
                            [&mut in_m_1_0],
                            [&mut out_m_1_0],
                            [pol_0],
                            [pol_1],
                            1u32,
                            [1u32],
                            [2u32],
                            clock_ref,
                            &mut telem,
                        );
                        let node_policy = n1.policy();
                        for port in 0..1 {
                            if probe_ctx.input_edge_has_batch(port, &node_policy) {
                                any_input_has_batch = true;
                                break;
                            }
                        }
                    }

                    state.readiness = if any_input_has_batch {
                        if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
                            crate::scheduling::Readiness::ReadyUnderPressure
                        } else {
                            crate::scheduling::Readiness::Ready
                        }
                    } else {
                        crate::scheduling::Readiness::NotReady
                    };

                    match sched_ref.decide(&state) {
                        WorkerDecision::Step => {
                            // Construct real ctx for the actual step.
                            let mut ctx = crate::node::StepContext::new(
                                [&mut in_e_1_0],
                                [&mut out_e_1_0],
                                [&mut in_m_1_0],
                                [&mut out_m_1_0],
                                [pol_0],
                                [pol_1],
                                1u32,
                                [1u32],
                                [2u32],
                                clock_ref,
                                &mut telem,
                            );
                            match n1.step(&mut ctx) {
                                Ok(sr) => {
                                    state.last_step = Some(sr);
                                    state.last_error = false;
                                }
                                Err(e) => {
                                    ::std::eprintln!("run_scoped: node 1 step failed: {:?}", e);
                                    state.last_step = None;
                                    state.last_error = true;
                                }
                            }
                        }
                        WorkerDecision::WaitMicros(d) => {
                            ::std::thread::sleep(::std::time::Duration::from_micros(d));
                            state.last_step = None;
                            state.last_error = false;
                        }
                        WorkerDecision::Stop => break,
                    }
                }
            });

            // --- Node 2: sink (batch-aware readiness from input edge 1) ---
            // Compile-time `Send` check for the sink node type.
            {
                fn _assert_send<_T: core::marker::Send>() {}
                _assert_send::<SnkNode>();
            }
            scope.spawn(move || {
                let mut in_e_2_0 = in_e_2_0;
                let mut in_m_2_0 = in_m_2_0;
                let mut telem = telem_2;

                let mut state =
                    crate::scheduling::WorkerState::new(2, 3, clock_ref.now_ticks());
                loop {
                    state.current_tick = clock_ref.now_ticks();

                    // No outputs -> backpressure is BelowSoft
                    let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
                    state.backpressure = _max_wm;

                    // Cheap pre-check: do any input edges have items?
                    let mut any_input_has_items = false;
                    {
                        // Occupancy of the single input edge (`e1`) under its policy.
                        let _occ = crate::edge::Edge::occupancy(&in_e_2_0, &pol_1);
                        if *_occ.items() > 0 {
                            any_input_has_items = true;
                        }
                    }

                    // Probe authoritative batch semantics only if we have items.
                    // As for node 1, `probe_ctx` is dropped at the end of this
                    // `if`, before the real step context is built.
                    let mut any_input_has_batch = false;
                    if any_input_has_items {
                        let mut probe_ctx = crate::node::StepContext::new(
                            [&mut in_e_2_0],
                            [] as [&mut NoQueue; 0],
                            [&mut in_m_2_0],
                            [] as [&mut StaticMemoryManager<(), 1>; 0],
                            [pol_1],
                            [],
                            2u32,
                            [2u32],
                            [],
                            clock_ref,
                            &mut telem,
                        );
                        let node_policy = n2.policy();
                        for port in 0..1 {
                            if probe_ctx.input_edge_has_batch(port, &node_policy) {
                                any_input_has_batch = true;
                                break;
                            }
                        }
                    }

                    // `_max_wm` is always `BelowSoft` here, so the pressure
                    // branch is kept only for symmetry with the other workers.
                    state.readiness = if any_input_has_batch {
                        if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
                            crate::scheduling::Readiness::ReadyUnderPressure
                        } else {
                            crate::scheduling::Readiness::Ready
                        }
                    } else {
                        crate::scheduling::Readiness::NotReady
                    };

                    match sched_ref.decide(&state) {
                        WorkerDecision::Step => {
                            let mut ctx = crate::node::StepContext::new(
                                [&mut in_e_2_0],
                                [] as [&mut NoQueue; 0],
                                [&mut in_m_2_0],
                                [] as [&mut StaticMemoryManager<(), 1>; 0],
                                [pol_1],
                                [],
                                2u32,
                                [2u32],
                                [],
                                clock_ref,
                                &mut telem,
                            );
                            match n2.step(&mut ctx) {
                                Ok(sr) => {
                                    state.last_step = Some(sr);
                                    state.last_error = false;
                                }
                                Err(e) => {
                                    ::std::eprintln!("run_scoped: node 2 step failed: {:?}", e);
                                    state.last_step = None;
                                    state.last_error = true;
                                }
                            }
                        }
                        WorkerDecision::WaitMicros(d) => {
                            ::std::thread::sleep(::std::time::Duration::from_micros(d));
                            state.last_step = None;
                            state.last_error = false;
                        }
                        WorkerDecision::Stop => break,
                    }
                }
            });
        });
    }
}
1312
1313    // ===== ScopedGraphApi<3, 3> =====
1314    impl<SrcClk: PlatformClock + Clone + Send + Sync + 'static> ScopedGraphApi<3, 3>
1315        for TestPipelineStd<SrcClk>
1316    where
1317        SrcNode<SrcClk>: Send,
1318        ConcurrentEdge: crate::edge::ScopedEdge,
1319        ConcMgr32: crate::memory::manager::ScopedManager<TestTensor>,
1320    {
1321        fn run_scoped<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
1322        where
1323            C: PlatformClock + Clone + Send + Sync + 'static,
1324            T: Telemetry + Clone + Send + 'static,
1325            S: WorkerScheduler + 'static,
1326        {
1327            self.run_scoped_impl(clock, telemetry, scheduler)
1328        }
1329    }
1330
1331    // ===== GraphApi<3, 3> =====
1332    impl<SrcClk: PlatformClock + Send + 'static> GraphApi<3, 3> for TestPipelineStd<SrcClk> {
1333        #[inline]
1334        fn get_node_descriptors(&self) -> [NodeDescriptor; 3] {
1335            [
1336                self.nodes.0.descriptor(),
1337                self.nodes.1.descriptor(),
1338                self.nodes.2.descriptor(),
1339            ]
1340        }
1341
1342        #[inline]
1343        fn get_edge_descriptors(&self) -> [EdgeDescriptor; 3] {
1344            [
1345                EdgeDescriptor::new(
1346                    EdgeIndex::from(0usize),
1347                    PortId::new(EXTERNAL_INGRESS_NODE, PortIndex::from(0)),
1348                    PortId::new(NodeIndex::from(0usize), PortIndex::from(0)),
1349                    Some("ingress0"),
1350                ),
1351                self.edges.0.descriptor(),
1352                self.edges.1.descriptor(),
1353            ]
1354        }
1355
1356        #[inline]
1357        fn get_node_policies(&self) -> [NodePolicy; 3] {
1358            [
1359                self.nodes.0.policy(),
1360                self.nodes.1.policy(),
1361                self.nodes.2.policy(),
1362            ]
1363        }
1364
1365        #[inline]
1366        fn get_edge_policies(&self) -> [EdgePolicy; 3] {
1367            [
1368                self.nodes.0.node().source_ref().ingress_policy(),
1369                *self.edges.0.policy(),
1370                *self.edges.1.policy(),
1371            ]
1372        }
1373
1374        #[inline]
1375        fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError> {
1376            let occ = match E {
1377                0 => {
1378                    let src = self.nodes.0.node().source_ref();
1379                    src.ingress_occupancy()
1380                }
1381                1 => {
1382                    let e = &self.edges.0;
1383                    e.occupancy(e.policy())
1384                }
1385                2 => {
1386                    let e = &self.edges.1;
1387                    e.occupancy(e.policy())
1388                }
1389                _ => return Err(GraphError::InvalidEdgeIndex),
1390            };
1391            Ok(occ)
1392        }
1393
1394        #[inline]
1395        fn write_all_edge_occupancies(
1396            &self,
1397            out: &mut [EdgeOccupancy; 3],
1398        ) -> Result<(), GraphError> {
1399            out[0] = self.edge_occupancy_for::<0>()?;
1400            out[1] = self.edge_occupancy_for::<1>()?;
1401            out[2] = self.edge_occupancy_for::<2>()?;
1402            Ok(())
1403        }
1404
1405        #[inline]
1406        fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
1407            &self,
1408            out: &mut [EdgeOccupancy; 3],
1409        ) -> Result<(), GraphError> {
1410            let node_idx = NodeIndex::from(I);
1411            for ed in self.get_edge_descriptors().iter() {
1412                if ed.upstream().node() == &node_idx || ed.downstream().node() == &node_idx {
1413                    match ed.id().as_usize() {
1414                        0 => out[0] = self.edge_occupancy_for::<0>()?,
1415                        1 => out[1] = self.edge_occupancy_for::<1>()?,
1416                        2 => out[2] = self.edge_occupancy_for::<2>()?,
1417                        _ => return Err(GraphError::InvalidEdgeIndex),
1418                    }
1419                }
1420            }
1421            Ok(())
1422        }
1423
1424        #[inline]
1425        fn step_node_by_index<C, T>(
1426            &mut self,
1427            index: usize,
1428            clock: &C,
1429            telemetry: &mut T,
1430        ) -> Result<StepResult, NodeError>
1431        where
1432            EdgePolicy: Copy,
1433            C: PlatformClock + Sized,
1434            T: Telemetry + Sized,
1435        {
1436            match index {
1437                0 => <Self as GraphNodeContextBuilder<0, 0, 1>>::with_node_and_step_context::<
1438                    C,
1439                    T,
1440                    StepResult,
1441                    NodeError,
1442                >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1443                1 => <Self as GraphNodeContextBuilder<1, 1, 1>>::with_node_and_step_context::<
1444                    C,
1445                    T,
1446                    StepResult,
1447                    NodeError,
1448                >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1449                2 => <Self as GraphNodeContextBuilder<2, 1, 0>>::with_node_and_step_context::<
1450                    C,
1451                    T,
1452                    StepResult,
1453                    NodeError,
1454                >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1455                _ => unreachable!("invalid node index"),
1456            }
1457        }
1458    }
1459
1460    // ===== GraphNodeAccess<I> =====
1461    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<0> for TestPipelineStd<SrcClk> {
1462        type Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>;
1463        #[inline]
1464        fn node_ref(&self) -> &Self::Node {
1465            &self.nodes.0
1466        }
1467        #[inline]
1468        fn node_mut(&mut self) -> &mut Self::Node {
1469            &mut self.nodes.0
1470        }
1471    }
1472    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<1> for TestPipelineStd<SrcClk> {
1473        type Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>;
1474        #[inline]
1475        fn node_ref(&self) -> &Self::Node {
1476            &self.nodes.1
1477        }
1478        #[inline]
1479        fn node_mut(&mut self) -> &mut Self::Node {
1480            &mut self.nodes.1
1481        }
1482    }
1483    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<2> for TestPipelineStd<SrcClk> {
1484        type Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>;
1485        #[inline]
1486        fn node_ref(&self) -> &Self::Node {
1487            &self.nodes.2
1488        }
1489        #[inline]
1490        fn node_mut(&mut self) -> &mut Self::Node {
1491            &mut self.nodes.2
1492        }
1493    }
1494
1495    // ===== GraphEdgeAccess<E> =====
1496    impl<SrcClk: PlatformClock + Send + 'static> GraphEdgeAccess<1> for TestPipelineStd<SrcClk> {
1497        type Edge = EdgeLink<ConcurrentEdge>;
1498        #[inline]
1499        fn edge_ref(&self) -> &Self::Edge {
1500            &self.edges.0
1501        }
1502        #[inline]
1503        fn edge_mut(&mut self) -> &mut Self::Edge {
1504            &mut self.edges.0
1505        }
1506    }
1507    impl<SrcClk: PlatformClock + Send + 'static> GraphEdgeAccess<2> for TestPipelineStd<SrcClk> {
1508        type Edge = EdgeLink<ConcurrentEdge>;
1509        #[inline]
1510        fn edge_ref(&self) -> &Self::Edge {
1511            &self.edges.1
1512        }
1513        #[inline]
1514        fn edge_mut(&mut self) -> &mut Self::Edge {
1515            &mut self.edges.1
1516        }
1517    }
1518
1519    // ===== GraphNodeTypes<I, IN, OUT> =====
1520    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<0, 0, 1> for TestPipelineStd<SrcClk> {
1521        type InP = ();
1522        type OutP = TestTensor;
1523        type InQ = NoQueue;
1524        type OutQ = ConcurrentEdge;
1525        type InM = StaticMemoryManager<(), 1>;
1526        type OutM = ConcMgr32;
1527    }
1528    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<1, 1, 1> for TestPipelineStd<SrcClk> {
1529        type InP = TestTensor;
1530        type OutP = TestTensor;
1531        type InQ = ConcurrentEdge;
1532        type OutQ = ConcurrentEdge;
1533        type InM = ConcMgr32;
1534        type OutM = ConcMgr32;
1535    }
1536    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<2, 1, 0> for TestPipelineStd<SrcClk> {
1537        type InP = TestTensor;
1538        type OutP = ();
1539        type InQ = ConcurrentEdge;
1540        type OutQ = NoQueue;
1541        type InM = ConcMgr32;
1542        type OutM = StaticMemoryManager<(), 1>;
1543    }
1544
1545    // ===== GraphNodeContextBuilder<I, IN, OUT> =====
1546
1547    // node 0: in=[], out=[e0]
    // Source node (`SrcNode<SrcClk>`): no input ports. Its single output port
    // writes to edge e0's queue (`self.edges.0`), whose memory manager is
    // `self.managers.0`.
1548    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<0, 0, 1>
1549        for TestPipelineStd<SrcClk>
1550    where
1551        Self: GraphNodeAccess<0, Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>>,
1552    {
        /// Builds node 0's `StepContext`, which borrows `self` for `'graph`.
        ///
        /// Wiring: no inputs. Output 0 is edge e0's queue, paired with
        /// `self.managers.0` and a copy of e0's policy.
1553        #[inline]
1554        fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1555            &'graph mut self,
1556            clock: &'clock C,
1557            telemetry: &'telemetry mut T,
1558        ) -> StepContext<
1559            'graph,
1560            'telemetry,
1561            'clock,
1562            0,
1563            1,
1564            <Self as GraphNodeTypes<0, 0, 1>>::InP,
1565            <Self as GraphNodeTypes<0, 0, 1>>::OutP,
1566            <Self as GraphNodeTypes<0, 0, 1>>::InQ,
1567            <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
1568            <Self as GraphNodeTypes<0, 0, 1>>::InM,
1569            <Self as GraphNodeTypes<0, 0, 1>>::OutM,
1570            C,
1571            T,
1572        >
1573        where
1574            EdgePolicy: Copy,
1575            C: PlatformClock + Sized,
1576            T: Telemetry + Sized,
1577        {
            // Copy the policy out first so the shared borrow taken by `policy()`
            // ends before `queue_mut()` borrows `self.edges.0` mutably.
1578            let out0_policy = *self.edges.0.policy();
            // Argument order: input queues, output queues, input managers,
            // output managers, input policies, output policies, then (presumably)
            // the node id, input edge ids and output edge ids, then clock and
            // telemetry.
            // NOTE(review): e0 is passed as id `1u32`, one more than its tuple
            // index `edges.0`. Presumably id 0 is reserved (e.g. for ingress);
            // confirm this matches codegen.
1579            StepContext::new(
1580                [],
1581                [self.edges.0.queue_mut()],
1582                [],
1583                [&mut self.managers.0],
1584                [],
1585                [out0_policy],
1586                0u32,
1587                [],
1588                [1u32],
1589                clock,
1590                telemetry,
1591            )
1592        }
1593
        /// Runs `f` with node 0 (`self.nodes.0`) and a freshly built
        /// `StepContext` over the same wiring as `make_step_context`.
        ///
        /// `make_step_context` returns a context that borrows all of `self`.
        /// This method instead borrows `nodes`, `edges` and `managers`
        /// separately, so the node and its context can be used together.
        /// Returns whatever `f` returns.
1594        #[inline]
1595        fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1596            &mut self,
1597            clock: &'clock C,
1598            telemetry: &'telemetry mut T,
1599            f: impl FnOnce(
1600                &mut <Self as GraphNodeAccess<0>>::Node,
1601                &mut StepContext<
1602                    '_,
1603                    'telemetry,
1604                    'clock,
1605                    0,
1606                    1,
1607                    <Self as GraphNodeTypes<0, 0, 1>>::InP,
1608                    <Self as GraphNodeTypes<0, 0, 1>>::OutP,
1609                    <Self as GraphNodeTypes<0, 0, 1>>::InQ,
1610                    <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
1611                    <Self as GraphNodeTypes<0, 0, 1>>::InM,
1612                    <Self as GraphNodeTypes<0, 0, 1>>::OutM,
1613                    C,
1614                    T,
1615                >,
1616            ) -> Result<R, E>,
1617        ) -> Result<R, E>
1618        where
1619            Self: GraphNodeAccess<0>,
1620            EdgePolicy: Copy,
1621            C: PlatformClock + Sized,
1622            T: Telemetry + Sized,
1623        {
            // Disjoint field borrows: `nodes`, `edges` and `managers` are
            // borrowed independently. The policy is copied before the mutable
            // `queue_mut()` borrow of the same edge; keep this statement order.
1624            let node = &mut self.nodes.0;
1625            let out0_policy = *self.edges.0.policy();
1626            let outputs = [self.edges.0.queue_mut()];
1627            let out_managers = [&mut self.managers.0];
1628            let mut ctx = StepContext::new(
1629                [],
1630                outputs,
1631                [],
1632                out_managers,
1633                [],
1634                [out0_policy],
1635                0u32,
1636                [],
1637                [1u32],
1638                clock,
1639                telemetry,
1640            );
1641            f(node, &mut ctx)
1642        }
1643    }
1644
1645    // node 1: in=[e0], out=[e1]
    // Map node (`MapNode`): input 0 reads edge e0 (`self.edges.0`, manager
    // `self.managers.0`). Output 0 writes edge e1 (`self.edges.1`, manager
    // `self.managers.1`).
1646    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<1, 1, 1>
1647        for TestPipelineStd<SrcClk>
1648    where
1649        Self: GraphNodeAccess<1, Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>>,
1650    {
        /// Builds node 1's `StepContext`, which borrows `self` for `'graph`.
        ///
        /// Wiring: input 0 is e0's queue with `self.managers.0` and e0's policy.
        /// Output 0 is e1's queue with `self.managers.1` and e1's policy.
1651        #[inline]
1652        fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1653            &'graph mut self,
1654            clock: &'clock C,
1655            telemetry: &'telemetry mut T,
1656        ) -> StepContext<
1657            'graph,
1658            'telemetry,
1659            'clock,
1660            1,
1661            1,
1662            <Self as GraphNodeTypes<1, 1, 1>>::InP,
1663            <Self as GraphNodeTypes<1, 1, 1>>::OutP,
1664            <Self as GraphNodeTypes<1, 1, 1>>::InQ,
1665            <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
1666            <Self as GraphNodeTypes<1, 1, 1>>::InM,
1667            <Self as GraphNodeTypes<1, 1, 1>>::OutM,
1668            C,
1669            T,
1670        >
1671        where
1672            EdgePolicy: Copy,
1673            C: PlatformClock + Sized,
1674            T: Telemetry + Sized,
1675        {
            // Copy both policies out before any `queue_mut()` call, so their
            // shared borrows end before the edges are borrowed mutably.
1676            let in0_policy = *self.edges.0.policy();
1677            let out1_policy = *self.edges.1.policy();
            // Argument order: input queues, output queues, input managers,
            // output managers, input policies, output policies, then (presumably)
            // the node id, input edge ids and output edge ids, then clock and
            // telemetry.
            // NOTE(review): e0 is passed as id `1u32` and e1 as `2u32`, one more
            // than their tuple indices. Confirm this offset matches codegen.
1678            StepContext::new(
1679                [self.edges.0.queue_mut()],
1680                [self.edges.1.queue_mut()],
1681                [&mut self.managers.0],
1682                [&mut self.managers.1],
1683                [in0_policy],
1684                [out1_policy],
1685                1u32,
1686                [1u32],
1687                [2u32],
1688                clock,
1689                telemetry,
1690            )
1691        }
1692
        /// Runs `f` with node 1 (`self.nodes.1`) and a freshly built
        /// `StepContext` over the same wiring as `make_step_context`.
        ///
        /// `make_step_context` returns a context that borrows all of `self`.
        /// This method instead borrows `nodes`, `edges` and `managers`
        /// separately, so the node and its context can be used together.
        /// Returns whatever `f` returns.
1693        #[inline]
1694        fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1695            &mut self,
1696            clock: &'clock C,
1697            telemetry: &'telemetry mut T,
1698            f: impl FnOnce(
1699                &mut <Self as GraphNodeAccess<1>>::Node,
1700                &mut StepContext<
1701                    '_,
1702                    'telemetry,
1703                    'clock,
1704                    1,
1705                    1,
1706                    <Self as GraphNodeTypes<1, 1, 1>>::InP,
1707                    <Self as GraphNodeTypes<1, 1, 1>>::OutP,
1708                    <Self as GraphNodeTypes<1, 1, 1>>::InQ,
1709                    <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
1710                    <Self as GraphNodeTypes<1, 1, 1>>::InM,
1711                    <Self as GraphNodeTypes<1, 1, 1>>::OutM,
1712                    C,
1713                    T,
1714                >,
1715            ) -> Result<R, E>,
1716        ) -> Result<R, E>
1717        where
1718            Self: GraphNodeAccess<1>,
1719            EdgePolicy: Copy,
1720            C: PlatformClock + Sized,
1721            T: Telemetry + Sized,
1722        {
            // Disjoint field borrows: `nodes`, `edges.0`, `edges.1`,
            // `managers.0` and `managers.1` are borrowed independently. Both
            // policies are copied before the mutable `queue_mut()` borrows;
            // keep this statement order.
1723            let node = &mut self.nodes.1;
1724            let in0_policy = *self.edges.0.policy();
1725            let out1_policy = *self.edges.1.policy();
1726            let inputs = [self.edges.0.queue_mut()];
1727            let outputs = [self.edges.1.queue_mut()];
1728            let in_mgrs = [&mut self.managers.0];
1729            let out_mgrs = [&mut self.managers.1];
1730            let mut ctx = StepContext::new(
1731                inputs,
1732                outputs,
1733                in_mgrs,
1734                out_mgrs,
1735                [in0_policy],
1736                [out1_policy],
1737                1u32,
1738                [1u32],
1739                [2u32],
1740                clock,
1741                telemetry,
1742            );
1743            f(node, &mut ctx)
1744        }
1745    }
1746
1747    // node 2: in=[e1], out=[]
    // Sink node (`SnkNode`): input 0 reads edge e1 (`self.edges.1`, manager
    // `self.managers.1`). No output ports.
1748    impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<2, 1, 0>
1749        for TestPipelineStd<SrcClk>
1750    where
1751        Self: GraphNodeAccess<2, Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>>,
1752    {
        /// Builds node 2's `StepContext`, which borrows `self` for `'graph`.
        ///
        /// Wiring: input 0 is e1's queue with `self.managers.1` and a copy of
        /// e1's policy. There are no outputs.
1753        #[inline]
1754        fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1755            &'graph mut self,
1756            clock: &'clock C,
1757            telemetry: &'telemetry mut T,
1758        ) -> StepContext<
1759            'graph,
1760            'telemetry,
1761            'clock,
1762            1,
1763            0,
1764            <Self as GraphNodeTypes<2, 1, 0>>::InP,
1765            <Self as GraphNodeTypes<2, 1, 0>>::OutP,
1766            <Self as GraphNodeTypes<2, 1, 0>>::InQ,
1767            <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
1768            <Self as GraphNodeTypes<2, 1, 0>>::InM,
1769            <Self as GraphNodeTypes<2, 1, 0>>::OutM,
1770            C,
1771            T,
1772        >
1773        where
1774            EdgePolicy: Copy,
1775            C: PlatformClock + Sized,
1776            T: Telemetry + Sized,
1777        {
            // Copy the policy out first so the shared borrow taken by `policy()`
            // ends before `queue_mut()` borrows `self.edges.1` mutably.
1778            let in1_policy = *self.edges.1.policy();
            // Argument order: input queues, output queues, input managers,
            // output managers, input policies, output policies, then (presumably)
            // the node id, input edge ids and output edge ids, then clock and
            // telemetry.
            // NOTE(review): e1 is passed as id `2u32`, one more than its tuple
            // index `edges.1`. Confirm this offset matches codegen.
1779            StepContext::new(
1780                [self.edges.1.queue_mut()],
1781                [],
1782                [&mut self.managers.1],
1783                [],
1784                [in1_policy],
1785                [],
1786                2u32,
1787                [2u32],
1788                [],
1789                clock,
1790                telemetry,
1791            )
1792        }
1793
        /// Runs `f` with node 2 (`self.nodes.2`) and a freshly built
        /// `StepContext` over the same wiring as `make_step_context`.
        ///
        /// `make_step_context` returns a context that borrows all of `self`.
        /// This method instead borrows `nodes`, `edges` and `managers`
        /// separately, so the node and its context can be used together.
        /// Returns whatever `f` returns.
1794        #[inline]
1795        fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1796            &mut self,
1797            clock: &'clock C,
1798            telemetry: &'telemetry mut T,
1799            f: impl FnOnce(
1800                &mut <Self as GraphNodeAccess<2>>::Node,
1801                &mut StepContext<
1802                    '_,
1803                    'telemetry,
1804                    'clock,
1805                    1,
1806                    0,
1807                    <Self as GraphNodeTypes<2, 1, 0>>::InP,
1808                    <Self as GraphNodeTypes<2, 1, 0>>::OutP,
1809                    <Self as GraphNodeTypes<2, 1, 0>>::InQ,
1810                    <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
1811                    <Self as GraphNodeTypes<2, 1, 0>>::InM,
1812                    <Self as GraphNodeTypes<2, 1, 0>>::OutM,
1813                    C,
1814                    T,
1815                >,
1816            ) -> Result<R, E>,
1817        ) -> Result<R, E>
1818        where
1819            Self: GraphNodeAccess<2>,
1820            EdgePolicy: Copy,
1821            C: PlatformClock + Sized,
1822            T: Telemetry + Sized,
1823        {
            // Disjoint field borrows: `nodes`, `edges` and `managers` are
            // borrowed independently. The policy is copied before the mutable
            // `queue_mut()` borrow of the same edge; keep this statement order.
1824            let node = &mut self.nodes.2;
1825            let in1_policy = *self.edges.1.policy();
1826            let inputs = [self.edges.1.queue_mut()];
1827            let in_mgrs = [&mut self.managers.1];
1828            let mut ctx = StepContext::new(
1829                inputs,
1830                [],
1831                in_mgrs,
1832                [],
1833                [in1_policy],
1834                [],
1835                2u32,
1836                [2u32],
1837                [],
1838                clock,
1839                telemetry,
1840            );
1841            f(node, &mut ctx)
1842        }
1843    }
1844}