1use crate::{
60 edge::{Edge as _, EdgeOccupancy, NoQueue},
61 errors::{GraphError, NodeError},
62 graph::{GraphApi, GraphEdgeAccess, GraphNodeAccess, GraphNodeContextBuilder, GraphNodeTypes},
63 node::{
64 bench::{TestCounterSourceTensor, TestIdentityModelNodeTensor, TestSinkNodeTensor},
65 sink::SinkNode,
66 source::{Source as _, SourceNode, EXTERNAL_INGRESS_NODE},
67 Node as _, StepContext, StepResult,
68 },
69 policy::{AdmissionPolicy, EdgePolicy, NodePolicy, OverBudgetAction},
70 prelude::{
71 EdgeDescriptor, EdgeLink, NodeDescriptor, NodeLink, PlatformClock, StaticMemoryManager,
72 Telemetry, TestTensor,
73 },
74 types::{EdgeIndex, NodeIndex, PortId, PortIndex},
75};
76
/// Queue type used for both stored edges of the test pipeline (const parameter `32`).
type Q32 = crate::edge::bench::TestSpscRingBuf<32>;
/// Edge policy applied to every stored edge in this file: `max_items` and `soft_items`
/// are both 32, no byte caps are set, over-budget action is `Drop` and admission is
/// `DropOldest`.
const Q_32_POLICY: EdgePolicy = EdgePolicy {
    caps: crate::policy::QueueCaps {
        max_items: 32,
        soft_items: 32,
        max_bytes: None,
        soft_bytes: None,
    },
    over_budget: OverBudgetAction::Drop,
    admission: AdmissionPolicy::DropOldest,
};

/// Memory manager type stored next to each edge.
// NOTE(review): the const parameter `35` is presumably a slot capacity — confirm against
// `StaticMemoryManager`.
type Mgr32 = StaticMemoryManager<TestTensor, 35>;

/// Source node type of the pipeline, generic over the clock used by the counter source.
// The bound on an alias parameter is not enforced by the compiler, hence the lint allow;
// it is kept as documentation of what `SrcClk` must be.
#[allow(type_alias_bounds)]
type SrcNode<SrcClk: PlatformClock> =
    SourceNode<TestCounterSourceTensor<SrcClk, 32>, TestTensor, 1>;
/// Policy reported for the external ingress edge (edge 0); identical to `Q_32_POLICY`.
const INGRESS_POLICY: EdgePolicy = Q_32_POLICY;

/// Const parameter passed to the identity model node.
// NOTE(review): presumably the maximum batch size, as the name suggests — confirm.
const TEST_MAX_BATCH: usize = 32;
/// Middle ("map") node type of the pipeline.
type MapNode = TestIdentityModelNodeTensor<TEST_MAX_BATCH>;

/// Sink node type of the pipeline.
type SnkNode = SinkNode<TestSinkNodeTensor, TestTensor, 1>;
105
/// Three-node test graph `src -> map -> snk` with two stored edges and one memory
/// manager per stored edge.
///
/// Edge index 0 (the external ingress of the source) is not stored in this struct; only
/// edges 1 and 2 are.
#[allow(clippy::complexity)]
pub struct TestPipeline<SrcClk: PlatformClock> {
    // Node links in node-index order: source (0 in / 1 out), map (1 / 1), sink (1 / 0).
    nodes: (
        NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>,
        NodeLink<MapNode, 1, 1, TestTensor, TestTensor>,
        NodeLink<SnkNode, 1, 0, TestTensor, ()>,
    ),
    // `edges.0` is edge index 1 (src -> map), `edges.1` is edge index 2 (map -> snk).
    edges: (EdgeLink<Q32>, EdgeLink<Q32>),
    // `managers.0` is used together with `edges.0`, `managers.1` with `edges.1`.
    managers: (Mgr32, Mgr32),
}
120
121impl<SrcClk: PlatformClock> TestPipeline<SrcClk> {
122 #[inline]
124 pub fn new(
125 node_0: impl Into<SrcNode<SrcClk>>,
126 node_1: MapNode,
127 node_2: impl Into<SnkNode>,
128 q_0: Q32,
129 q_1: Q32,
130 mgr_0: Mgr32,
131 mgr_1: Mgr32,
132 ) -> Self {
133 let node_0: SrcNode<SrcClk> = node_0.into();
134 let node_2: SnkNode = node_2.into();
135
136 let nodes = (
137 NodeLink::<SrcNode<SrcClk>, 0, 1, (), TestTensor>::new(
138 node_0,
139 NodeIndex::from(0usize),
140 Some("src"),
141 ),
142 NodeLink::<MapNode, 1, 1, TestTensor, TestTensor>::new(
143 node_1,
144 NodeIndex::from(1usize),
145 Some("map"),
146 ),
147 NodeLink::<SnkNode, 1, 0, TestTensor, ()>::new(
148 node_2,
149 NodeIndex::from(2usize),
150 Some("snk"),
151 ),
152 );
153
154 let edges = (
155 EdgeLink::<Q32>::new(
156 q_0,
157 EdgeIndex::from(1usize),
158 PortId::new(NodeIndex::from(0usize), PortIndex::from(0usize)),
159 PortId::new(NodeIndex::from(1usize), PortIndex::from(0usize)),
160 Q_32_POLICY,
161 Some("e0"),
162 ),
163 EdgeLink::<Q32>::new(
164 q_1,
165 EdgeIndex::from(2usize),
166 PortId::new(NodeIndex::from(1usize), PortIndex::from(0usize)),
167 PortId::new(NodeIndex::from(2usize), PortIndex::from(0usize)),
168 Q_32_POLICY,
169 Some("e1"),
170 ),
171 );
172
173 let managers = (mgr_0, mgr_1);
174
175 Self {
176 nodes,
177 edges,
178 managers,
179 }
180 }
181}
182
183impl<SrcClk: PlatformClock> GraphApi<3, 3> for TestPipeline<SrcClk> {
185 #[inline]
186 fn get_node_descriptors(&self) -> [NodeDescriptor; 3] {
187 [
188 self.nodes.0.descriptor(),
189 self.nodes.1.descriptor(),
190 self.nodes.2.descriptor(),
191 ]
192 }
193 #[inline]
194 fn get_edge_descriptors(&self) -> [EdgeDescriptor; 3] {
195 [
196 EdgeDescriptor::new(
197 EdgeIndex::from(0usize),
198 PortId::new(EXTERNAL_INGRESS_NODE, PortIndex::from(0usize)),
199 PortId::new(NodeIndex::from(0usize), PortIndex::from(0usize)),
200 Some("ingress0"),
201 ),
202 self.edges.0.descriptor(),
203 self.edges.1.descriptor(),
204 ]
205 }
206
207 #[inline]
208 fn get_node_policies(&self) -> [NodePolicy; 3] {
209 [
210 self.nodes.0.policy(),
211 self.nodes.1.policy(),
212 self.nodes.2.policy(),
213 ]
214 }
215
216 #[inline]
217 fn get_edge_policies(&self) -> [EdgePolicy; 3] {
218 [
219 INGRESS_POLICY,
220 *self.edges.0.policy(),
221 *self.edges.1.policy(),
222 ]
223 }
224
225 #[inline]
226 fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError> {
227 let occ = match E {
228 0 => {
229 let src = self.nodes.0.node().source_ref();
230 src.ingress_occupancy()
231 }
232 1 => {
233 let e = &self.edges.0;
234 e.occupancy(e.policy())
235 }
236 2 => {
237 let e = &self.edges.1;
238 e.occupancy(e.policy())
239 }
240 _ => return Err(GraphError::InvalidEdgeIndex), };
242 Ok(occ)
243 }
244
245 #[inline]
246 fn write_all_edge_occupancies(&self, out: &mut [EdgeOccupancy; 3]) -> Result<(), GraphError> {
247 out[0] = self.edge_occupancy_for::<0>()?;
248 out[1] = self.edge_occupancy_for::<1>()?;
249 out[2] = self.edge_occupancy_for::<2>()?;
250 Ok(())
251 }
252
253 #[inline]
254 fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
255 &self,
256 out: &mut [EdgeOccupancy; 3],
257 ) -> Result<(), GraphError> {
258 let node_idx = NodeIndex::from(I);
259 for ed in self.get_edge_descriptors().iter() {
261 if ed.upstream().node() == &node_idx || ed.downstream().node() == &node_idx {
262 let ei = (ed.id()).as_usize();
263 match ei {
264 0 => {
265 out[0] = self.edge_occupancy_for::<0>()?;
266 }
267 1 => {
268 out[1] = self.edge_occupancy_for::<1>()?;
269 }
270 2 => {
271 out[2] = self.edge_occupancy_for::<2>()?;
272 }
273 _ => return Err(GraphError::InvalidEdgeIndex),
274 }
275 }
276 }
277 Ok(())
278 }
279
280 #[inline]
281 fn step_node_by_index<C, T>(
282 &mut self,
283 index: usize,
284 clock: &C,
285 telemetry: &mut T,
286 ) -> Result<StepResult, NodeError>
287 where
288 EdgePolicy: Copy,
289 C: PlatformClock + Sized,
290 T: Telemetry + Sized,
291 {
292 match index {
293 0 => <Self as GraphNodeContextBuilder<0, 0, 1>>::with_node_and_step_context::<
294 C,
295 T,
296 StepResult,
297 NodeError,
298 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
299
300 1 => <Self as GraphNodeContextBuilder<1, 1, 1>>::with_node_and_step_context::<
301 C,
302 T,
303 StepResult,
304 NodeError,
305 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
306
307 2 => <Self as GraphNodeContextBuilder<2, 1, 0>>::with_node_and_step_context::<
308 C,
309 T,
310 StepResult,
311 NodeError,
312 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
313
314 _ => unreachable!("invalid node index"),
315 }
316 }
317}
318
319impl<SrcClk: PlatformClock> GraphNodeAccess<0> for TestPipeline<SrcClk> {
321 type Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>;
322 #[inline]
323 fn node_ref(&self) -> &Self::Node {
324 &self.nodes.0
325 }
326 #[inline]
327 fn node_mut(&mut self) -> &mut Self::Node {
328 &mut self.nodes.0
329 }
330}
331impl<SrcClk: PlatformClock> GraphNodeAccess<1> for TestPipeline<SrcClk> {
332 type Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>;
333 #[inline]
334 fn node_ref(&self) -> &Self::Node {
335 &self.nodes.1
336 }
337 #[inline]
338 fn node_mut(&mut self) -> &mut Self::Node {
339 &mut self.nodes.1
340 }
341}
342impl<SrcClk: PlatformClock> GraphNodeAccess<2> for TestPipeline<SrcClk> {
343 type Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>;
344 #[inline]
345 fn node_ref(&self) -> &Self::Node {
346 &self.nodes.2
347 }
348 #[inline]
349 fn node_mut(&mut self) -> &mut Self::Node {
350 &mut self.nodes.2
351 }
352}
353
354impl<SrcClk: PlatformClock> GraphEdgeAccess<1> for TestPipeline<SrcClk> {
356 type Edge = EdgeLink<Q32>;
357 #[inline]
358 fn edge_ref(&self) -> &Self::Edge {
359 &self.edges.0
360 }
361 #[inline]
362 fn edge_mut(&mut self) -> &mut Self::Edge {
363 &mut self.edges.0
364 }
365}
366impl<SrcClk: PlatformClock> GraphEdgeAccess<2> for TestPipeline<SrcClk> {
367 type Edge = EdgeLink<Q32>;
368 #[inline]
369 fn edge_ref(&self) -> &Self::Edge {
370 &self.edges.1
371 }
372 #[inline]
373 fn edge_mut(&mut self) -> &mut Self::Edge {
374 &mut self.edges.1
375 }
376}
377
/// Payload, queue and manager types seen by node 0 (source: no inputs, one output).
impl<SrcClk: PlatformClock> GraphNodeTypes<0, 0, 1> for TestPipeline<SrcClk> {
    // No input ports: the input-side types are placeholders.
    type InP = ();
    type OutP = TestTensor;
    type InQ = NoQueue;
    type OutQ = Q32;
    type InM = StaticMemoryManager<(), 1>;
    type OutM = Mgr32;
}
/// Payload, queue and manager types seen by node 1 (map: one input, one output).
impl<SrcClk: PlatformClock> GraphNodeTypes<1, 1, 1> for TestPipeline<SrcClk> {
    type InP = TestTensor;
    type OutP = TestTensor;
    type InQ = Q32;
    type OutQ = Q32;
    type InM = Mgr32;
    type OutM = Mgr32;
}
/// Payload, queue and manager types seen by node 2 (sink: one input, no outputs).
impl<SrcClk: PlatformClock> GraphNodeTypes<2, 1, 0> for TestPipeline<SrcClk> {
    type InP = TestTensor;
    // No output ports: the output-side types are placeholders.
    type OutP = ();
    type InQ = Q32;
    type OutQ = NoQueue;
    type InM = Mgr32;
    type OutM = StaticMemoryManager<(), 1>;
}
406
407impl<SrcClk: PlatformClock> GraphNodeContextBuilder<0, 0, 1> for TestPipeline<SrcClk>
410where
411 Self: GraphNodeAccess<0, Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>>,
412{
413 #[inline]
414 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
415 &'graph mut self,
416 clock: &'clock C,
417 telemetry: &'telemetry mut T,
418 ) -> StepContext<
419 'graph,
420 'telemetry,
421 'clock,
422 0,
423 1,
424 <Self as GraphNodeTypes<0, 0, 1>>::InP,
425 <Self as GraphNodeTypes<0, 0, 1>>::OutP,
426 <Self as GraphNodeTypes<0, 0, 1>>::InQ,
427 <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
428 <Self as GraphNodeTypes<0, 0, 1>>::InM,
429 <Self as GraphNodeTypes<0, 0, 1>>::OutM,
430 C,
431 T,
432 >
433 where
434 EdgePolicy: Copy,
435 C: PlatformClock + Sized,
436 T: Telemetry + Sized,
437 {
438 let out0_policy = *self.edges.0.policy();
439
440 let inputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InQ; 0] = [];
441 let outputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutQ; 1] = [self.edges.0.queue_mut()];
442
443 let in_managers: [&'graph mut <Self as GraphNodeTypes<0, 0, 1>>::InM; 0] = [];
444 let out_managers: [&'graph mut <Self as GraphNodeTypes<0, 0, 1>>::OutM; 1] =
445 [&mut self.managers.0];
446
447 let in_policies: [EdgePolicy; 0] = [];
448 let out_policies: [EdgePolicy; 1] = [out0_policy];
449
450 let node_id: u32 = 0;
451 let in_edge_ids: [u32; 0] = [];
452 let out_edge_ids: [u32; 1] = [1];
453
454 StepContext::<
455 'graph,
456 'telemetry,
457 'clock,
458 0,
459 1,
460 <Self as GraphNodeTypes<0, 0, 1>>::InP,
461 <Self as GraphNodeTypes<0, 0, 1>>::OutP,
462 <Self as GraphNodeTypes<0, 0, 1>>::InQ,
463 <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
464 <Self as GraphNodeTypes<0, 0, 1>>::InM,
465 <Self as GraphNodeTypes<0, 0, 1>>::OutM,
466 C,
467 T,
468 >::new(
469 inputs,
470 outputs,
471 in_managers,
472 out_managers,
473 in_policies,
474 out_policies,
475 node_id,
476 in_edge_ids,
477 out_edge_ids,
478 clock,
479 telemetry,
480 )
481 }
482
483 #[inline]
484 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
485 &mut self,
486 clock: &'clock C,
487 telemetry: &'telemetry mut T,
488 f: impl FnOnce(
489 &mut <Self as GraphNodeAccess<0>>::Node,
490 &mut StepContext<
491 '_,
492 'telemetry,
493 'clock,
494 0,
495 1,
496 <Self as GraphNodeTypes<0, 0, 1>>::InP,
497 <Self as GraphNodeTypes<0, 0, 1>>::OutP,
498 <Self as GraphNodeTypes<0, 0, 1>>::InQ,
499 <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
500 <Self as GraphNodeTypes<0, 0, 1>>::InM,
501 <Self as GraphNodeTypes<0, 0, 1>>::OutM,
502 C,
503 T,
504 >,
505 ) -> Result<R, E>,
506 ) -> Result<R, E>
507 where
508 Self: GraphNodeAccess<0>,
509 EdgePolicy: Copy,
510 C: PlatformClock + Sized,
511 T: Telemetry + Sized,
512 {
513 let node = &mut self.nodes.0;
514
515 let out0_policy = *self.edges.0.policy();
516
517 let inputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InQ; 0] = [];
518 let outputs: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutQ; 1] = [self.edges.0.queue_mut()];
519
520 let in_managers: [&mut <Self as GraphNodeTypes<0, 0, 1>>::InM; 0] = [];
521 let out_managers: [&mut <Self as GraphNodeTypes<0, 0, 1>>::OutM; 1] =
522 [&mut self.managers.0];
523
524 let in_policies: [EdgePolicy; 0] = [];
525 let out_policies: [EdgePolicy; 1] = [out0_policy];
526
527 let node_id: u32 = 0;
528 let in_edge_ids: [u32; 0] = [];
529 let out_edge_ids: [u32; 1] = [1];
530
531 let mut ctx = StepContext::new(
532 inputs,
533 outputs,
534 in_managers,
535 out_managers,
536 in_policies,
537 out_policies,
538 node_id,
539 in_edge_ids,
540 out_edge_ids,
541 clock,
542 telemetry,
543 );
544 f(node, &mut ctx)
545 }
546}
547
548impl<SrcClk: PlatformClock> GraphNodeContextBuilder<1, 1, 1> for TestPipeline<SrcClk>
550where
551 Self: GraphNodeAccess<1, Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>>,
552{
553 #[inline]
554 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
555 &'graph mut self,
556 clock: &'clock C,
557 telemetry: &'telemetry mut T,
558 ) -> StepContext<
559 'graph,
560 'telemetry,
561 'clock,
562 1,
563 1,
564 <Self as GraphNodeTypes<1, 1, 1>>::InP,
565 <Self as GraphNodeTypes<1, 1, 1>>::OutP,
566 <Self as GraphNodeTypes<1, 1, 1>>::InQ,
567 <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
568 <Self as GraphNodeTypes<1, 1, 1>>::InM,
569 <Self as GraphNodeTypes<1, 1, 1>>::OutM,
570 C,
571 T,
572 >
573 where
574 EdgePolicy: Copy,
575 C: PlatformClock + Sized,
576 T: Telemetry + Sized,
577 {
578 let in0_policy = *self.edges.0.policy();
579 let out1_policy = *self.edges.1.policy();
580
581 let inputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InQ; 1] = [self.edges.0.queue_mut()];
582 let outputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutQ; 1] = [self.edges.1.queue_mut()];
583
584 let in_managers: [&'graph mut <Self as GraphNodeTypes<1, 1, 1>>::InM; 1] =
585 [&mut self.managers.0];
586 let out_managers: [&'graph mut <Self as GraphNodeTypes<1, 1, 1>>::OutM; 1] =
587 [&mut self.managers.1];
588
589 let in_policies: [EdgePolicy; 1] = [in0_policy];
590 let out_policies: [EdgePolicy; 1] = [out1_policy];
591
592 let node_id: u32 = 1;
593 let in_edge_ids: [u32; 1] = [1];
594 let out_edge_ids: [u32; 1] = [2];
595
596 StepContext::<
597 'graph,
598 'telemetry,
599 'clock,
600 1,
601 1,
602 <Self as GraphNodeTypes<1, 1, 1>>::InP,
603 <Self as GraphNodeTypes<1, 1, 1>>::OutP,
604 <Self as GraphNodeTypes<1, 1, 1>>::InQ,
605 <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
606 <Self as GraphNodeTypes<1, 1, 1>>::InM,
607 <Self as GraphNodeTypes<1, 1, 1>>::OutM,
608 C,
609 T,
610 >::new(
611 inputs,
612 outputs,
613 in_managers,
614 out_managers,
615 in_policies,
616 out_policies,
617 node_id,
618 in_edge_ids,
619 out_edge_ids,
620 clock,
621 telemetry,
622 )
623 }
624
625 #[inline]
626 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
627 &mut self,
628 clock: &'clock C,
629 telemetry: &'telemetry mut T,
630 f: impl FnOnce(
631 &mut <Self as GraphNodeAccess<1>>::Node,
632 &mut StepContext<
633 '_,
634 'telemetry,
635 'clock,
636 1,
637 1,
638 <Self as GraphNodeTypes<1, 1, 1>>::InP,
639 <Self as GraphNodeTypes<1, 1, 1>>::OutP,
640 <Self as GraphNodeTypes<1, 1, 1>>::InQ,
641 <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
642 <Self as GraphNodeTypes<1, 1, 1>>::InM,
643 <Self as GraphNodeTypes<1, 1, 1>>::OutM,
644 C,
645 T,
646 >,
647 ) -> Result<R, E>,
648 ) -> Result<R, E>
649 where
650 Self: GraphNodeAccess<1>,
651 EdgePolicy: Copy,
652 C: PlatformClock + Sized,
653 T: Telemetry + Sized,
654 {
655 let node = &mut self.nodes.1;
656
657 let in0_policy = *self.edges.0.policy();
658 let out1_policy = *self.edges.1.policy();
659
660 let inputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InQ; 1] = [self.edges.0.queue_mut()];
661 let outputs: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutQ; 1] = [self.edges.1.queue_mut()];
662
663 let in_managers: [&mut <Self as GraphNodeTypes<1, 1, 1>>::InM; 1] = [&mut self.managers.0];
664 let out_managers: [&mut <Self as GraphNodeTypes<1, 1, 1>>::OutM; 1] =
665 [&mut self.managers.1];
666
667 let in_policies: [EdgePolicy; 1] = [in0_policy];
668 let out_policies: [EdgePolicy; 1] = [out1_policy];
669
670 let node_id: u32 = 1;
671 let in_edge_ids: [u32; 1] = [1];
672 let out_edge_ids: [u32; 1] = [2];
673
674 let mut ctx = StepContext::new(
675 inputs,
676 outputs,
677 in_managers,
678 out_managers,
679 in_policies,
680 out_policies,
681 node_id,
682 in_edge_ids,
683 out_edge_ids,
684 clock,
685 telemetry,
686 );
687 f(node, &mut ctx)
688 }
689}
690
691impl<SrcClk: PlatformClock> GraphNodeContextBuilder<2, 1, 0> for TestPipeline<SrcClk>
693where
694 Self: GraphNodeAccess<2, Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>>,
695{
696 #[inline]
697 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
698 &'graph mut self,
699 clock: &'clock C,
700 telemetry: &'telemetry mut T,
701 ) -> StepContext<
702 'graph,
703 'telemetry,
704 'clock,
705 1,
706 0,
707 <Self as GraphNodeTypes<2, 1, 0>>::InP,
708 <Self as GraphNodeTypes<2, 1, 0>>::OutP,
709 <Self as GraphNodeTypes<2, 1, 0>>::InQ,
710 <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
711 <Self as GraphNodeTypes<2, 1, 0>>::InM,
712 <Self as GraphNodeTypes<2, 1, 0>>::OutM,
713 C,
714 T,
715 >
716 where
717 EdgePolicy: Copy,
718 C: PlatformClock + Sized,
719 T: Telemetry + Sized,
720 {
721 let in1_policy = *self.edges.1.policy();
722
723 let inputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InQ; 1] = [self.edges.1.queue_mut()];
724 let outputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutQ; 0] = [];
725
726 let in_managers: [&'graph mut <Self as GraphNodeTypes<2, 1, 0>>::InM; 1] =
727 [&mut self.managers.1];
728 let out_managers: [&'graph mut <Self as GraphNodeTypes<2, 1, 0>>::OutM; 0] = [];
729
730 let in_policies: [EdgePolicy; 1] = [in1_policy];
731 let out_policies: [EdgePolicy; 0] = [];
732
733 let node_id: u32 = 2;
734 let in_edge_ids: [u32; 1] = [2];
735 let out_edge_ids: [u32; 0] = [];
736
737 StepContext::<
738 'graph,
739 'telemetry,
740 'clock,
741 1,
742 0,
743 <Self as GraphNodeTypes<2, 1, 0>>::InP,
744 <Self as GraphNodeTypes<2, 1, 0>>::OutP,
745 <Self as GraphNodeTypes<2, 1, 0>>::InQ,
746 <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
747 <Self as GraphNodeTypes<2, 1, 0>>::InM,
748 <Self as GraphNodeTypes<2, 1, 0>>::OutM,
749 C,
750 T,
751 >::new(
752 inputs,
753 outputs,
754 in_managers,
755 out_managers,
756 in_policies,
757 out_policies,
758 node_id,
759 in_edge_ids,
760 out_edge_ids,
761 clock,
762 telemetry,
763 )
764 }
765
766 #[inline]
767 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
768 &mut self,
769 clock: &'clock C,
770 telemetry: &'telemetry mut T,
771 f: impl FnOnce(
772 &mut <Self as GraphNodeAccess<2>>::Node,
773 &mut StepContext<
774 '_,
775 'telemetry,
776 'clock,
777 1,
778 0,
779 <Self as GraphNodeTypes<2, 1, 0>>::InP,
780 <Self as GraphNodeTypes<2, 1, 0>>::OutP,
781 <Self as GraphNodeTypes<2, 1, 0>>::InQ,
782 <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
783 <Self as GraphNodeTypes<2, 1, 0>>::InM,
784 <Self as GraphNodeTypes<2, 1, 0>>::OutM,
785 C,
786 T,
787 >,
788 ) -> Result<R, E>,
789 ) -> Result<R, E>
790 where
791 Self: GraphNodeAccess<2>,
792 EdgePolicy: Copy,
793 C: PlatformClock + Sized,
794 T: Telemetry + Sized,
795 {
796 let node = &mut self.nodes.2;
797
798 let in1_policy = *self.edges.1.policy();
799
800 let inputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InQ; 1] = [self.edges.1.queue_mut()];
801 let outputs: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutQ; 0] = [];
802
803 let in_managers: [&mut <Self as GraphNodeTypes<2, 1, 0>>::InM; 1] = [&mut self.managers.1];
804 let out_managers: [&mut <Self as GraphNodeTypes<2, 1, 0>>::OutM; 0] = [];
805
806 let in_policies: [EdgePolicy; 1] = [in1_policy];
807 let out_policies: [EdgePolicy; 0] = [];
808
809 let node_id: u32 = 2;
810 let in_edge_ids: [u32; 1] = [2];
811 let out_edge_ids: [u32; 0] = [];
812
813 let mut ctx = StepContext::new(
814 inputs,
815 outputs,
816 in_managers,
817 out_managers,
818 in_policies,
819 out_policies,
820 node_id,
821 in_edge_ids,
822 out_edge_ids,
823 clock,
824 telemetry,
825 );
826 f(node, &mut ctx)
827 }
828}
829
830#[cfg(feature = "std")]
851pub mod concurrent_graph {
852 use super::*;
853
854 use crate::{
855 edge::{spsc_concurrent::ConcurrentEdge, EdgeOccupancy, NoQueue},
856 errors::{GraphError, NodeError},
857 graph::{
858 GraphApi, GraphEdgeAccess, GraphNodeAccess, GraphNodeContextBuilder, GraphNodeTypes,
859 ScopedGraphApi,
860 },
861 node::{
862 bench::{TestCounterSourceTensor, TestIdentityModelNodeTensor},
863 source::SourceNode,
864 StepContext, StepResult,
865 },
866 policy::EdgePolicy,
867 prelude::{
868 ConcurrentMemoryManager, EdgeDescriptor, EdgeLink, NodeDescriptor, NodeLink,
869 PlatformClock, StaticMemoryManager, Telemetry, WorkerDecision, WorkerScheduler,
870 },
871 types::{EdgeIndex, NodeIndex, PortId, PortIndex},
872 };
873
    /// Memory manager type stored next to each edge of the concurrent pipeline.
    type ConcMgr32 = ConcurrentMemoryManager<TestTensor>;

    /// Source node type of the concurrent pipeline, generic over the source's clock.
    // The bound on an alias parameter is not enforced by the compiler, hence the lint
    // allow; it is kept as documentation of what `SrcClk` must be.
    #[allow(type_alias_bounds)]
    type SrcNode<SrcClk: PlatformClock + Send + 'static> =
        SourceNode<TestCounterSourceTensor<SrcClk, 32>, TestTensor, 1>;

    /// Const parameter passed to the identity model node.
    // NOTE(review): presumably the maximum batch size, as the name suggests — confirm.
    const TEST_MAX_BATCH: usize = 32;
    /// Middle ("map") node type of the concurrent pipeline.
    type MapNode = TestIdentityModelNodeTensor<TEST_MAX_BATCH>;
    /// Sink node type of the concurrent pipeline.
    type SnkNode = SinkNode<TestSinkNodeTensor, TestTensor, 1>;
883
    /// Three-node test graph `src -> map -> snk` whose stored edges are `ConcurrentEdge`s
    /// and whose managers are `ConcurrentMemoryManager`s, so that each node can be run on
    /// its own scoped thread.
    #[allow(clippy::complexity)]
    pub struct TestPipelineStd<SrcClk: PlatformClock + Send + 'static> {
        // Node links in node-index order: source (0 in / 1 out), map (1 / 1), sink (1 / 0).
        nodes: (
            NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>,
            NodeLink<MapNode, 1, 1, TestTensor, TestTensor>,
            NodeLink<SnkNode, 1, 0, TestTensor, ()>,
        ),
        // `edges.0` is edge index 1 (src -> map), `edges.1` is edge index 2 (map -> snk).
        edges: (EdgeLink<ConcurrentEdge>, EdgeLink<ConcurrentEdge>),
        // `managers.0` is used together with `edges.0`, `managers.1` with `edges.1`.
        managers: (ConcMgr32, ConcMgr32),
    }
899
900 impl<SrcClk: PlatformClock + Send + 'static> TestPipelineStd<SrcClk> {
901 #[inline]
907 pub fn new(
908 node_0: impl Into<SrcNode<SrcClk>>,
909 node_1: MapNode,
910 node_2: impl Into<SnkNode>,
911 q_1: ConcurrentEdge,
912 q_2: ConcurrentEdge,
913 mgr_1: ConcMgr32,
914 mgr_2: ConcMgr32,
915 ) -> Self {
916 let node_0: SrcNode<SrcClk> = node_0.into();
917 let node_2: SnkNode = node_2.into();
918
919 let n0 = NodeLink::<SrcNode<SrcClk>, 0, 1, (), TestTensor>::new(
920 node_0,
921 NodeIndex::from(0usize),
922 Some("src"),
923 );
924 let n1 = NodeLink::<MapNode, 1, 1, TestTensor, TestTensor>::new(
925 node_1,
926 NodeIndex::from(1usize),
927 Some("map"),
928 );
929 let n2 = NodeLink::<SnkNode, 1, 0, TestTensor, ()>::new(
930 node_2,
931 NodeIndex::from(2usize),
932 Some("snk"),
933 );
934
935 let e0 = EdgeLink::new(
936 q_1,
937 EdgeIndex::from(1usize),
938 PortId::new(NodeIndex::from(0usize), PortIndex::from(0)),
939 PortId::new(NodeIndex::from(1usize), PortIndex::from(0)),
940 Q_32_POLICY,
941 Some("e0"),
942 );
943 let e1 = EdgeLink::new(
944 q_2,
945 EdgeIndex::from(2usize),
946 PortId::new(NodeIndex::from(1usize), PortIndex::from(0)),
947 PortId::new(NodeIndex::from(2usize), PortIndex::from(0)),
948 Q_32_POLICY,
949 Some("e1"),
950 );
951
952 Self {
953 nodes: (n0, n1, n2),
954 edges: (e0, e1),
955 managers: (mgr_1, mgr_2),
956 }
957 }
958
959 fn run_scoped_impl<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
971 where
972 C: PlatformClock + Clone + Send + Sync + 'static,
973 T: Telemetry + Clone + Send + 'static,
974 S: WorkerScheduler + 'static,
975 SrcNode<SrcClk>: Send,
976 MapNode: Send,
977 SnkNode: Send,
978 {
979 let pol_0 = *self.edges.0.policy();
981 let pol_1 = *self.edges.1.policy();
982
983 let out_e_0_0 = crate::edge::ScopedEdge::scoped_handle(
989 self.edges.0.queue(),
990 crate::edge::EdgeHandleKind::Producer,
991 );
992 let out_m_0_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.0);
993 let in_e_1_0 = crate::edge::ScopedEdge::scoped_handle(
995 self.edges.0.queue(),
996 crate::edge::EdgeHandleKind::Consumer,
997 );
998 let in_m_1_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.0);
999 let out_e_1_0 = crate::edge::ScopedEdge::scoped_handle(
1000 self.edges.1.queue(),
1001 crate::edge::EdgeHandleKind::Producer,
1002 );
1003 let out_m_1_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.1);
1004 let in_e_2_0 = crate::edge::ScopedEdge::scoped_handle(
1006 self.edges.1.queue(),
1007 crate::edge::EdgeHandleKind::Consumer,
1008 );
1009 let in_m_2_0 = crate::memory::manager::ScopedManager::scoped_handle(&self.managers.1);
1010
1011 let telem_0 = telemetry.clone();
1013 let telem_1 = telemetry.clone();
1014 let telem_2 = telemetry;
1015
1016 let n0 = &mut self.nodes.0;
1018 let n1 = &mut self.nodes.1;
1019 let n2 = &mut self.nodes.2;
1020
1021 let clock_ref = &clock;
1022 let sched_ref = &scheduler;
1023
1024 ::std::thread::scope(|scope| {
1025 {
1027 fn _assert_send<_T: core::marker::Send>() {}
1028 _assert_send::<SrcNode<SrcClk>>();
1029 }
1030 scope.spawn(move || {
1031 let mut out_e_0_0 = out_e_0_0;
1032 let mut out_m_0_0 = out_m_0_0;
1033 let mut telem = telem_0;
1034
1035 let mut state =
1036 crate::scheduling::WorkerState::new(0, 3, clock_ref.now_ticks());
1037 loop {
1038 state.current_tick = clock_ref.now_ticks();
1039
1040 let src = n0.node();
1043 let any_ingress_has_batch = src.ingress_edge_has_batch();
1044
1045 let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
1046 {
1047 let _occ = crate::edge::Edge::occupancy(&out_e_0_0, &pol_0);
1048 if *_occ.watermark() > _max_wm {
1049 _max_wm = *_occ.watermark();
1050 }
1051 }
1052 state.backpressure = _max_wm;
1053
1054 state.readiness = if any_ingress_has_batch {
1055 if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
1056 crate::scheduling::Readiness::ReadyUnderPressure
1057 } else {
1058 crate::scheduling::Readiness::Ready
1059 }
1060 } else {
1061 crate::scheduling::Readiness::NotReady
1062 };
1063
1064 match sched_ref.decide(&state) {
1065 WorkerDecision::Step => {
1066 let mut ctx = crate::node::StepContext::new(
1067 [] as [&mut NoQueue; 0],
1068 [&mut out_e_0_0],
1069 [] as [&mut StaticMemoryManager<(), 1>; 0],
1070 [&mut out_m_0_0],
1071 [],
1072 [pol_0],
1073 0u32,
1074 [],
1075 [1u32],
1076 clock_ref,
1077 &mut telem,
1078 );
1079 match n0.step(&mut ctx) {
1080 Ok(sr) => {
1081 state.last_step = Some(sr);
1082 state.last_error = false;
1083 }
1084 Err(_e) => {
1085 state.last_step = None;
1086 state.last_error = true;
1087 }
1088 }
1089 }
1090 WorkerDecision::WaitMicros(d) => {
1091 ::std::thread::sleep(::std::time::Duration::from_micros(d));
1092 state.last_step = None;
1093 state.last_error = false;
1094 }
1095 WorkerDecision::Stop => break,
1096 }
1097 }
1098 });
1099
1100 {
1102 fn _assert_send<_T: core::marker::Send>() {}
1103 _assert_send::<MapNode>();
1104 }
1105 scope.spawn(move || {
1106 let mut in_e_1_0 = in_e_1_0;
1107 let mut in_m_1_0 = in_m_1_0;
1108 let mut out_e_1_0 = out_e_1_0;
1109 let mut out_m_1_0 = out_m_1_0;
1110 let mut telem = telem_1;
1111
1112 let mut state =
1113 crate::scheduling::WorkerState::new(1, 3, clock_ref.now_ticks());
1114 loop {
1115 state.current_tick = clock_ref.now_ticks();
1116
1117 let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
1119 {
1120 let _occ = crate::edge::Edge::occupancy(&out_e_1_0, &pol_1);
1121 if *_occ.watermark() > _max_wm {
1122 _max_wm = *_occ.watermark();
1123 }
1124 }
1125 state.backpressure = _max_wm;
1126
1127 let mut any_input_has_items = false;
1129 {
1130 let _occ = crate::edge::Edge::occupancy(&in_e_1_0, &pol_0);
1131 if *_occ.items() > 0 {
1132 any_input_has_items = true;
1133 }
1134 }
1135
1136 let mut any_input_has_batch = false;
1138 if any_input_has_items {
1139 let mut probe_ctx = crate::node::StepContext::new(
1140 [&mut in_e_1_0],
1141 [&mut out_e_1_0],
1142 [&mut in_m_1_0],
1143 [&mut out_m_1_0],
1144 [pol_0],
1145 [pol_1],
1146 1u32,
1147 [1u32],
1148 [2u32],
1149 clock_ref,
1150 &mut telem,
1151 );
1152 let node_policy = n1.policy();
1153 for port in 0..1 {
1154 if probe_ctx.input_edge_has_batch(port, &node_policy) {
1155 any_input_has_batch = true;
1156 break;
1157 }
1158 }
1159 }
1160
1161 state.readiness = if any_input_has_batch {
1162 if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
1163 crate::scheduling::Readiness::ReadyUnderPressure
1164 } else {
1165 crate::scheduling::Readiness::Ready
1166 }
1167 } else {
1168 crate::scheduling::Readiness::NotReady
1169 };
1170
1171 match sched_ref.decide(&state) {
1172 WorkerDecision::Step => {
1173 let mut ctx = crate::node::StepContext::new(
1175 [&mut in_e_1_0],
1176 [&mut out_e_1_0],
1177 [&mut in_m_1_0],
1178 [&mut out_m_1_0],
1179 [pol_0],
1180 [pol_1],
1181 1u32,
1182 [1u32],
1183 [2u32],
1184 clock_ref,
1185 &mut telem,
1186 );
1187 match n1.step(&mut ctx) {
1188 Ok(sr) => {
1189 state.last_step = Some(sr);
1190 state.last_error = false;
1191 }
1192 Err(e) => {
1193 ::std::eprintln!("run_scoped: node 1 step failed: {:?}", e);
1194 state.last_step = None;
1195 state.last_error = true;
1196 }
1197 }
1198 }
1199 WorkerDecision::WaitMicros(d) => {
1200 ::std::thread::sleep(::std::time::Duration::from_micros(d));
1201 state.last_step = None;
1202 state.last_error = false;
1203 }
1204 WorkerDecision::Stop => break,
1205 }
1206 }
1207 });
1208
1209 {
1211 fn _assert_send<_T: core::marker::Send>() {}
1212 _assert_send::<SnkNode>();
1213 }
1214 scope.spawn(move || {
1215 let mut in_e_2_0 = in_e_2_0;
1216 let mut in_m_2_0 = in_m_2_0;
1217 let mut telem = telem_2;
1218
1219 let mut state =
1220 crate::scheduling::WorkerState::new(2, 3, clock_ref.now_ticks());
1221 loop {
1222 state.current_tick = clock_ref.now_ticks();
1223
1224 let mut _max_wm = crate::policy::WatermarkState::BelowSoft;
1226 state.backpressure = _max_wm;
1227
1228 let mut any_input_has_items = false;
1230 {
1231 let _occ = crate::edge::Edge::occupancy(&in_e_2_0, &pol_1);
1233 if *_occ.items() > 0 {
1234 any_input_has_items = true;
1235 }
1236 }
1237
1238 let mut any_input_has_batch = false;
1240 if any_input_has_items {
1241 let mut probe_ctx = crate::node::StepContext::new(
1242 [&mut in_e_2_0],
1243 [] as [&mut NoQueue; 0],
1244 [&mut in_m_2_0],
1245 [] as [&mut StaticMemoryManager<(), 1>; 0],
1246 [pol_1],
1247 [],
1248 2u32,
1249 [2u32],
1250 [],
1251 clock_ref,
1252 &mut telem,
1253 );
1254 let node_policy = n2.policy();
1255 for port in 0..1 {
1256 if probe_ctx.input_edge_has_batch(port, &node_policy) {
1257 any_input_has_batch = true;
1258 break;
1259 }
1260 }
1261 }
1262
1263 state.readiness = if any_input_has_batch {
1264 if _max_wm >= crate::policy::WatermarkState::BetweenSoftAndHard {
1265 crate::scheduling::Readiness::ReadyUnderPressure
1266 } else {
1267 crate::scheduling::Readiness::Ready
1268 }
1269 } else {
1270 crate::scheduling::Readiness::NotReady
1271 };
1272
1273 match sched_ref.decide(&state) {
1274 WorkerDecision::Step => {
1275 let mut ctx = crate::node::StepContext::new(
1276 [&mut in_e_2_0],
1277 [] as [&mut NoQueue; 0],
1278 [&mut in_m_2_0],
1279 [] as [&mut StaticMemoryManager<(), 1>; 0],
1280 [pol_1],
1281 [],
1282 2u32,
1283 [2u32],
1284 [],
1285 clock_ref,
1286 &mut telem,
1287 );
1288 match n2.step(&mut ctx) {
1289 Ok(sr) => {
1290 state.last_step = Some(sr);
1291 state.last_error = false;
1292 }
1293 Err(e) => {
1294 ::std::eprintln!("run_scoped: node 2 step failed: {:?}", e);
1295 state.last_step = None;
1296 state.last_error = true;
1297 }
1298 }
1299 }
1300 WorkerDecision::WaitMicros(d) => {
1301 ::std::thread::sleep(::std::time::Duration::from_micros(d));
1302 state.last_step = None;
1303 state.last_error = false;
1304 }
1305 WorkerDecision::Stop => break,
1306 }
1307 }
1308 });
1309 });
1310 }
1311 }
1312
1313 impl<SrcClk: PlatformClock + Clone + Send + Sync + 'static> ScopedGraphApi<3, 3>
1315 for TestPipelineStd<SrcClk>
1316 where
1317 SrcNode<SrcClk>: Send,
1318 ConcurrentEdge: crate::edge::ScopedEdge,
1319 ConcMgr32: crate::memory::manager::ScopedManager<TestTensor>,
1320 {
1321 fn run_scoped<C, T, S>(&mut self, clock: C, telemetry: T, scheduler: S)
1322 where
1323 C: PlatformClock + Clone + Send + Sync + 'static,
1324 T: Telemetry + Clone + Send + 'static,
1325 S: WorkerScheduler + 'static,
1326 {
1327 self.run_scoped_impl(clock, telemetry, scheduler)
1328 }
1329 }
1330
1331 impl<SrcClk: PlatformClock + Send + 'static> GraphApi<3, 3> for TestPipelineStd<SrcClk> {
1333 #[inline]
1334 fn get_node_descriptors(&self) -> [NodeDescriptor; 3] {
1335 [
1336 self.nodes.0.descriptor(),
1337 self.nodes.1.descriptor(),
1338 self.nodes.2.descriptor(),
1339 ]
1340 }
1341
1342 #[inline]
1343 fn get_edge_descriptors(&self) -> [EdgeDescriptor; 3] {
1344 [
1345 EdgeDescriptor::new(
1346 EdgeIndex::from(0usize),
1347 PortId::new(EXTERNAL_INGRESS_NODE, PortIndex::from(0)),
1348 PortId::new(NodeIndex::from(0usize), PortIndex::from(0)),
1349 Some("ingress0"),
1350 ),
1351 self.edges.0.descriptor(),
1352 self.edges.1.descriptor(),
1353 ]
1354 }
1355
1356 #[inline]
1357 fn get_node_policies(&self) -> [NodePolicy; 3] {
1358 [
1359 self.nodes.0.policy(),
1360 self.nodes.1.policy(),
1361 self.nodes.2.policy(),
1362 ]
1363 }
1364
1365 #[inline]
1366 fn get_edge_policies(&self) -> [EdgePolicy; 3] {
1367 [
1368 self.nodes.0.node().source_ref().ingress_policy(),
1369 *self.edges.0.policy(),
1370 *self.edges.1.policy(),
1371 ]
1372 }
1373
1374 #[inline]
1375 fn edge_occupancy_for<const E: usize>(&self) -> Result<EdgeOccupancy, GraphError> {
1376 let occ = match E {
1377 0 => {
1378 let src = self.nodes.0.node().source_ref();
1379 src.ingress_occupancy()
1380 }
1381 1 => {
1382 let e = &self.edges.0;
1383 e.occupancy(e.policy())
1384 }
1385 2 => {
1386 let e = &self.edges.1;
1387 e.occupancy(e.policy())
1388 }
1389 _ => return Err(GraphError::InvalidEdgeIndex),
1390 };
1391 Ok(occ)
1392 }
1393
1394 #[inline]
1395 fn write_all_edge_occupancies(
1396 &self,
1397 out: &mut [EdgeOccupancy; 3],
1398 ) -> Result<(), GraphError> {
1399 out[0] = self.edge_occupancy_for::<0>()?;
1400 out[1] = self.edge_occupancy_for::<1>()?;
1401 out[2] = self.edge_occupancy_for::<2>()?;
1402 Ok(())
1403 }
1404
1405 #[inline]
1406 fn refresh_occupancies_for_node<const I: usize, const IN: usize, const OUT: usize>(
1407 &self,
1408 out: &mut [EdgeOccupancy; 3],
1409 ) -> Result<(), GraphError> {
1410 let node_idx = NodeIndex::from(I);
1411 for ed in self.get_edge_descriptors().iter() {
1412 if ed.upstream().node() == &node_idx || ed.downstream().node() == &node_idx {
1413 match ed.id().as_usize() {
1414 0 => out[0] = self.edge_occupancy_for::<0>()?,
1415 1 => out[1] = self.edge_occupancy_for::<1>()?,
1416 2 => out[2] = self.edge_occupancy_for::<2>()?,
1417 _ => return Err(GraphError::InvalidEdgeIndex),
1418 }
1419 }
1420 }
1421 Ok(())
1422 }
1423
1424 #[inline]
1425 fn step_node_by_index<C, T>(
1426 &mut self,
1427 index: usize,
1428 clock: &C,
1429 telemetry: &mut T,
1430 ) -> Result<StepResult, NodeError>
1431 where
1432 EdgePolicy: Copy,
1433 C: PlatformClock + Sized,
1434 T: Telemetry + Sized,
1435 {
1436 match index {
1437 0 => <Self as GraphNodeContextBuilder<0, 0, 1>>::with_node_and_step_context::<
1438 C,
1439 T,
1440 StepResult,
1441 NodeError,
1442 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1443 1 => <Self as GraphNodeContextBuilder<1, 1, 1>>::with_node_and_step_context::<
1444 C,
1445 T,
1446 StepResult,
1447 NodeError,
1448 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1449 2 => <Self as GraphNodeContextBuilder<2, 1, 0>>::with_node_and_step_context::<
1450 C,
1451 T,
1452 StepResult,
1453 NodeError,
1454 >(self, clock, telemetry, |node, ctx| node.step(ctx)),
1455 _ => unreachable!("invalid node index"),
1456 }
1457 }
1458 }
1459
1460 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<0> for TestPipelineStd<SrcClk> {
1462 type Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>;
1463 #[inline]
1464 fn node_ref(&self) -> &Self::Node {
1465 &self.nodes.0
1466 }
1467 #[inline]
1468 fn node_mut(&mut self) -> &mut Self::Node {
1469 &mut self.nodes.0
1470 }
1471 }
1472 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<1> for TestPipelineStd<SrcClk> {
1473 type Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>;
1474 #[inline]
1475 fn node_ref(&self) -> &Self::Node {
1476 &self.nodes.1
1477 }
1478 #[inline]
1479 fn node_mut(&mut self) -> &mut Self::Node {
1480 &mut self.nodes.1
1481 }
1482 }
1483 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeAccess<2> for TestPipelineStd<SrcClk> {
1484 type Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>;
1485 #[inline]
1486 fn node_ref(&self) -> &Self::Node {
1487 &self.nodes.2
1488 }
1489 #[inline]
1490 fn node_mut(&mut self) -> &mut Self::Node {
1491 &mut self.nodes.2
1492 }
1493 }
1494
1495 impl<SrcClk: PlatformClock + Send + 'static> GraphEdgeAccess<1> for TestPipelineStd<SrcClk> {
1497 type Edge = EdgeLink<ConcurrentEdge>;
1498 #[inline]
1499 fn edge_ref(&self) -> &Self::Edge {
1500 &self.edges.0
1501 }
1502 #[inline]
1503 fn edge_mut(&mut self) -> &mut Self::Edge {
1504 &mut self.edges.0
1505 }
1506 }
1507 impl<SrcClk: PlatformClock + Send + 'static> GraphEdgeAccess<2> for TestPipelineStd<SrcClk> {
1508 type Edge = EdgeLink<ConcurrentEdge>;
1509 #[inline]
1510 fn edge_ref(&self) -> &Self::Edge {
1511 &self.edges.1
1512 }
1513 #[inline]
1514 fn edge_mut(&mut self) -> &mut Self::Edge {
1515 &mut self.edges.1
1516 }
1517 }
1518
1519 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<0, 0, 1> for TestPipelineStd<SrcClk> {
1521 type InP = ();
1522 type OutP = TestTensor;
1523 type InQ = NoQueue;
1524 type OutQ = ConcurrentEdge;
1525 type InM = StaticMemoryManager<(), 1>;
1526 type OutM = ConcMgr32;
1527 }
1528 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<1, 1, 1> for TestPipelineStd<SrcClk> {
1529 type InP = TestTensor;
1530 type OutP = TestTensor;
1531 type InQ = ConcurrentEdge;
1532 type OutQ = ConcurrentEdge;
1533 type InM = ConcMgr32;
1534 type OutM = ConcMgr32;
1535 }
1536 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeTypes<2, 1, 0> for TestPipelineStd<SrcClk> {
1537 type InP = TestTensor;
1538 type OutP = ();
1539 type InQ = ConcurrentEdge;
1540 type OutQ = NoQueue;
1541 type InM = ConcMgr32;
1542 type OutM = StaticMemoryManager<(), 1>;
1543 }
1544
1545 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<0, 0, 1>
1549 for TestPipelineStd<SrcClk>
1550 where
1551 Self: GraphNodeAccess<0, Node = NodeLink<SrcNode<SrcClk>, 0, 1, (), TestTensor>>,
1552 {
1553 #[inline]
1554 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1555 &'graph mut self,
1556 clock: &'clock C,
1557 telemetry: &'telemetry mut T,
1558 ) -> StepContext<
1559 'graph,
1560 'telemetry,
1561 'clock,
1562 0,
1563 1,
1564 <Self as GraphNodeTypes<0, 0, 1>>::InP,
1565 <Self as GraphNodeTypes<0, 0, 1>>::OutP,
1566 <Self as GraphNodeTypes<0, 0, 1>>::InQ,
1567 <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
1568 <Self as GraphNodeTypes<0, 0, 1>>::InM,
1569 <Self as GraphNodeTypes<0, 0, 1>>::OutM,
1570 C,
1571 T,
1572 >
1573 where
1574 EdgePolicy: Copy,
1575 C: PlatformClock + Sized,
1576 T: Telemetry + Sized,
1577 {
1578 let out0_policy = *self.edges.0.policy();
1579 StepContext::new(
1580 [],
1581 [self.edges.0.queue_mut()],
1582 [],
1583 [&mut self.managers.0],
1584 [],
1585 [out0_policy],
1586 0u32,
1587 [],
1588 [1u32],
1589 clock,
1590 telemetry,
1591 )
1592 }
1593
1594 #[inline]
1595 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1596 &mut self,
1597 clock: &'clock C,
1598 telemetry: &'telemetry mut T,
1599 f: impl FnOnce(
1600 &mut <Self as GraphNodeAccess<0>>::Node,
1601 &mut StepContext<
1602 '_,
1603 'telemetry,
1604 'clock,
1605 0,
1606 1,
1607 <Self as GraphNodeTypes<0, 0, 1>>::InP,
1608 <Self as GraphNodeTypes<0, 0, 1>>::OutP,
1609 <Self as GraphNodeTypes<0, 0, 1>>::InQ,
1610 <Self as GraphNodeTypes<0, 0, 1>>::OutQ,
1611 <Self as GraphNodeTypes<0, 0, 1>>::InM,
1612 <Self as GraphNodeTypes<0, 0, 1>>::OutM,
1613 C,
1614 T,
1615 >,
1616 ) -> Result<R, E>,
1617 ) -> Result<R, E>
1618 where
1619 Self: GraphNodeAccess<0>,
1620 EdgePolicy: Copy,
1621 C: PlatformClock + Sized,
1622 T: Telemetry + Sized,
1623 {
1624 let node = &mut self.nodes.0;
1625 let out0_policy = *self.edges.0.policy();
1626 let outputs = [self.edges.0.queue_mut()];
1627 let out_managers = [&mut self.managers.0];
1628 let mut ctx = StepContext::new(
1629 [],
1630 outputs,
1631 [],
1632 out_managers,
1633 [],
1634 [out0_policy],
1635 0u32,
1636 [],
1637 [1u32],
1638 clock,
1639 telemetry,
1640 );
1641 f(node, &mut ctx)
1642 }
1643 }
1644
1645 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<1, 1, 1>
1647 for TestPipelineStd<SrcClk>
1648 where
1649 Self: GraphNodeAccess<1, Node = NodeLink<MapNode, 1, 1, TestTensor, TestTensor>>,
1650 {
1651 #[inline]
1652 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1653 &'graph mut self,
1654 clock: &'clock C,
1655 telemetry: &'telemetry mut T,
1656 ) -> StepContext<
1657 'graph,
1658 'telemetry,
1659 'clock,
1660 1,
1661 1,
1662 <Self as GraphNodeTypes<1, 1, 1>>::InP,
1663 <Self as GraphNodeTypes<1, 1, 1>>::OutP,
1664 <Self as GraphNodeTypes<1, 1, 1>>::InQ,
1665 <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
1666 <Self as GraphNodeTypes<1, 1, 1>>::InM,
1667 <Self as GraphNodeTypes<1, 1, 1>>::OutM,
1668 C,
1669 T,
1670 >
1671 where
1672 EdgePolicy: Copy,
1673 C: PlatformClock + Sized,
1674 T: Telemetry + Sized,
1675 {
1676 let in0_policy = *self.edges.0.policy();
1677 let out1_policy = *self.edges.1.policy();
1678 StepContext::new(
1679 [self.edges.0.queue_mut()],
1680 [self.edges.1.queue_mut()],
1681 [&mut self.managers.0],
1682 [&mut self.managers.1],
1683 [in0_policy],
1684 [out1_policy],
1685 1u32,
1686 [1u32],
1687 [2u32],
1688 clock,
1689 telemetry,
1690 )
1691 }
1692
1693 #[inline]
1694 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1695 &mut self,
1696 clock: &'clock C,
1697 telemetry: &'telemetry mut T,
1698 f: impl FnOnce(
1699 &mut <Self as GraphNodeAccess<1>>::Node,
1700 &mut StepContext<
1701 '_,
1702 'telemetry,
1703 'clock,
1704 1,
1705 1,
1706 <Self as GraphNodeTypes<1, 1, 1>>::InP,
1707 <Self as GraphNodeTypes<1, 1, 1>>::OutP,
1708 <Self as GraphNodeTypes<1, 1, 1>>::InQ,
1709 <Self as GraphNodeTypes<1, 1, 1>>::OutQ,
1710 <Self as GraphNodeTypes<1, 1, 1>>::InM,
1711 <Self as GraphNodeTypes<1, 1, 1>>::OutM,
1712 C,
1713 T,
1714 >,
1715 ) -> Result<R, E>,
1716 ) -> Result<R, E>
1717 where
1718 Self: GraphNodeAccess<1>,
1719 EdgePolicy: Copy,
1720 C: PlatformClock + Sized,
1721 T: Telemetry + Sized,
1722 {
1723 let node = &mut self.nodes.1;
1724 let in0_policy = *self.edges.0.policy();
1725 let out1_policy = *self.edges.1.policy();
1726 let inputs = [self.edges.0.queue_mut()];
1727 let outputs = [self.edges.1.queue_mut()];
1728 let in_mgrs = [&mut self.managers.0];
1729 let out_mgrs = [&mut self.managers.1];
1730 let mut ctx = StepContext::new(
1731 inputs,
1732 outputs,
1733 in_mgrs,
1734 out_mgrs,
1735 [in0_policy],
1736 [out1_policy],
1737 1u32,
1738 [1u32],
1739 [2u32],
1740 clock,
1741 telemetry,
1742 );
1743 f(node, &mut ctx)
1744 }
1745 }
1746
1747 impl<SrcClk: PlatformClock + Send + 'static> GraphNodeContextBuilder<2, 1, 0>
1749 for TestPipelineStd<SrcClk>
1750 where
1751 Self: GraphNodeAccess<2, Node = NodeLink<SnkNode, 1, 0, TestTensor, ()>>,
1752 {
1753 #[inline]
1754 fn make_step_context<'graph, 'telemetry, 'clock, C, T>(
1755 &'graph mut self,
1756 clock: &'clock C,
1757 telemetry: &'telemetry mut T,
1758 ) -> StepContext<
1759 'graph,
1760 'telemetry,
1761 'clock,
1762 1,
1763 0,
1764 <Self as GraphNodeTypes<2, 1, 0>>::InP,
1765 <Self as GraphNodeTypes<2, 1, 0>>::OutP,
1766 <Self as GraphNodeTypes<2, 1, 0>>::InQ,
1767 <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
1768 <Self as GraphNodeTypes<2, 1, 0>>::InM,
1769 <Self as GraphNodeTypes<2, 1, 0>>::OutM,
1770 C,
1771 T,
1772 >
1773 where
1774 EdgePolicy: Copy,
1775 C: PlatformClock + Sized,
1776 T: Telemetry + Sized,
1777 {
1778 let in1_policy = *self.edges.1.policy();
1779 StepContext::new(
1780 [self.edges.1.queue_mut()],
1781 [],
1782 [&mut self.managers.1],
1783 [],
1784 [in1_policy],
1785 [],
1786 2u32,
1787 [2u32],
1788 [],
1789 clock,
1790 telemetry,
1791 )
1792 }
1793
1794 #[inline]
1795 fn with_node_and_step_context<'telemetry, 'clock, C, T, R, E>(
1796 &mut self,
1797 clock: &'clock C,
1798 telemetry: &'telemetry mut T,
1799 f: impl FnOnce(
1800 &mut <Self as GraphNodeAccess<2>>::Node,
1801 &mut StepContext<
1802 '_,
1803 'telemetry,
1804 'clock,
1805 1,
1806 0,
1807 <Self as GraphNodeTypes<2, 1, 0>>::InP,
1808 <Self as GraphNodeTypes<2, 1, 0>>::OutP,
1809 <Self as GraphNodeTypes<2, 1, 0>>::InQ,
1810 <Self as GraphNodeTypes<2, 1, 0>>::OutQ,
1811 <Self as GraphNodeTypes<2, 1, 0>>::InM,
1812 <Self as GraphNodeTypes<2, 1, 0>>::OutM,
1813 C,
1814 T,
1815 >,
1816 ) -> Result<R, E>,
1817 ) -> Result<R, E>
1818 where
1819 Self: GraphNodeAccess<2>,
1820 EdgePolicy: Copy,
1821 C: PlatformClock + Sized,
1822 T: Telemetry + Sized,
1823 {
1824 let node = &mut self.nodes.2;
1825 let in1_policy = *self.edges.1.policy();
1826 let inputs = [self.edges.1.queue_mut()];
1827 let in_mgrs = [&mut self.managers.1];
1828 let mut ctx = StepContext::new(
1829 inputs,
1830 [],
1831 in_mgrs,
1832 [],
1833 [in1_policy],
1834 [],
1835 2u32,
1836 [2u32],
1837 [],
1838 clock,
1839 telemetry,
1840 );
1841 f(node, &mut ctx)
1842 }
1843 }
1844}