// limen_core/node/sink.rs
//! Sink node trait and adapter.
//!
//! A `Sink` has ≥1 inputs and **0 outputs**. It consumes messages from one input
//! per `step()` and commits them to an external side effect (file, stdout, GPIO,
//! network, etc.). No dynamic dispatch in the hot path; everything is monomorphized.
//!
//! Design goals:
//! - Minimal trait to implement a new sink.
//! - Default input-selection strategy (first non-empty), overridable per sink.
//! - Adapter `SinkNode<S, InP, IN>` that implements `Node<IN, 0, InP, ()>`.
//! - Implicit `From<S>` so graphs can take `impl Into<SinkNode<...>>` and users
//!   never have to mention the adapter type.

use crate::edge::{Edge, EdgeOccupancy};
use crate::errors::NodeError;
use crate::memory::PlacementAcceptance;
use crate::message::{payload::Payload, Message};
use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
use crate::policy::NodePolicy;
use crate::prelude::{MemoryManager, PlatformClock, Telemetry};

use core::marker::PhantomData;

24/// Uniform contract for sink implementations (≥1 inputs / 0 outputs).
25///
26/// # Type Parameters
27/// * `InP` — Payload type consumed by the sink.
28/// * `IN`  — Number of input ports on the sink node.
29pub trait Sink<InP, const IN: usize>
30where
31    InP: Payload,
32{
33    /// Sink-specific error type for `open()` or `consume()`.
34    type Error;
35
36    /// Prepare the sink for consumption (open file/device, connect network, etc.).
37    ///
38    /// Called from `Node::initialize`. Must be idempotent or fail safely if called
39    /// multiple times by a higher layer.
40    fn open(&mut self) -> Result<(), Self::Error>;
41
42    /// Consume a single message pulled from `port`.
43    ///
44    /// This is where side effects happen (write, print, publish). Return `Ok(())`
45    /// on success. Errors are mapped to `NodeError::execution_failed()`.
46    fn consume(&mut self, msg: &Message<InP>) -> Result<(), Self::Error>;
47
48    /// Input placement acceptances for zero-copy compatibility.
49    fn input_acceptance(&self) -> [PlacementAcceptance; IN];
50
51    /// Describe sink capabilities (device streams, degrade tiers, etc.).
52    fn capabilities(&self) -> NodeCapabilities;
53
54    /// Provide the node policy bundle (batching/budget/deadlines).
55    fn policy(&self) -> NodePolicy;
56
57    /// Optional: choose which input to read this step based on occupancies.
58    ///
59    /// Default strategy: first input with `items > 0`. Return `None` to indicate
60    /// "no input available now".
61    #[inline]
62    fn select_input(&mut self, occ: &[EdgeOccupancy; IN]) -> Option<usize> {
63        occ.iter().position(|o| *o.items() > 0)
64    }
65}
66
67/// A thin adapter that exposes a `Sink` as a `Node<IN, 0, InP, ()>`.
68///
69/// Owns the sink and forwards lifecycle calls. Users do **not** construct this
70/// directly — graphs can accept `impl Into<SinkNode<...>>` and rely on `From<S>`.
71pub struct SinkNode<S, InP, const IN: usize>
72where
73    S: Sink<InP, IN>,
74    InP: Payload,
75{
76    sink: S,
77    policy: NodePolicy,
78    _pd: PhantomData<InP>,
79}
80
81impl<S, InP, const IN: usize> SinkNode<S, InP, IN>
82where
83    S: Sink<InP, IN>,
84    InP: Payload,
85{
86    /// Construct a `SinkNode` from a sink and a static policy bundle.
87    #[inline]
88    pub const fn new(sink: S, policy: NodePolicy) -> Self {
89        Self {
90            sink,
91            policy,
92            _pd: PhantomData,
93        }
94    }
95
96    /// Borrow the underlying sink.
97    #[inline]
98    pub fn sink_ref(&self) -> &S {
99        &self.sink
100    }
101
102    /// Mutably borrow the underlying sink.
103    #[inline]
104    pub fn sink_mut(&mut self) -> &mut S {
105        &mut self.sink
106    }
107}
108
109/// Allow graphs to accept any `Sink` and convert implicitly.
110impl<S, InP, const IN: usize> From<S> for SinkNode<S, InP, IN>
111where
112    S: Sink<InP, IN>,
113    InP: Payload,
114{
115    #[inline]
116    fn from(sink: S) -> Self {
117        let policy = sink.policy();
118        SinkNode::new(sink, policy)
119    }
120}
121
122impl<S, InP, const IN: usize> Node<IN, 0, InP, ()> for SinkNode<S, InP, IN>
123where
124    S: Sink<InP, IN>,
125    InP: Payload + Copy,
126{
127    #[inline]
128    fn describe_capabilities(&self) -> NodeCapabilities {
129        self.sink.capabilities()
130    }
131
132    #[inline]
133    fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
134        self.sink.input_acceptance()
135    }
136
137    #[inline]
138    fn output_acceptance(&self) -> [PlacementAcceptance; 0] {
139        []
140    }
141
142    #[inline]
143    fn policy(&self) -> NodePolicy {
144        self.policy
145    }
146
147    /// **TEST ONLY** method used to override batching policies for node contract tests.
148    #[cfg(any(test, feature = "bench"))]
149    fn set_policy(&mut self, policy: NodePolicy) {
150        self.policy = policy;
151    }
152
153    #[inline]
154    fn node_kind(&self) -> NodeKind {
155        NodeKind::Sink
156    }
157
158    #[inline]
159    fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
160    where
161        T: Telemetry,
162    {
163        self.sink
164            .open()
165            .map_err(|_| NodeError::external_unavailable())
166    }
167
168    #[inline]
169    fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
170    where
171        T: Telemetry,
172    {
173        Ok(())
174    }
175
176    #[inline]
177    fn process_message<C>(
178        &mut self,
179        msg: &Message<InP>,
180        _sys_clock: &C,
181    ) -> Result<ProcessResult<()>, NodeError>
182    where
183        C: PlatformClock + Sized,
184    {
185        self.sink
186            .consume(msg)
187            .map(|_| ProcessResult::Consumed)
188            .map_err(|_| NodeError::execution_failed())
189    }
190
191    #[inline]
192    fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
193        &mut self,
194        cx: &mut StepContext<'g, 't, 'ck, IN, 0, InP, (), InQ, OutQ, InM, OutM, C, Tel>,
195    ) -> Result<StepResult, NodeError>
196    where
197        InQ: Edge,
198        OutQ: Edge,
199        InM: MemoryManager<InP>,
200        OutM: MemoryManager<()>,
201        C: PlatformClock + Sized,
202        Tel: Telemetry + Sized,
203    {
204        // Snapshot occupancies and let the sink choose an input.
205        let occ: [EdgeOccupancy; IN] = core::array::from_fn(|i| cx.in_occupancy(i));
206        let port = match self.sink.select_input(&occ) {
207            Some(i) => i,
208            None => return Ok(StepResult::NoInput),
209        };
210
211        cx.pop_and_process(port, |msg| {
212            self.sink
213                .consume(msg)
214                .map(|_| ProcessResult::Consumed)
215                .map_err(|_| NodeError::execution_failed())
216        })
217    }
218
219    #[inline]
220    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
221        &mut self,
222        ctx: &mut StepContext<
223            'graph,
224            'telemetry,
225            'clock,
226            IN,
227            0,
228            InP,
229            (),
230            InQ,
231            OutQ,
232            InM,
233            OutM,
234            C,
235            Tel,
236        >,
237    ) -> Result<StepResult, NodeError>
238    where
239        InQ: Edge,
240        OutQ: Edge,
241        InM: MemoryManager<InP>,
242        OutM: MemoryManager<()>,
243        C: PlatformClock + Sized,
244        Tel: Telemetry + Sized,
245    {
246        let node_policy = self.policy();
247        let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
248            Some(p) => p,
249            None => return Ok(StepResult::NoInput),
250        };
251        let nmax = node_policy.batching().fixed_n().unwrap_or(1);
252        let clock = ctx.clock;
253
254        ctx.pop_batch_and_process(port, nmax, &node_policy, |msg| {
255            self.process_message(msg, clock)
256        })
257    }
258
259    #[inline]
260    fn on_watchdog_timeout<C, Tel>(
261        &mut self,
262        clock: &C,
263        _t: &mut Tel,
264    ) -> Result<StepResult, NodeError>
265    where
266        C: PlatformClock + Sized,
267        Tel: Telemetry,
268    {
269        Ok(StepResult::YieldUntil(clock.now_ticks()))
270    }
271
272    #[inline]
273    fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
274    where
275        Tel: Telemetry,
276    {
277        Ok(())
278    }
279}