1use crate::{
4 edge::Edge,
5 errors::{NodeError, NodeErrorKind},
6 memory::PlacementAcceptance,
7 message::{payload::Payload, Message},
8 node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult},
9 policy::NodePolicy,
10 prelude::{
11 MemoryManager, NodeStepError, NodeStepTelemetry, PlatformClock, Telemetry, TelemetryEvent,
12 TelemetryKey, TelemetryKind,
13 },
14 types::{NodeIndex, PortId, PortIndex},
15};
16
/// Graph-resident wrapper that pairs a [`Node`] implementation with its
/// identity (graph index plus optional static display name) and forwards the
/// [`Node`] trait, layering per-step telemetry on top of `step`/`step_batch`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeLink<N, const IN: usize, const OUT: usize, InP, OutP>
where
    InP: Payload,
    OutP: Payload,
    N: Node<IN, OUT, InP, OutP>,
{
    // The wrapped node implementation; all trait calls delegate to it.
    node: N,

    // Index identifying this node within its graph.
    id: NodeIndex,

    // Optional static display name, surfaced in descriptors and telemetry events.
    name: Option<&'static str>,

    // Zero-sized marker tying the `InP`/`OutP` payload parameters to the type.
    _payload_marker: core::marker::PhantomData<(InP, OutP)>,
}
58
59impl<N, const IN: usize, const OUT: usize, InP, OutP> NodeLink<N, IN, OUT, InP, OutP>
60where
61 InP: Payload,
62 OutP: Payload,
63 N: Node<IN, OUT, InP, OutP>,
64{
65 pub fn new(node: N, id: NodeIndex, name: Option<&'static str>) -> Self {
72 Self {
73 node,
74 id,
75 name,
76 _payload_marker: core::marker::PhantomData,
77 }
78 }
79
80 #[inline]
82 pub fn node(&self) -> &N {
83 &self.node
84 }
85
86 #[inline]
88 pub fn node_mut(&mut self) -> &mut N {
89 &mut self.node
90 }
91
92 #[inline]
94 pub fn id(&self) -> NodeIndex {
95 self.id
96 }
97
98 #[inline]
100 pub fn input_port_ids(&self) -> [PortId; IN] {
101 core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
102 }
103
104 #[inline]
106 pub fn output_port_ids(&self) -> [PortId; OUT] {
107 core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
108 }
109
110 pub fn policy(&self) -> NodePolicy {
112 self.node.policy()
113 }
114
115 #[inline]
117 pub fn name(&self) -> Option<&'static str> {
118 self.name
119 }
120
121 #[inline]
123 pub fn descriptor(&self) -> NodeDescriptor {
124 NodeDescriptor {
125 id: self.id(),
126 kind: self.node.node_kind(),
127 in_ports: IN as u16,
128 out_ports: OUT as u16,
129 name: self.name(),
130 }
131 }
132}
133
134impl<N, const IN: usize, const OUT: usize, InP, OutP> Node<IN, OUT, InP, OutP>
135 for NodeLink<N, IN, OUT, InP, OutP>
136where
137 InP: Payload,
138 OutP: Payload,
139 N: Node<IN, OUT, InP, OutP>,
140{
141 #[inline]
142 fn describe_capabilities(&self) -> NodeCapabilities {
143 self.node.describe_capabilities()
144 }
145
146 #[inline]
147 fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
148 self.node.input_acceptance()
149 }
150
151 #[inline]
152 fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
153 self.node.output_acceptance()
154 }
155
156 #[inline]
157 fn policy(&self) -> NodePolicy {
158 self.node.policy()
159 }
160
161 #[cfg(any(test, feature = "bench"))]
163 fn set_policy(&mut self, policy: NodePolicy) {
164 self.node.set_policy(policy);
165 }
166
167 #[inline]
168 fn node_kind(&self) -> NodeKind {
169 self.node.node_kind()
170 }
171
172 #[inline]
173 fn initialize<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
174 where
175 T: Telemetry,
176 {
177 self.node.initialize(clock, telemetry)
178 }
179
180 #[inline]
181 fn start<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
182 where
183 T: Telemetry,
184 {
185 self.node.start(clock, telemetry)
186 }
187
188 fn process_message<C>(
189 &mut self,
190 msg: &Message<InP>,
191 sys_clock: &C,
192 ) -> Result<ProcessResult<OutP>, NodeError>
193 where
194 C: PlatformClock + Sized,
195 {
196 self.node.process_message(msg, sys_clock)
197 }
198
199 #[inline]
200 fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
201 &mut self,
202 ctx: &mut StepContext<
203 'graph,
204 'telemetry,
205 'clock,
206 IN,
207 OUT,
208 InP,
209 OutP,
210 InQ,
211 OutQ,
212 InM,
213 OutM,
214 C,
215 T,
216 >,
217 ) -> Result<StepResult, NodeError>
218 where
219 InQ: Edge,
220 OutQ: Edge,
221 InM: MemoryManager<InP>,
222 OutM: MemoryManager<OutP>,
223 C: PlatformClock + Sized,
224 T: Telemetry + Sized,
225 {
226 let policy = self.node.policy();
228 let batching_enabled = {
229 let nb = policy.batching();
230
231 (nb.fixed_n().unwrap_or(1) > 1) || nb.max_delta_t().is_some()
232 };
233
234 if !T::METRICS_ENABLED {
237 if batching_enabled {
238 return self.node.step_batch(ctx);
239 } else {
240 return self.node.step(ctx);
241 }
242 }
243
244 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
246
247 let policy = self.node.policy();
249 let budget_policy = policy.budget();
250 let deadline_policy = policy.deadline();
251
252 let timestamp_start_ns = ctx.now_nanos();
254
255 let result = if batching_enabled {
256 self.node.step_batch(ctx)
257 } else {
258 self.node.step(ctx)
259 };
260
261 let timestamp_end_ns = ctx.now_nanos();
262 let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
263
264 let mut budget_ns_opt: Option<u64> = None;
267
268 if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
269 budget_ns_opt = Some(*default_deadline_ns.as_u64());
270 } else if let Some(tick_budget) = budget_policy.tick_budget() {
271 let budget_ns = ctx.ticks_to_nanos(*tick_budget);
272 budget_ns_opt = Some(budget_ns);
273 }
274
275 let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
276 Some(slack) => *slack.as_u64(),
277 None => 0,
278 };
279
280 let mut deadline_ns: Option<u64> = None;
281 let mut deadline_missed = false;
282
283 if let Some(budget_ns) = budget_ns_opt {
284 deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
286
287 if duration_ns > budget_ns.saturating_add(slack_ns) {
289 deadline_missed = true;
290 }
291 }
292
293 let telemetry = ctx.telemetry_mut();
297
298 telemetry.record_latency_ns(
301 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
302 duration_ns,
303 );
304
305 if deadline_missed {
307 telemetry.incr_counter(
308 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
309 1,
310 );
311 }
312
313 if let Ok(step_result) = &result {
315 use crate::node::StepResult::*;
316 match step_result {
317 MadeProgress | Terminal | YieldUntil(_) => {
318 telemetry.incr_counter(
319 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
320 policy.batching().fixed_n().unwrap_or(1) as u64,
321 );
322 }
323 NoInput | Backpressured | WaitingOnExternal => {
324 }
326 }
327 }
328
329 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
331 let error_kind = match &result {
332 Ok(step_result) => {
333 use crate::node::StepResult::*;
334 match step_result {
335 NoInput => Some(NodeStepError::NoInput),
336 Backpressured => Some(NodeStepError::Backpressured),
337 WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
338 MadeProgress | Terminal | YieldUntil(_) => {
341 if deadline_missed {
342 Some(NodeStepError::OverBudget)
343 } else {
344 None
345 }
346 }
347 }
348 }
349 Err(error) => {
350 Some(match error.kind() {
351 NodeErrorKind::NoInput => NodeStepError::NoInput,
352 NodeErrorKind::Backpressured => NodeStepError::Backpressured,
353 _ => NodeStepError::ExecutionFailed,
355 })
356 }
357 };
358
359 let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
360 GRAPH_ID,
361 self.id,
362 self.name,
363 timestamp_start_ns,
364 timestamp_end_ns,
365 duration_ns,
366 policy.batching().fixed_n().unwrap_or(1) as u64,
367 deadline_ns,
368 deadline_missed,
369 error_kind,
370 ));
371
372 telemetry.push_event(event);
373 }
374
375 result
376 }
377
378 fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
379 &mut self,
380 ctx: &mut StepContext<
381 'graph,
382 'telemetry,
383 'clock,
384 IN,
385 OUT,
386 InP,
387 OutP,
388 InQ,
389 OutQ,
390 InM,
391 OutM,
392 C,
393 T,
394 >,
395 ) -> Result<StepResult, NodeError>
396 where
397 InQ: Edge,
398 OutQ: Edge,
399 InM: MemoryManager<InP>,
400 OutM: MemoryManager<OutP>,
401 C: PlatformClock + Sized,
402 T: Telemetry + Sized,
403 {
404 if !T::METRICS_ENABLED {
407 return self.node.step_batch(ctx);
408 }
409
410 const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
412
413 let policy = self.node.policy();
415 let budget_policy = policy.budget();
416 let deadline_policy = policy.deadline();
417
418 let timestamp_start_ns = ctx.now_nanos();
420
421 let result = self.node.step_batch(ctx);
422
423 let timestamp_end_ns = ctx.now_nanos();
424 let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
425
426 let mut budget_ns_opt: Option<u64> = None;
429
430 if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
431 budget_ns_opt = Some(*default_deadline_ns.as_u64());
432 } else if let Some(tick_budget) = budget_policy.tick_budget() {
433 let budget_ns = ctx.ticks_to_nanos(*tick_budget);
434 budget_ns_opt = Some(budget_ns);
435 }
436
437 let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
438 Some(slack) => *slack.as_u64(),
439 None => 0,
440 };
441
442 let mut deadline_ns: Option<u64> = None;
443 let mut deadline_missed = false;
444
445 if let Some(budget_ns) = budget_ns_opt {
446 deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
448
449 if duration_ns > budget_ns.saturating_add(slack_ns) {
451 deadline_missed = true;
452 }
453 }
454
455 let telemetry = ctx.telemetry_mut();
459
460 telemetry.record_latency_ns(
463 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
464 duration_ns,
465 );
466
467 if deadline_missed {
469 telemetry.incr_counter(
470 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
471 1,
472 );
473 }
474
475 if let Ok(step_result) = &result {
477 use crate::node::StepResult::*;
478 match step_result {
479 MadeProgress | Terminal | YieldUntil(_) => {
480 telemetry.incr_counter(
481 TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
482 policy.batching().fixed_n().unwrap_or(1) as u64,
483 );
484 }
485 NoInput | Backpressured | WaitingOnExternal => {
486 }
488 }
489 }
490
491 if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
493 let error_kind = match &result {
494 Ok(step_result) => {
495 use crate::node::StepResult::*;
496 match step_result {
497 NoInput => Some(NodeStepError::NoInput),
498 Backpressured => Some(NodeStepError::Backpressured),
499 WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
500 MadeProgress | Terminal | YieldUntil(_) => {
503 if deadline_missed {
504 Some(NodeStepError::OverBudget)
505 } else {
506 None
507 }
508 }
509 }
510 }
511 Err(error) => {
512 Some(match error.kind() {
513 NodeErrorKind::NoInput => NodeStepError::NoInput,
514 NodeErrorKind::Backpressured => NodeStepError::Backpressured,
515 _ => NodeStepError::ExecutionFailed,
517 })
518 }
519 };
520
521 let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
522 GRAPH_ID,
523 self.id,
524 self.name,
525 timestamp_start_ns,
526 timestamp_end_ns,
527 duration_ns,
528 policy.batching().fixed_n().unwrap_or(1) as u64,
529 deadline_ns,
530 deadline_missed,
531 error_kind,
532 ));
533
534 telemetry.push_event(event);
535 }
536
537 result
538 }
539
540 #[inline]
541 fn on_watchdog_timeout<C, T>(
542 &mut self,
543 clock: &C,
544 telemetry: &mut T,
545 ) -> Result<StepResult, NodeError>
546 where
547 C: PlatformClock + Sized,
548 T: Telemetry,
549 {
550 self.node.on_watchdog_timeout(clock, telemetry)
551 }
552
553 #[inline]
554 fn stop<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
555 where
556 T: Telemetry,
557 {
558 self.node.stop(clock, telemetry)
559 }
560}
561
/// Lightweight, clonable summary of a node's identity, kind, and port arity,
/// as produced by `NodeLink::descriptor`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeDescriptor {
    // Graph index of the described node.
    id: NodeIndex,
    // Kind reported by the node implementation.
    kind: NodeKind,
    // Number of input ports (the `IN` const generic, narrowed to u16).
    in_ports: u16,
    // Number of output ports (the `OUT` const generic, narrowed to u16).
    out_ports: u16,
    // Optional static display name.
    name: Option<&'static str>,
}
581
impl NodeDescriptor {
    /// Builds a descriptor from its parts; typically called by
    /// `NodeLink::descriptor` rather than directly.
    #[inline]
    pub fn new(
        id: NodeIndex,
        kind: NodeKind,
        in_ports: u16,
        out_ports: u16,
        name: Option<&'static str>,
    ) -> Self {
        Self {
            id,
            kind,
            in_ports,
            out_ports,
            name,
        }
    }

    // NOTE(review): the getters below return references even for small Copy
    // fields (`&u16`, `&NodeIndex`); returning by value would be more idiomatic
    // but would change the public signatures, so it is left as-is.

    /// Graph index of the described node.
    #[inline]
    pub fn id(&self) -> &NodeIndex {
        &self.id
    }

    /// Kind reported by the node implementation.
    #[inline]
    pub fn kind(&self) -> &NodeKind {
        &self.kind
    }

    /// Number of input ports.
    #[inline]
    pub fn in_ports(&self) -> &u16 {
        &self.in_ports
    }

    /// Number of output ports.
    #[inline]
    pub fn out_ports(&self) -> &u16 {
        &self.out_ports
    }

    /// Optional static display name.
    #[inline]
    pub fn name(&self) -> Option<&'static str> {
        self.name
    }
}