// limen_core/node/link.rs
1//! Node graph-link descriptor types.
2
3use crate::{
4    edge::Edge,
5    errors::{NodeError, NodeErrorKind},
6    memory::PlacementAcceptance,
7    message::{payload::Payload, Message},
8    node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult},
9    policy::NodePolicy,
10    prelude::{
11        MemoryManager, NodeStepError, NodeStepTelemetry, PlatformClock, Telemetry, TelemetryEvent,
12        TelemetryKey, TelemetryKind,
13    },
14    types::{NodeIndex, PortId, PortIndex},
15};
16
/// A lightweight wrapper that **owns** a concrete node instance and records its
/// static topology and policy metadata.
///
/// Unlike a pure descriptor, `NodeLink` **owns** the concrete node instance (`N`)
/// and records its identity, kind, port counts, policy, and optional name for graph
/// construction, scheduling, diagnostics, and tooling. It exposes `&N` and `&mut N`
/// accessors so runtimes can operate on the live node.
///
/// # Type Parameters
/// - `N`: Concrete node type implementing `Node<IN, OUT, InP, OutP>`.
/// - `IN`: Compile-time number of input ports for the node.
/// - `OUT`: Compile-time number of output ports for the node.
/// - `InP`: Input payload type (must implement `Payload`).
/// - `OutP`: Output payload type (must implement `Payload`).
///
/// # Invariants
/// The port counts reported via [`NodeLink::descriptor`] are derived directly from
/// the const-generic arity (`IN as u16` / `OUT as u16`), so they are always
/// consistent with the node's compile-time port configuration by construction.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeLink<N, const IN: usize, const OUT: usize, InP, OutP>
where
    InP: Payload,
    OutP: Payload,
    N: Node<IN, OUT, InP, OutP>,
{
    /// Owned handle to the concrete node instance.
    node: N,

    /// Unique identifier of this node within the graph.
    id: NodeIndex,

    /// Optional static name used for diagnostics or graph tooling.
    name: Option<&'static str>,

    /// Marker to bind `InP` and `OutP` into the type without storing values.
    ///
    /// This has zero runtime cost and exists solely for type tracking.
    _payload_marker: core::marker::PhantomData<(InP, OutP)>,
}
58
59impl<N, const IN: usize, const OUT: usize, InP, OutP> NodeLink<N, IN, OUT, InP, OutP>
60where
61    InP: Payload,
62    OutP: Payload,
63    N: Node<IN, OUT, InP, OutP>,
64{
65    /// Construct a new `NodeLink` that borrows the given node and records its metadata.
66    ///
67    /// # Parameters
68    /// - `node`: Borrowed reference to the concrete node instance.
69    /// - `id`: Unique identifier of the node in the graph.
70    /// - `name`: Optional static name for diagnostics or tooling.
71    pub fn new(node: N, id: NodeIndex, name: Option<&'static str>) -> Self {
72        Self {
73            node,
74            id,
75            name,
76            _payload_marker: core::marker::PhantomData,
77        }
78    }
79
80    /// Get a reference to the inner node.
81    #[inline]
82    pub fn node(&self) -> &N {
83        &self.node
84    }
85
86    /// Get a mutable reference to the inner node.
87    #[inline]
88    pub fn node_mut(&mut self) -> &mut N {
89        &mut self.node
90    }
91
92    /// Get the unique identifier of this node.
93    #[inline]
94    pub fn id(&self) -> NodeIndex {
95        self.id
96    }
97
98    /// Returns the input port ids for the node.
99    #[inline]
100    pub fn input_port_ids(&self) -> [PortId; IN] {
101        core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
102    }
103
104    /// Returns the input port ids for the node.
105    #[inline]
106    pub fn output_port_ids(&self) -> [PortId; OUT] {
107        core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
108    }
109
110    /// Return the node's policy bundle.
111    pub fn policy(&self) -> NodePolicy {
112        self.node.policy()
113    }
114
115    /// Get the optional static name of this node.
116    #[inline]
117    pub fn name(&self) -> Option<&'static str> {
118        self.name
119    }
120
121    /// Return the `NodeDescriptor` for this `NodeLink`.
122    #[inline]
123    pub fn descriptor(&self) -> NodeDescriptor {
124        NodeDescriptor {
125            id: self.id(),
126            kind: self.node.node_kind(),
127            in_ports: IN as u16,
128            out_ports: OUT as u16,
129            name: self.name(),
130        }
131    }
132}
133
134impl<N, const IN: usize, const OUT: usize, InP, OutP> Node<IN, OUT, InP, OutP>
135    for NodeLink<N, IN, OUT, InP, OutP>
136where
137    InP: Payload,
138    OutP: Payload,
139    N: Node<IN, OUT, InP, OutP>,
140{
141    #[inline]
142    fn describe_capabilities(&self) -> NodeCapabilities {
143        self.node.describe_capabilities()
144    }
145
146    #[inline]
147    fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
148        self.node.input_acceptance()
149    }
150
151    #[inline]
152    fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
153        self.node.output_acceptance()
154    }
155
156    #[inline]
157    fn policy(&self) -> NodePolicy {
158        self.node.policy()
159    }
160
161    /// **TEST ONLY** method used to override batching policis for node contract tests.
162    #[cfg(any(test, feature = "bench"))]
163    fn set_policy(&mut self, policy: NodePolicy) {
164        self.node.set_policy(policy);
165    }
166
167    #[inline]
168    fn node_kind(&self) -> NodeKind {
169        self.node.node_kind()
170    }
171
172    #[inline]
173    fn initialize<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
174    where
175        T: Telemetry,
176    {
177        self.node.initialize(clock, telemetry)
178    }
179
180    #[inline]
181    fn start<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
182    where
183        T: Telemetry,
184    {
185        self.node.start(clock, telemetry)
186    }
187
188    fn process_message<C>(
189        &mut self,
190        msg: &Message<InP>,
191        sys_clock: &C,
192    ) -> Result<ProcessResult<OutP>, NodeError>
193    where
194        C: PlatformClock + Sized,
195    {
196        self.node.process_message(msg, sys_clock)
197    }
198
199    #[inline]
200    fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
201        &mut self,
202        ctx: &mut StepContext<
203            'graph,
204            'telemetry,
205            'clock,
206            IN,
207            OUT,
208            InP,
209            OutP,
210            InQ,
211            OutQ,
212            InM,
213            OutM,
214            C,
215            T,
216        >,
217    ) -> Result<StepResult, NodeError>
218    where
219        InQ: Edge,
220        OutQ: Edge,
221        InM: MemoryManager<InP>,
222        OutM: MemoryManager<OutP>,
223        C: PlatformClock + Sized,
224        T: Telemetry + Sized,
225    {
226        // Determine whether the node's policy indicates batch-mode behavior.
227        let policy = self.node.policy();
228        let batching_enabled = {
229            let nb = policy.batching();
230
231            (nb.fixed_n().unwrap_or(1) > 1) || nb.max_delta_t().is_some()
232        };
233
234        // If metrics are completely disabled for this Telemetry type, delegate to the
235        // appropriate node entrypoint (batch vs single-message).
236        if !T::METRICS_ENABLED {
237            if batching_enabled {
238                return self.node.step_batch(ctx);
239            } else {
240                return self.node.step(ctx);
241            }
242        }
243
244        // For now we keep a single graph instance identifier, as in the runtime.
245        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
246
247        // Cache static policy (copy) for deadline/budget checks.
248        let policy = self.node.policy();
249        let budget_policy = policy.budget();
250        let deadline_policy = policy.deadline();
251
252        // ---- Execute node step + measure latency ----
253        let timestamp_start_ns = ctx.now_nanos();
254
255        let result = if batching_enabled {
256            self.node.step_batch(ctx)
257        } else {
258            self.node.step(ctx)
259        };
260
261        let timestamp_end_ns = ctx.now_nanos();
262        let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
263
264        // ---- Compute deadline budget in nanoseconds (duration-based) ----
265
266        let mut budget_ns_opt: Option<u64> = None;
267
268        if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
269            budget_ns_opt = Some(*default_deadline_ns.as_u64());
270        } else if let Some(tick_budget) = budget_policy.tick_budget() {
271            let budget_ns = ctx.ticks_to_nanos(*tick_budget);
272            budget_ns_opt = Some(budget_ns);
273        }
274
275        let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
276            Some(slack) => *slack.as_u64(),
277            None => 0,
278        };
279
280        let mut deadline_ns: Option<u64> = None;
281        let mut deadline_missed = false;
282
283        if let Some(budget_ns) = budget_ns_opt {
284            // Represent this as an absolute deadline in the event.
285            deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
286
287            // Pure duration-based miss check, incorporating slack.
288            if duration_ns > budget_ns.saturating_add(slack_ns) {
289                deadline_missed = true;
290            }
291        }
292
293        // ---- Telemetry updates (latency, processed, deadline, NodeStep event) ----
294
295        // Access the telemetry sink from the context.
296        let telemetry = ctx.telemetry_mut();
297
298        // Latency metric (per node, per step).
299        // This assumes `NodeIndex` is a tuple struct where `.0` yields a numeric index.
300        telemetry.record_latency_ns(
301            TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
302            duration_ns,
303        );
304
305        // Deadline miss counter (only if we computed a budget and exceeded it).
306        if deadline_missed {
307            telemetry.incr_counter(
308                TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
309                1,
310            );
311        }
312
313        // Processed counter: count *steps* that actually made progress / completed.
314        if let Ok(step_result) = &result {
315            use crate::node::StepResult::*;
316            match step_result {
317                MadeProgress | Terminal | YieldUntil(_) => {
318                    telemetry.incr_counter(
319                        TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
320                        policy.batching().fixed_n().unwrap_or(1) as u64,
321                    );
322                }
323                NoInput | Backpressured | WaitingOnExternal => {
324                    // Not counted as processed.
325                }
326            }
327        }
328
329        // Optional structured NodeStep event.
330        if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
331            let error_kind = match &result {
332                Ok(step_result) => {
333                    use crate::node::StepResult::*;
334                    match step_result {
335                        NoInput => Some(NodeStepError::NoInput),
336                        Backpressured => Some(NodeStepError::Backpressured),
337                        WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
338                        // For progress/terminal/yield, only flag OverBudget if we
339                        // actually missed the duration-based deadline.
340                        MadeProgress | Terminal | YieldUntil(_) => {
341                            if deadline_missed {
342                                Some(NodeStepError::OverBudget)
343                            } else {
344                                None
345                            }
346                        }
347                    }
348                }
349                Err(error) => {
350                    Some(match error.kind() {
351                        NodeErrorKind::NoInput => NodeStepError::NoInput,
352                        NodeErrorKind::Backpressured => NodeStepError::Backpressured,
353                        // Any other error kind is treated as a generic execution failure.
354                        _ => NodeStepError::ExecutionFailed,
355                    })
356                }
357            };
358
359            let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
360                GRAPH_ID,
361                self.id,
362                self.name,
363                timestamp_start_ns,
364                timestamp_end_ns,
365                duration_ns,
366                policy.batching().fixed_n().unwrap_or(1) as u64,
367                deadline_ns,
368                deadline_missed,
369                error_kind,
370            ));
371
372            telemetry.push_event(event);
373        }
374
375        result
376    }
377
378    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
379        &mut self,
380        ctx: &mut StepContext<
381            'graph,
382            'telemetry,
383            'clock,
384            IN,
385            OUT,
386            InP,
387            OutP,
388            InQ,
389            OutQ,
390            InM,
391            OutM,
392            C,
393            T,
394        >,
395    ) -> Result<StepResult, NodeError>
396    where
397        InQ: Edge,
398        OutQ: Edge,
399        InM: MemoryManager<InP>,
400        OutM: MemoryManager<OutP>,
401        C: PlatformClock + Sized,
402        T: Telemetry + Sized,
403    {
404        // If metrics are completely disabled for this Telemetry type, delegate to the
405        // appropriate node entrypoint (batch vs single-message).
406        if !T::METRICS_ENABLED {
407            return self.node.step_batch(ctx);
408        }
409
410        // For now we keep a single graph instance identifier, as in the runtime.
411        const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
412
413        // Cache static policy (copy) for deadline/budget checks.
414        let policy = self.node.policy();
415        let budget_policy = policy.budget();
416        let deadline_policy = policy.deadline();
417
418        // ---- Execute node step + measure latency ----
419        let timestamp_start_ns = ctx.now_nanos();
420
421        let result = self.node.step_batch(ctx);
422
423        let timestamp_end_ns = ctx.now_nanos();
424        let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
425
426        // ---- Compute deadline budget in nanoseconds (duration-based) ----
427
428        let mut budget_ns_opt: Option<u64> = None;
429
430        if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
431            budget_ns_opt = Some(*default_deadline_ns.as_u64());
432        } else if let Some(tick_budget) = budget_policy.tick_budget() {
433            let budget_ns = ctx.ticks_to_nanos(*tick_budget);
434            budget_ns_opt = Some(budget_ns);
435        }
436
437        let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
438            Some(slack) => *slack.as_u64(),
439            None => 0,
440        };
441
442        let mut deadline_ns: Option<u64> = None;
443        let mut deadline_missed = false;
444
445        if let Some(budget_ns) = budget_ns_opt {
446            // Represent this as an absolute deadline in the event.
447            deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
448
449            // Pure duration-based miss check, incorporating slack.
450            if duration_ns > budget_ns.saturating_add(slack_ns) {
451                deadline_missed = true;
452            }
453        }
454
455        // ---- Telemetry updates (latency, processed, deadline, NodeStep event) ----
456
457        // Access the telemetry sink from the context.
458        let telemetry = ctx.telemetry_mut();
459
460        // Latency metric (per node, per step).
461        // This assumes `NodeIndex` is a tuple struct where `.0` yields a numeric index.
462        telemetry.record_latency_ns(
463            TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
464            duration_ns,
465        );
466
467        // Deadline miss counter (only if we computed a budget and exceeded it).
468        if deadline_missed {
469            telemetry.incr_counter(
470                TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
471                1,
472            );
473        }
474
475        // Processed counter: count *steps* that actually made progress / completed.
476        if let Ok(step_result) = &result {
477            use crate::node::StepResult::*;
478            match step_result {
479                MadeProgress | Terminal | YieldUntil(_) => {
480                    telemetry.incr_counter(
481                        TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
482                        policy.batching().fixed_n().unwrap_or(1) as u64,
483                    );
484                }
485                NoInput | Backpressured | WaitingOnExternal => {
486                    // Not counted as processed.
487                }
488            }
489        }
490
491        // Optional structured NodeStep event.
492        if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
493            let error_kind = match &result {
494                Ok(step_result) => {
495                    use crate::node::StepResult::*;
496                    match step_result {
497                        NoInput => Some(NodeStepError::NoInput),
498                        Backpressured => Some(NodeStepError::Backpressured),
499                        WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
500                        // For progress/terminal/yield, only flag OverBudget if we
501                        // actually missed the duration-based deadline.
502                        MadeProgress | Terminal | YieldUntil(_) => {
503                            if deadline_missed {
504                                Some(NodeStepError::OverBudget)
505                            } else {
506                                None
507                            }
508                        }
509                    }
510                }
511                Err(error) => {
512                    Some(match error.kind() {
513                        NodeErrorKind::NoInput => NodeStepError::NoInput,
514                        NodeErrorKind::Backpressured => NodeStepError::Backpressured,
515                        // Any other error kind is treated as a generic execution failure.
516                        _ => NodeStepError::ExecutionFailed,
517                    })
518                }
519            };
520
521            let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
522                GRAPH_ID,
523                self.id,
524                self.name,
525                timestamp_start_ns,
526                timestamp_end_ns,
527                duration_ns,
528                policy.batching().fixed_n().unwrap_or(1) as u64,
529                deadline_ns,
530                deadline_missed,
531                error_kind,
532            ));
533
534            telemetry.push_event(event);
535        }
536
537        result
538    }
539
540    #[inline]
541    fn on_watchdog_timeout<C, T>(
542        &mut self,
543        clock: &C,
544        telemetry: &mut T,
545    ) -> Result<StepResult, NodeError>
546    where
547        C: PlatformClock + Sized,
548        T: Telemetry,
549    {
550        self.node.on_watchdog_timeout(clock, telemetry)
551    }
552
553    #[inline]
554    fn stop<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
555    where
556        T: Telemetry,
557    {
558        self.node.stop(clock, telemetry)
559    }
560}
561
/// A node descriptor: static topology metadata, without an executable instance.
///
/// `NodeDescriptor` captures static configuration of a node in the graph:
/// its identity, kind, port counts, and an optional name.
/// It does not hold runtime state, policy, or implementation details.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeDescriptor {
    /// Unique identifier of this node in the graph.
    id: NodeIndex,
    /// High-level category of the node (source, process, sink, etc).
    kind: NodeKind,
    /// Number of input ports declared by this node.
    in_ports: u16,
    /// Number of output ports declared by this node.
    out_ports: u16,
    /// Optional static name (for diagnostics or graph tooling).
    name: Option<&'static str>,
}
581
582impl NodeDescriptor {
583    /// Construct a new `NodeDescriptor`.
584    #[inline]
585    pub fn new(
586        id: NodeIndex,
587        kind: NodeKind,
588        in_ports: u16,
589        out_ports: u16,
590        name: Option<&'static str>,
591    ) -> Self {
592        Self {
593            id,
594            kind,
595            in_ports,
596            out_ports,
597            name,
598        }
599    }
600
601    /// Unique identifier of this node in the graph.
602    #[inline]
603    pub fn id(&self) -> &NodeIndex {
604        &self.id
605    }
606
607    /// High-level category of the node (source, process, sink, etc).
608    #[inline]
609    pub fn kind(&self) -> &NodeKind {
610        &self.kind
611    }
612
613    /// Number of input ports declared by this node.
614    #[inline]
615    pub fn in_ports(&self) -> &u16 {
616        &self.in_ports
617    }
618
619    /// Number of output ports declared by this node.
620    #[inline]
621    pub fn out_ports(&self) -> &u16 {
622        &self.out_ports
623    }
624
625    /// Optional static name (for diagnostics or graph tooling).
626    #[inline]
627    pub fn name(&self) -> Option<&'static str> {
628        self.name
629    }
630}