// limen_core/node.rs

//! Uniform node contract, lifecycle, and step context.
//!
//! Nodes are monomorphized by generics and const generics — there is **no
//! dynamic dispatch** in the hot path. Port schemas and policies are
//! type-encoded on each node.
//!
//! Key types:
//! - [`Node`] — the core trait: `step(&mut StepContext) -> Result<StepResult, NodeError>`.
//! - [`StepContext`] — per-step runtime context (clock, telemetry, input/output edges, memory manager).
//! - [`StepResult`] — scheduling hint returned after each step.
//! - [`ProcessResult`] — result of processing a single message inside a node.
//! - [`NodeKind`] — categorical label (source, process, model, sink, …).
//! - [`NodeCapabilities`] — capability descriptor for device streams and degrade tiers.
//!
//! Submodules:
//! - [`source`] — [`SourceNode`](source::SourceNode) trait for 0-input nodes.
//! - [`sink`] — [`SinkNode`](sink::SinkNode) trait for 0-output nodes.
//! - [`model`] — [`InferenceModel`](model::InferenceModel) adapter for inference nodes.
//! - [`link`] — [`NodeLink`](link::NodeLink) and [`NodeDescriptor`](link::NodeDescriptor) wiring helpers.
//! - `bench` — test nodes (compiled under `cfg(test)` or the `bench` feature).

pub mod link;
pub mod model;
pub mod sink;
pub mod source;

#[cfg(any(test, feature = "bench"))]
pub mod bench;

#[cfg(any(test, feature = "bench"))]
pub mod contract_tests;
#[cfg(any(test, feature = "bench"))]
pub use contract_tests::*;

use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::{NodeError, QueueError};
use crate::memory::PlacementAcceptance;
use crate::message::{payload::Payload, Message};
use crate::policy::{
    AdmissionDecision, BatchingPolicy, EdgePolicy, NodePolicy, SlidingWindow, WindowKind,
};
use crate::prelude::{BatchMessageIter, MemoryManager, PlatformClock, TelemetryKey, TelemetryKind};
use crate::telemetry::Telemetry;
use crate::types::Ticks;

/// Categories of nodes used in graph descriptors and builders.
///
/// These capture the high-level role of a node in the dataflow graph.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    /// A source node: 0 inputs / ≥1 outputs.
    ///
    /// Examples: sensors, file readers, external ingress points.
    Source,
    /// A processing node: ≥1 inputs / ≥1 outputs.
    ///
    /// Examples: stateless transforms, stateful operators, pre/post-processing.
    Process,
    /// A model node: ≥1 inputs / ≥1 outputs.
    ///
    /// Represents inference nodes bound to a `ComputeBackend` and a model.
    Model,
    /// A split (fan-out) node: ≥1 inputs / ≥2 outputs.
    ///
    /// Used to branch one stream into multiple downstream paths.
    Split,
    /// A join (fan-in) node: ≥2 inputs / ≥1 outputs.
    ///
    /// Used to merge multiple streams into a single downstream path.
    Join,
    /// A sink node: ≥1 inputs / 0 outputs.
    ///
    /// Examples: file writers, stdout, GPIO, MQTT, other terminal sinks.
    Sink,
    /// An external node: request/response via transport to a remote or coprocessor.
    External,
}

/// Node capability descriptor (device streams, degrade tiers).
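///
/// # Example
///
/// A minimal construction sketch; the getters follow this codebase's
/// reference-returning accessor style:
///
/// ```ignore
/// let caps = NodeCapabilities::new(true, false);
/// assert!(*caps.device_streams());
/// assert!(!*caps.degrade_tiers());
/// ```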
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NodeCapabilities {
    /// Whether the node can execute on device streams (P2).
    device_streams: bool,
    /// Whether mixed-precision or degrade tiers are available.
    degrade_tiers: bool,
}

impl NodeCapabilities {
    /// Construct a new `NodeCapabilities`.
    #[inline]
    pub const fn new(device_streams: bool, degrade_tiers: bool) -> Self {
        Self {
            device_streams,
            degrade_tiers,
        }
    }

    /// Whether the node can execute on device streams (P2).
    #[inline]
    pub fn device_streams(&self) -> &bool {
        &self.device_streams
    }

    /// Whether mixed-precision or degrade tiers are available.
    #[inline]
    pub fn degrade_tiers(&self) -> &bool {
        &self.degrade_tiers
    }
}

/// Result of a `step` call indicating progress and scheduling hints.
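///
/// # Example
///
/// A sketch of how a scheduler might react to each hint; `node` and `ctx`
/// are assumed to be in scope:
///
/// ```ignore
/// match node.step(&mut ctx)? {
///     StepResult::MadeProgress => { /* run again soon */ }
///     StepResult::NoInput | StepResult::Backpressured => { /* back off */ }
///     StepResult::WaitingOnExternal => { /* poll the completion source */ }
///     StepResult::YieldUntil(tick) => { /* park the node until `tick` */ }
///     StepResult::Terminal => { /* retire the node */ }
/// }
/// ```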
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// Work was performed (messages consumed and/or produced).
    MadeProgress,
    /// No inputs were available to make progress.
    NoInput,
    /// Backpressure prevented progress.
    Backpressured,
    /// Waiting on external completion (device, transport).
    WaitingOnExternal,
    /// Yield until the provided tick (cooperative scheduling hint).
    YieldUntil(Ticks),
    /// Node has completed and will not produce further outputs.
    Terminal,
}

/// Result of processing a single input message.
///
/// Returned by [`Node::process_message`] to indicate what the node produced.
/// The framework handles pushing to output edges; the node never interacts
/// with queues or managers directly.
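///
/// # Example
///
/// Inside a node's `process_message` (sketch; `transform` is a hypothetical
/// helper returning `Option<Message<OutP>>`):
///
/// ```ignore
/// match transform(msg) {
///     Some(out) => Ok(ProcessResult::Output(out)),
///     None => Ok(ProcessResult::Skip),
/// }
/// ```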
#[non_exhaustive]
#[derive(Debug)]
pub enum ProcessResult<P: Payload> {
    /// Processed the message and produced output to push to port 0.
    Output(Message<P>),
    /// Consumed the input but produced no output (sinks, filters).
    Consumed,
    /// Nothing to process / skip.
    Skip,
}

/// A context provided to nodes during `step`, abstracting queues, managers,
/// and services.
///
/// The context is generic over input/output payload, queue, **memory manager**,
/// clock, and telemetry types to avoid trait objects.
pub struct StepContext<
    'graph,
    'telemetry,
    'clock,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
    InQ,
    OutQ,
    InM,
    OutM,
    C,
    T,
> where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Inbound queues, indexed by input port.
    inputs: [&'graph mut InQ; IN],
    /// Outbound queues, indexed by output port.
    outputs: [&'graph mut OutQ; OUT],
    /// Memory managers for each input port (one per edge).
    in_managers: [&'graph mut InM; IN],
    /// Memory managers for each output port (one per edge).
    out_managers: [&'graph mut OutM; OUT],
    /// Edge policies for each input.
    in_policies: [EdgePolicy; IN],
    /// Edge policies for each output.
    out_policies: [EdgePolicy; OUT],

    /// Node identifier for automatic telemetry stamping.
    node_id: u32,
    /// Input edge identifiers for automatic telemetry stamping.
    in_edge_ids: [u32; IN],
    /// Output edge identifiers for automatic telemetry stamping.
    out_edge_ids: [u32; OUT],

    /// Platform clock or timer services.
    clock: &'clock C,
    /// Telemetry sink for counters/histograms.
    telemetry: &'telemetry mut T,
    /// Phantom type markers to keep payload types visible to the compiler.
    _marker: core::marker::PhantomData<(InP, OutP)>,
}

impl<
        'graph,
        'telemetry,
        'clock,
        const IN: usize,
        const OUT: usize,
        InP,
        OutP,
        InQ,
        OutQ,
        InM,
        OutM,
        C,
        T,
    > StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Create a new step context from queues, managers, policies, and services.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        inputs: [&'graph mut InQ; IN],
        outputs: [&'graph mut OutQ; OUT],
        in_managers: [&'graph mut InM; IN],
        out_managers: [&'graph mut OutM; OUT],
        in_policies: [EdgePolicy; IN],
        out_policies: [EdgePolicy; OUT],
        node_id: u32,
        in_edge_ids: [u32; IN],
        out_edge_ids: [u32; OUT],
        clock: &'clock C,
        telemetry: &'telemetry mut T,
    ) -> Self {
        Self {
            inputs,
            outputs,
            in_managers,
            out_managers,
            in_policies,
            out_policies,
            node_id,
            in_edge_ids,
            out_edge_ids,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        }
    }
}

impl<
        'graph,
        'telemetry,
        'clock,
        const IN: usize,
        const OUT: usize,
        InP,
        OutP,
        InQ,
        OutQ,
        InM,
        OutM,
        C,
        T,
    > StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
where
    InP: Payload,
    OutP: Payload,
    InQ: Edge,
    OutQ: Edge,
    InM: MemoryManager<InP>,
    OutM: MemoryManager<OutP>,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // ---------------------------------------------------------------
    // Input operations
    // ---------------------------------------------------------------

    /// Peek the front message header on the specified input port (non-consuming).
    ///
    /// Returns a guard that dereferences to `MessageHeader`. The guard must
    /// be dropped before any mutable operation on the same manager slot.
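    ///
    /// # Example
    ///
    /// Inspect the front header on port 0 without consuming the message
    /// (sketch):
    ///
    /// ```ignore
    /// let header = ctx.in_peek_header(0)?;
    /// let created = *header.creation_tick();
    /// drop(header); // release the guard before mutating the same slot
    /// ```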
    #[inline]
    pub fn in_peek_header(&self, i: usize) -> Result<InM::HeaderGuard<'_>, QueueError> {
        debug_assert!(i < IN);
        let token = self.inputs[i].try_peek()?;
        self.in_managers[i]
            .peek_header(token)
            .map_err(|_| QueueError::Empty)
    }

    // ---------------------------------------------------------------
    // Callback-based input operations
    // ---------------------------------------------------------------

    /// Pop one message from input `port`, call `f` with a shared reference,
    /// free the manager slot, then push any produced output.
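    ///
    /// # Example
    ///
    /// Consume one message from port 0 without producing output (sketch):
    ///
    /// ```ignore
    /// let step = ctx.pop_and_process(0, |_msg| Ok(ProcessResult::Consumed))?;
    /// assert!(matches!(step, StepResult::MadeProgress | StepResult::NoInput));
    /// ```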
    pub fn pop_and_process<F>(&mut self, port: usize, f: F) -> Result<StepResult, NodeError>
    where
        F: FnOnce(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
    {
        debug_assert!(port < IN);

        let token = match self.inputs[port].try_pop(&*self.in_managers[port]) {
            Ok(t) => t,
            Err(QueueError::Empty) => return Ok(StepResult::NoInput),
            Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
                return Err(NodeError::backpressured());
            }
            Err(QueueError::Poisoned) | Err(QueueError::Unsupported) => {
                return Err(NodeError::execution_failed());
            }
        };

        let guard = match self.in_managers[port].read(token) {
            Ok(g) => g,
            Err(_) => {
                // Free the popped slot before surfacing the error so the
                // manager slot cannot leak.
                let _ = self.in_managers[port].free(token);
                return Err(NodeError::execution_failed());
            }
        };

        // Run the callback before propagating its result so the slot is
        // freed even when `f` returns an error.
        let result = f(&*guard);

        drop(guard);
        let _ = self.in_managers[port].free(token);
        let result = result?;

        if T::METRICS_ENABLED {
            self.telemetry.incr_counter(
                TelemetryKey::node(self.node_id, TelemetryKind::IngressMsgs),
                1,
            );
            let _ = self.in_occupancy(port);
        }

        match result {
            ProcessResult::Output(out_msg) => self.push_output(0, out_msg),
            ProcessResult::Consumed => Ok(StepResult::MadeProgress),
            ProcessResult::Skip => Ok(StepResult::NoInput),
        }
    }

    /// Pop a batch from input `port`, setting batch-boundary flags on the
    /// popped tokens. Call `f` for each message in the batch, pushing any
    /// outputs internally; after processing, free all consumed tokens.
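    ///
    /// # Example
    ///
    /// Drain up to 8 messages under the node's batching policy (sketch;
    /// `node` is assumed to be in scope):
    ///
    /// ```ignore
    /// let policy = node.policy();
    /// let step = ctx.pop_batch_and_process(0, 8, &policy, |_msg| {
    ///     Ok(ProcessResult::Consumed)
    /// })?;
    /// ```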
    pub fn pop_batch_and_process<F>(
        &mut self,
        port: usize,
        nmax: usize,
        node_policy: &NodePolicy,
        mut f: F,
    ) -> Result<StepResult, NodeError>
    where
        F: FnMut(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
    {
        debug_assert!(port < IN);

        if nmax == 0 {
            return Err(NodeError::execution_failed());
        }

        // Build a clamped batching policy from the node policy.
        let requested_policy = {
            let nb = *node_policy.batching();
            BatchingPolicy::with_window(
                nb.fixed_n().map(|f_n| core::cmp::min(f_n, nmax)),
                *nb.max_delta_t(),
                match nb.window_kind() {
                    WindowKind::Disjoint => WindowKind::Disjoint,
                    WindowKind::Sliding(sw) => {
                        let size = nb
                            .fixed_n()
                            .map(|f_n| core::cmp::min(f_n, nmax))
                            .unwrap_or(1);
                        let stride = core::cmp::min(*sw.stride(), size);
                        WindowKind::Sliding(SlidingWindow::new(stride))
                    }
                },
            )
        };

        // Determine the stride used for free decisions.
        let stride = match requested_policy.window_kind() {
            WindowKind::Disjoint => usize::MAX,
            WindowKind::Sliding(sw) => *sw.stride(),
        };

        // Sample pre-pop occupancy.
        let occ_before = self.inputs[port].occupancy(&self.in_policies[port]);

        // Pop a batch of tokens.
        let batch =
            match self.inputs[port].try_pop_batch(&requested_policy, &*self.in_managers[port]) {
                Ok(b) => b,
                Err(QueueError::Empty) => return Ok(StepResult::NoInput),
                Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
                    return Err(NodeError::backpressured());
                }
                Err(QueueError::Poisoned) => {
                    return Err(NodeError::execution_failed().with_code(1));
                }
                Err(QueueError::Unsupported) => {
                    return Err(NodeError::execution_failed().with_code(2));
                }
            };

        let batch_len = batch.len();
        if batch_len == 0 {
            return Ok(StepResult::NoInput);
        }
        let actual_stride = core::cmp::min(stride, batch_len);

        // Disjoint field split for the input manager.
        let in_mgr: &mut InM = &mut *self.in_managers[port];

        // Phase 1: set batch boundary flags on popped tokens.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                if let Ok(mut wg) = in_mgr.read_mut(token) {
                    if idx == 0 {
                        wg.header_mut().set_first_in_batch();
                    }
                    if idx == batch_len - 1 {
                        wg.header_mut().set_last_in_batch();
                    }
                }
            }
        }

        // Phase 2: build an OutStepContext for disjoint output access, then
        // iterate the batch, calling `f` per item.
        let iter =
            BatchMessageIter::new(batch.as_slice().iter(), &*in_mgr, actual_stride, batch_len);

        let out_policies = self.out_policies;
        let out_edge_ids = self.out_edge_ids;
        let node_id = self.node_id;
        let clock = self.clock;
        let telemetry: &mut T = &mut *self.telemetry;
        let outputs = &mut self.outputs;
        let out_managers = &mut self.out_managers;

        let mut out = OutStepContext {
            outputs,
            out_managers,
            out_policies,
            out_edge_ids,
            node_id,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        };

        let mut any_made = false;
        let mut backpressured = false;
        for guard in iter {
            if backpressured {
                // Once backpressured, skip remaining messages but keep iterating
                // to drop the iterator cleanly.
                drop(guard);
                continue;
            }
            match f(&*guard)? {
                ProcessResult::Output(out_msg) => {
                    drop(guard);
                    match out.out_try_push(0, out_msg) {
                        EnqueueResult::Enqueued => {
                            any_made = true;
                        }
                        EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                            backpressured = true;
                        }
                    }
                }
                ProcessResult::Consumed => {
                    drop(guard);
                    any_made = true;
                }
                ProcessResult::Skip => {
                    drop(guard);
                }
            }
        }

        // Phase 3: free consumed tokens.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                let _ = in_mgr.free(token);
            }
        }

        // Telemetry.
        if T::METRICS_ENABLED {
            let telemetry = &mut *out.telemetry;
            telemetry.incr_counter(
                TelemetryKey::node(node_id, TelemetryKind::IngressMsgs),
                actual_stride as u64,
            );
            let after_items = occ_before.items().saturating_sub(actual_stride);
            telemetry.set_gauge(
                TelemetryKey::edge(self.in_edge_ids[port], TelemetryKind::QueueDepth),
                after_items as u64,
            );
        }

        if backpressured {
            Ok(StepResult::Backpressured)
        } else if any_made {
            Ok(StepResult::MadeProgress)
        } else {
            Ok(StepResult::NoInput)
        }
    }

    /// Return a snapshot of occupancy of the specified input queue.
    #[inline]
    pub fn in_occupancy(&mut self, i: usize) -> EdgeOccupancy {
        debug_assert!(i < IN);
        let occ = self.inputs[i].occupancy(&self.in_policies[i]);
        if T::METRICS_ENABLED {
            self.telemetry.set_gauge(
                TelemetryKey::edge(self.in_edge_ids[i], TelemetryKind::QueueDepth),
                *occ.items() as u64,
            );
        }
        occ
    }

    /// Return the policy of the specified input queue.
    #[inline]
    pub fn in_policy(&mut self, i: usize) -> EdgePolicy {
        debug_assert!(i < IN);
        self.in_policies[i]
    }

    // ---------------------------------------------------------------
    // Output operations
    // ---------------------------------------------------------------

    /// Push a message to the specified output port.
    ///
    /// Stores the message in the output memory manager, then pushes the
    /// resulting token to the edge. Handles eviction: if `DropOldest` evicts
    /// a token, the evicted token is freed from the manager.
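    ///
    /// # Example
    ///
    /// Push and handle the enqueue outcome explicitly (sketch):
    ///
    /// ```ignore
    /// match ctx.out_try_push(0, msg) {
    ///     EnqueueResult::Enqueued => { /* done */ }
    ///     EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
    ///         // Not enqueued; surface backpressure upstream.
    ///     }
    /// }
    /// ```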
    pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
        debug_assert!(o < OUT);

        let token = match self.out_managers[o].store(m) {
            Ok(t) => t,
            Err(_) => return EnqueueResult::Rejected,
        };

        // Pre-eviction: query admission and pop+free any tokens that must make room.
        let decision = self.outputs[o].get_admission_decision(
            &self.out_policies[o],
            token,
            &*self.out_managers[o],
        );
        match decision {
            AdmissionDecision::Evict(n) => {
                for _ in 0..n {
                    match self.outputs[o].try_pop(&*self.out_managers[o]) {
                        Ok(evicted) => {
                            let _ = self.out_managers[o].free(evicted);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                if !self.out_policies[o]
                    .caps
                    .at_or_above_hard(*occ.items(), *occ.bytes())
                {
                    break;
                }
                match self.outputs[o].try_pop(&*self.out_managers[o]) {
                    Ok(evicted) => {
                        let _ = self.out_managers[o].free(evicted);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            AdmissionDecision::DropNewest => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::DroppedNewest;
            }
            AdmissionDecision::Reject | AdmissionDecision::Block => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::Rejected;
            }
            AdmissionDecision::Admit => {}
        }

        match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    let _ = self.out_occupancy(o);
                }
                EnqueueResult::Enqueued
            }
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                EnqueueResult::Rejected
            }
        }
    }

    /// Push an output message to the specified output port and map the result
    /// to a `StepResult`.
    ///
    /// Queries admission from the message itself, performs pre-eviction
    /// (popping and freeing all slots the admission policy requires), stores
    /// the message in the output memory manager, then pushes the token to the
    /// edge. Any token evicted by a concurrent race is likewise freed. This
    /// guarantees zero manager slot leaks across all edge tiers and admission
    /// policies.
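    ///
    /// # Example
    ///
    /// Push and treat backpressure as a scheduling hint (sketch):
    ///
    /// ```ignore
    /// match ctx.push_output(0, msg)? {
    ///     StepResult::MadeProgress => { /* enqueued */ }
    ///     StepResult::Backpressured => { /* retry on a later step */ }
    ///     _ => {}
    /// }
    /// ```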
    pub fn push_output(
        &mut self,
        port: usize,
        msg: Message<OutP>,
    ) -> Result<StepResult, NodeError> {
        debug_assert!(port < OUT);

        let admission_decision =
            self.outputs[port].get_admission_decision_from_message(&self.out_policies[port], &msg);

        match admission_decision {
            AdmissionDecision::Evict(eviction_count) => {
                for _ in 0..eviction_count {
                    match self.outputs[port].try_pop(&*self.out_managers[port]) {
                        Ok(evicted_token) => {
                            let _ = self.out_managers[port].free(evicted_token);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occupancy = self.outputs[port].occupancy(&self.out_policies[port]);
                if !self.out_policies[port]
                    .caps
                    .at_or_above_hard(*occupancy.items(), *occupancy.bytes())
                {
                    break;
                }

                match self.outputs[port].try_pop(&*self.out_managers[port]) {
                    Ok(evicted_token) => {
                        let _ = self.out_managers[port].free(evicted_token);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            AdmissionDecision::DropNewest
            | AdmissionDecision::Reject
            | AdmissionDecision::Block => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
            AdmissionDecision::Admit => {}
        }

        let token = match self.out_managers[port].store(msg) {
            Ok(token) => token,
            Err(_) => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
        };

        match self.outputs[port].try_push(
            token,
            &self.out_policies[port],
            &*self.out_managers[port],
        ) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    let _ = self.out_occupancy(port);
                }
                Ok(StepResult::MadeProgress)
            }
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[port].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                Ok(StepResult::Backpressured)
            }
        }
    }

    /// Return a snapshot of occupancy of the specified output queue.
    #[inline]
    pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
        debug_assert!(o < OUT);
        let occ = self.outputs[o].occupancy(&self.out_policies[o]);
        if T::METRICS_ENABLED {
            self.telemetry.set_gauge(
                TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
                *occ.items() as u64,
            );
        }
        occ
    }

    /// Return the policy of the specified output queue.
    #[inline]
    pub fn out_policy(&mut self, o: usize) -> EdgePolicy {
        debug_assert!(o < OUT);
        self.out_policies[o]
    }

    // ---------------------------------------------------------------
    // Clock / telemetry
    // ---------------------------------------------------------------

    /// Access the platform clock used for timing and conversions.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }

    /// Borrow the telemetry sink to emit custom counters/gauges/histograms.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }

    /// Current monotonic tick value from the platform clock.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }

    /// Current time in nanoseconds per the clock's tick-to-ns mapping.
    #[inline]
    pub fn now_nanos(&self) -> u64 {
        self.clock.ticks_to_nanos(self.clock.now_ticks())
    }

    /// Convert clock ticks to nanoseconds using the clock's scale.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }

    /// Convert nanoseconds to clock ticks using the clock's scale.
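    ///
    /// # Example
    ///
    /// Compute a wake-up tick roughly 5 ms in the future (sketch):
    ///
    /// ```ignore
    /// let wake = ctx.nanos_to_ticks(ctx.now_nanos() + 5_000_000);
    /// return Ok(StepResult::YieldUntil(wake));
    /// ```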
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }

    // ---------------------------------------------------------------
    // Batch readiness
    // ---------------------------------------------------------------

    /// Return `true` if the input edge `port` can produce a batch under `policy`.
    ///
    /// Uses the input manager's `HeaderStore` to peek creation ticks for
    /// span validation when both `fixed_n` and `max_delta_t` are set.
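    ///
    /// # Example
    ///
    /// Gate batch consumption on readiness (sketch; `node` is assumed to be
    /// in scope):
    ///
    /// ```ignore
    /// let policy = node.policy();
    /// if ctx.input_edge_has_batch(0, &policy) {
    ///     // At least one policy-conforming batch is ready on port 0.
    /// }
    /// ```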
    #[inline]
    pub fn input_edge_has_batch(&mut self, port: usize, policy: &NodePolicy) -> bool {
        debug_assert!(port < IN);

        let occ = self.in_occupancy(port);
        if *occ.items() == 0 {
            return false;
        }

        let fixed_opt = *policy.batching().fixed_n();
        let delta_opt = *policy.batching().max_delta_t();

        match (fixed_opt, delta_opt) {
            (Some(fixed_n), None) => *occ.items() >= fixed_n,
            (None, Some(_max_delta_t)) => true,
            (Some(fixed_n), Some(max_delta_t)) => {
                if *occ.items() < fixed_n {
                    return false;
                }
                // Peek the front and last tokens, then look up creation ticks
                // via the HeaderStore.
                let first_token = match self.inputs[port].try_peek_at(0) {
                    Ok(t) => t,
                    Err(_) => return false,
                };
                let last_token = match self.inputs[port].try_peek_at(fixed_n - 1) {
                    Ok(t) => t,
                    Err(_) => return false,
                };

                let first_ticks = match self.in_managers[port].peek_header(first_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };
                let last_ticks = match self.in_managers[port].peek_header(last_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };

                let span = last_ticks.saturating_sub(first_ticks);
                span <= max_delta_t
            }
            (None, None) => true,
        }
    }

    /// Construct an `OutStepContext` by borrowing only the output-related
    /// fields and telemetry from `self`.
    #[allow(dead_code)]
    #[inline]
    fn as_out_step_context<'ctx>(
        &'ctx mut self,
    ) -> OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T> {
        let out_policies = self.out_policies;
        let out_edge_ids = self.out_edge_ids;
        let node_id = self.node_id;
        let clock = self.clock;
        let telemetry = &mut *self.telemetry;
        let outputs: &'ctx mut [&'graph mut OutQ; OUT] = &mut self.outputs;
        let out_managers: &'ctx mut [&'graph mut OutM; OUT] = &mut self.out_managers;

        OutStepContext {
            outputs,
            out_managers,
            out_policies,
            out_edge_ids,
            node_id,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        }
    }
}

/// A `StepContext` *view* that only exposes outputs / managers / clock / telemetry.
///
/// Explicitly **does not** provide access to input queues or input managers.
pub struct OutStepContext<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
where
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Mutable borrow of the outputs array from the original `StepContext`.
    outputs: &'ctx mut [&'graph mut OutQ; OUT],
    /// Mutable borrow of the output managers array.
    out_managers: &'ctx mut [&'graph mut OutM; OUT],
    /// Copy of per-output policies (`EdgePolicy` is `Copy`).
    out_policies: [EdgePolicy; OUT],
    /// Copy of output edge ids.
    out_edge_ids: [u32; OUT],
    /// Node id for telemetry.
    node_id: u32,
    /// Shared borrow of the clock.
    clock: &'clock C,
    /// Mutable borrow of telemetry from the `StepContext` (reborrowed).
    telemetry: &'ctx mut T,
    /// Phantom to keep `OutP` visible to the compiler.
    _marker: core::marker::PhantomData<OutP>,
}

impl<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
    OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T>
where
    OutP: Payload,
    OutQ: Edge,
    OutM: MemoryManager<OutP>,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Push a message to an output queue via the memory manager.
    ///
    /// Stores the message in the manager, pushes the token to the edge,
    /// and handles eviction (frees evicted tokens from the manager).
    pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
        debug_assert!(o < OUT);

        let token = match self.out_managers[o].store(m) {
            Ok(t) => t,
            Err(_) => return EnqueueResult::Rejected,
        };

        // Pre-eviction: query admission and pop+free any tokens that must make room.
        let decision = self.outputs[o].get_admission_decision(
            &self.out_policies[o],
            token,
            &*self.out_managers[o],
        );
        match decision {
            AdmissionDecision::Evict(n) => {
                for _ in 0..n {
                    match self.outputs[o].try_pop(&*self.out_managers[o]) {
                        Ok(evicted) => {
                            let _ = self.out_managers[o].free(evicted);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                if !self.out_policies[o]
                    .caps
                    .at_or_above_hard(*occ.items(), *occ.bytes())
                {
                    break;
                }
                match self.outputs[o].try_pop(&*self.out_managers[o]) {
                    Ok(evicted) => {
                        let _ = self.out_managers[o].free(evicted);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            AdmissionDecision::DropNewest => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::DroppedNewest;
            }
            AdmissionDecision::Reject | AdmissionDecision::Block => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::Rejected;
            }
            AdmissionDecision::Admit => {}
        }

        match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                    self.telemetry.set_gauge(
                        TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
                        *occ.items() as u64,
                    );
                }
                EnqueueResult::Enqueued
            }
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                EnqueueResult::Rejected
            }
        }
    }

    /// Snapshot occupancy for the given output edge.
    #[inline]
    pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
        debug_assert!(o < OUT);
        let occ = self.outputs[o].occupancy(&self.out_policies[o]);
        if T::METRICS_ENABLED {
            self.telemetry.set_gauge(
                TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
                *occ.items() as u64,
            );
        }
        occ
    }

    /// Return the policy for the given output (copy).
    #[inline]
    pub fn out_policy(&mut self, o: usize) -> EdgePolicy {
        debug_assert!(o < OUT);
        self.out_policies[o]
    }

    /// Access the platform clock.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }

    /// Borrow the telemetry sink.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }

    /// Current monotonic tick value.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }

    /// Current time in nanoseconds.
    #[inline]
    pub fn now_nanos(&self) -> u64 {
        self.clock.ticks_to_nanos(self.clock.now_ticks())
    }

    /// Convert clock ticks to nanoseconds.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }

    /// Convert nanoseconds to clock ticks.
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }
}

/// The uniform node contract.
///
/// Nodes are parameterized by:
/// - `IN`: number of input ports; `OUT`: number of output ports;
/// - `InP`: input payload type; `OutP`: output payload type.
///
/// Queue and manager types are introduced on each method via `where` clauses
/// rather than on the trait itself, keeping the trait payload-focused and
/// avoiding an explosion of type parameters on the `impl`.
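///
/// # Example
///
/// A minimal 1-in/1-out skeleton, shown as a sketch: `MyPayload` is a
/// hypothetical payload type, and project-specific bodies are elided with
/// `todo!()`.
///
/// ```ignore
/// struct Passthrough;
///
/// impl Node<1, 1, MyPayload, MyPayload> for Passthrough {
///     fn describe_capabilities(&self) -> NodeCapabilities {
///         NodeCapabilities::new(false, false)
///     }
///     fn input_acceptance(&self) -> [PlacementAcceptance; 1] { todo!() }
///     fn output_acceptance(&self) -> [PlacementAcceptance; 1] { todo!() }
///     fn policy(&self) -> NodePolicy { todo!() }
///     fn node_kind(&self) -> NodeKind { NodeKind::Process }
///
///     fn initialize<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
///     where
///         C: PlatformClock,
///         Tel: Telemetry,
///     {
///         Ok(())
///     }
///
///     fn process_message<C>(
///         &mut self,
///         msg: &Message<MyPayload>,
///         _clock: &C,
///     ) -> Result<ProcessResult<MyPayload>, NodeError>
///     where
///         C: PlatformClock + Sized,
///     {
///         // Forward every input unchanged (assumes `Message: Clone`).
///         Ok(ProcessResult::Output(msg.clone()))
///     }
///
///     fn on_watchdog_timeout<C, Tel>(
///         &mut self,
///         _clock: &C,
///         _telemetry: &mut Tel,
///     ) -> Result<StepResult, NodeError>
///     where
///         C: PlatformClock + Sized,
///         Tel: Telemetry,
///     {
///         // Over budget: skip this step and let the scheduler retry.
///         Ok(StepResult::NoInput)
///     }
/// }
/// ```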
pub trait Node<const IN: usize, const OUT: usize, InP, OutP>
where
    InP: Payload,
    OutP: Payload,
{
    /// Return the node's capability descriptor.
    fn describe_capabilities(&self) -> NodeCapabilities;

    /// Return the node's port placement acceptances (zero-copy compatibility).
    fn input_acceptance(&self) -> [PlacementAcceptance; IN];

    /// Return the node's output placement preferences (zero-copy compatibility).
    fn output_acceptance(&self) -> [PlacementAcceptance; OUT];

    /// Return the node's policy bundle.
    fn policy(&self) -> NodePolicy;

    /// **TEST ONLY** method used to override batching policies for node contract tests.
    #[cfg(any(test, feature = "bench"))]
    fn set_policy(&mut self, policy: NodePolicy);

    /// Return the kind of node (source, process, model, sink, …).
    fn node_kind(&self) -> NodeKind;

    /// Prepare internal state, acquire buffers, and register telemetry series.
    fn initialize<C, Tel>(&mut self, clock: &C, telemetry: &mut Tel) -> Result<(), NodeError>
    where
        C: PlatformClock,
        Tel: Telemetry;

    /// Optional warm-up (e.g., compile kernels, prime pools). Default: no-op.
    fn start<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
    where
        C: PlatformClock,
        Tel: Telemetry,
    {
        Ok(())
    }

    /// Per-message processing hook.
    ///
    /// Receives a shared reference to the input message and returns a
    /// `ProcessResult` indicating what output (if any) was produced.
    /// The framework handles pushing outputs to edges; the node never
    /// interacts with queues or managers directly.
    fn process_message<C>(
        &mut self,
        msg: &Message<InP>,
        sys_clock: &C,
    ) -> Result<ProcessResult<OutP>, NodeError>
    where
        C: PlatformClock + Sized;

    /// Execute one cooperative step using the provided context.
    ///
    /// The default implementation:
    /// 1. Finds a ready input port via `input_edge_has_batch`.
    /// 2. Pops a single message via `pop_and_process`.
    /// 3. Delegates to `process_message` inside the callback.
    fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        ctx: &mut StepContext<
            'graph,
            'telemetry,
            'clock,
            IN,
            OUT,
            InP,
            OutP,
            InQ,
            OutQ,
            InM,
            OutM,
            C,
            Tel,
        >,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<OutP>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        let node_policy = self.policy();
        let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
            Some(p) => p,
            None => return Ok(StepResult::NoInput),
        };

        // Copy the `&'clock C` out of the context so the closure does not
        // borrow `ctx` while `pop_and_process` borrows it mutably.
        let clock = ctx.clock;
        ctx.pop_and_process(port, |msg| self.process_message(msg, clock))
    }

    /// Default batched-step implementation that honors all `NodePolicy` batching
    /// variants while delegating actual consumption to the implementor's
    /// single-message `process_message()` method.
    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        ctx: &mut StepContext<
            'graph,
            'telemetry,
            'clock,
            IN,
            OUT,
            InP,
            OutP,
            InQ,
            OutQ,
            InM,
            OutM,
            C,
            Tel,
        >,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<OutP>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        let node_policy = self.policy();
        let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
            Some(p) => p,
            None => return Ok(StepResult::NoInput),
        };
        let nmax = node_policy.batching().fixed_n().unwrap_or(1);

        // As in `step`, copy the clock reference out of the context before
        // mutably borrowing it for the batch pop.
        let clock = ctx.clock;
        ctx.pop_batch_and_process(port, nmax, &node_policy, |msg| {
            self.process_message(msg, clock)
        })
    }

    /// Handle watchdog timeouts by applying the over-budget policy (degrade/default/skip).
    fn on_watchdog_timeout<C, Tel>(
        &mut self,
        _clock: &C,
        _telemetry: &mut Tel,
    ) -> Result<StepResult, NodeError>
    where
        C: PlatformClock + Sized,
        Tel: Telemetry;

    /// Flush and release resources, if any. Default: no-op.
    fn stop<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
    where
        C: PlatformClock,
        Tel: Telemetry,
    {
        Ok(())
    }
}