1use crate::edge::{Edge, EdgeOccupancy};
15use crate::errors::NodeError;
16use crate::memory::PlacementAcceptance;
17use crate::message::{payload::Payload, Message};
18use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
19use crate::policy::NodePolicy;
20use crate::prelude::{MemoryManager, PlatformClock, Telemetry};
21
22use core::marker::PhantomData;
23
24pub trait Sink<InP, const IN: usize>
30where
31 InP: Payload,
32{
33 type Error;
35
36 fn open(&mut self) -> Result<(), Self::Error>;
41
42 fn consume(&mut self, msg: &Message<InP>) -> Result<(), Self::Error>;
47
48 fn input_acceptance(&self) -> [PlacementAcceptance; IN];
50
51 fn capabilities(&self) -> NodeCapabilities;
53
54 fn policy(&self) -> NodePolicy;
56
57 #[inline]
62 fn select_input(&mut self, occ: &[EdgeOccupancy; IN]) -> Option<usize> {
63 occ.iter().position(|o| *o.items() > 0)
64 }
65}
66
67pub struct SinkNode<S, InP, const IN: usize>
72where
73 S: Sink<InP, IN>,
74 InP: Payload,
75{
76 sink: S,
77 policy: NodePolicy,
78 _pd: PhantomData<InP>,
79}
80
81impl<S, InP, const IN: usize> SinkNode<S, InP, IN>
82where
83 S: Sink<InP, IN>,
84 InP: Payload,
85{
86 #[inline]
88 pub const fn new(sink: S, policy: NodePolicy) -> Self {
89 Self {
90 sink,
91 policy,
92 _pd: PhantomData,
93 }
94 }
95
96 #[inline]
98 pub fn sink_ref(&self) -> &S {
99 &self.sink
100 }
101
102 #[inline]
104 pub fn sink_mut(&mut self) -> &mut S {
105 &mut self.sink
106 }
107}
108
109impl<S, InP, const IN: usize> From<S> for SinkNode<S, InP, IN>
111where
112 S: Sink<InP, IN>,
113 InP: Payload,
114{
115 #[inline]
116 fn from(sink: S) -> Self {
117 let policy = sink.policy();
118 SinkNode::new(sink, policy)
119 }
120}
121
122impl<S, InP, const IN: usize> Node<IN, 0, InP, ()> for SinkNode<S, InP, IN>
123where
124 S: Sink<InP, IN>,
125 InP: Payload + Copy,
126{
127 #[inline]
128 fn describe_capabilities(&self) -> NodeCapabilities {
129 self.sink.capabilities()
130 }
131
132 #[inline]
133 fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
134 self.sink.input_acceptance()
135 }
136
137 #[inline]
138 fn output_acceptance(&self) -> [PlacementAcceptance; 0] {
139 []
140 }
141
142 #[inline]
143 fn policy(&self) -> NodePolicy {
144 self.policy
145 }
146
147 #[cfg(any(test, feature = "bench"))]
149 fn set_policy(&mut self, policy: NodePolicy) {
150 self.policy = policy;
151 }
152
153 #[inline]
154 fn node_kind(&self) -> NodeKind {
155 NodeKind::Sink
156 }
157
158 #[inline]
159 fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
160 where
161 T: Telemetry,
162 {
163 self.sink
164 .open()
165 .map_err(|_| NodeError::external_unavailable())
166 }
167
168 #[inline]
169 fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
170 where
171 T: Telemetry,
172 {
173 Ok(())
174 }
175
176 #[inline]
177 fn process_message<C>(
178 &mut self,
179 msg: &Message<InP>,
180 _sys_clock: &C,
181 ) -> Result<ProcessResult<()>, NodeError>
182 where
183 C: PlatformClock + Sized,
184 {
185 self.sink
186 .consume(msg)
187 .map(|_| ProcessResult::Consumed)
188 .map_err(|_| NodeError::execution_failed())
189 }
190
191 #[inline]
192 fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
193 &mut self,
194 cx: &mut StepContext<'g, 't, 'ck, IN, 0, InP, (), InQ, OutQ, InM, OutM, C, Tel>,
195 ) -> Result<StepResult, NodeError>
196 where
197 InQ: Edge,
198 OutQ: Edge,
199 InM: MemoryManager<InP>,
200 OutM: MemoryManager<()>,
201 C: PlatformClock + Sized,
202 Tel: Telemetry + Sized,
203 {
204 let occ: [EdgeOccupancy; IN] = core::array::from_fn(|i| cx.in_occupancy(i));
206 let port = match self.sink.select_input(&occ) {
207 Some(i) => i,
208 None => return Ok(StepResult::NoInput),
209 };
210
211 cx.pop_and_process(port, |msg| {
212 self.sink
213 .consume(msg)
214 .map(|_| ProcessResult::Consumed)
215 .map_err(|_| NodeError::execution_failed())
216 })
217 }
218
219 #[inline]
220 fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
221 &mut self,
222 ctx: &mut StepContext<
223 'graph,
224 'telemetry,
225 'clock,
226 IN,
227 0,
228 InP,
229 (),
230 InQ,
231 OutQ,
232 InM,
233 OutM,
234 C,
235 Tel,
236 >,
237 ) -> Result<StepResult, NodeError>
238 where
239 InQ: Edge,
240 OutQ: Edge,
241 InM: MemoryManager<InP>,
242 OutM: MemoryManager<()>,
243 C: PlatformClock + Sized,
244 Tel: Telemetry + Sized,
245 {
246 let node_policy = self.policy();
247 let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
248 Some(p) => p,
249 None => return Ok(StepResult::NoInput),
250 };
251 let nmax = node_policy.batching().fixed_n().unwrap_or(1);
252 let clock = ctx.clock;
253
254 ctx.pop_batch_and_process(port, nmax, &node_policy, |msg| {
255 self.process_message(msg, clock)
256 })
257 }
258
259 #[inline]
260 fn on_watchdog_timeout<C, Tel>(
261 &mut self,
262 clock: &C,
263 _t: &mut Tel,
264 ) -> Result<StepResult, NodeError>
265 where
266 C: PlatformClock + Sized,
267 Tel: Telemetry,
268 {
269 Ok(StepResult::YieldUntil(clock.now_ticks()))
270 }
271
272 #[inline]
273 fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
274 where
275 Tel: Telemetry,
276 {
277 Ok(())
278 }
279}