use crate::edge::{Edge, EdgeOccupancy};
use crate::errors::NodeError;
use crate::memory::PlacementAcceptance;
use crate::message::{payload::Payload, Message};
use crate::node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult};
use crate::policy::NodePolicy;
use crate::prelude::{MemoryManager, PlatformClock, Telemetry};
use core::marker::PhantomData;
/// Behaviour a terminal consumer must provide to be wrapped in a `SinkNode`.
///
/// `InP` is the payload type of the messages being consumed and `IN` is the
/// number of input ports the sink exposes.
pub trait Sink<InP, const IN: usize>
where
    InP: Payload,
{
    /// Error produced by [`Sink::open`] and [`Sink::consume`].
    type Error;

    /// Prepares the sink for use.
    ///
    /// # Errors
    /// Returns `Self::Error` when the sink could not be opened.
    fn open(&mut self) -> Result<(), Self::Error>;

    /// Consumes a single message by reference.
    ///
    /// # Errors
    /// Returns `Self::Error` when the message could not be consumed.
    fn consume(&mut self, msg: &Message<InP>) -> Result<(), Self::Error>;

    /// Reports the placement acceptance of each of the `IN` input ports.
    fn input_acceptance(&self) -> [PlacementAcceptance; IN];

    /// Reports the capabilities of this sink.
    fn capabilities(&self) -> NodeCapabilities;

    /// Reports the policy this sink wants to run under.
    fn policy(&self) -> NodePolicy;

    /// Chooses which input port to read from next.
    ///
    /// `occ` holds one occupancy entry per input port. The default
    /// implementation returns the lowest index whose entry reports more than
    /// zero items, or `None` when no entry does.
    #[inline]
    fn select_input(&mut self, occ: &[EdgeOccupancy; IN]) -> Option<usize> {
        for (index, edge) in occ.iter().enumerate() {
            if *edge.items() > 0 {
                return Some(index);
            }
        }
        None
    }
}
/// Adapter that wraps a [`Sink`] implementation together with a node policy.
///
/// `S` is the wrapped sink, `InP` the payload type it consumes and `IN` the
/// number of input ports.
pub struct SinkNode<S, InP, const IN: usize>
where
    S: Sink<InP, IN>,
    InP: Payload,
{
    /// The wrapped sink implementation.
    sink: S,
    /// The policy stored for this node at construction time.
    policy: NodePolicy,
    /// Zero-sized marker that ties `InP` to the type without storing a value.
    _pd: PhantomData<InP>,
}
impl<S, InP, const IN: usize> SinkNode<S, InP, IN>
where
S: Sink<InP, IN>,
InP: Payload,
{
#[inline]
pub const fn new(sink: S, policy: NodePolicy) -> Self {
Self {
sink,
policy,
_pd: PhantomData,
}
}
#[inline]
pub fn sink_ref(&self) -> &S {
&self.sink
}
#[inline]
pub fn sink_mut(&mut self) -> &mut S {
&mut self.sink
}
}
/// Converts a bare sink into a [`SinkNode`] that uses the sink's own policy.
impl<S, InP, const IN: usize> From<S> for SinkNode<S, InP, IN>
where
    S: Sink<InP, IN>,
    InP: Payload,
{
    /// Wraps `sink`, taking the node policy from `sink.policy()`.
    #[inline]
    fn from(sink: S) -> Self {
        // The policy has to be read before `sink` is moved into the node.
        let inherited = sink.policy();
        Self::new(sink, inherited)
    }
}
/// `Node` implementation with `IN` inputs and no outputs; every message that
/// arrives is handed to the wrapped sink.
impl<S, InP, const IN: usize> Node<IN, 0, InP, ()> for SinkNode<S, InP, IN>
where
    S: Sink<InP, IN>,
    InP: Payload + Copy,
{
    /// Forwards the capabilities reported by the wrapped sink.
    #[inline]
    fn describe_capabilities(&self) -> NodeCapabilities {
        let caps = self.sink.capabilities();
        caps
    }

    /// Forwards the per-port input acceptance reported by the wrapped sink.
    #[inline]
    fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
        let accepted = self.sink.input_acceptance();
        accepted
    }

    /// Returns an empty array: this node has zero output ports.
    #[inline]
    fn output_acceptance(&self) -> [PlacementAcceptance; 0] {
        let no_outputs: [PlacementAcceptance; 0] = [];
        no_outputs
    }

    /// Returns the policy stored in this node (not the sink's `policy()`).
    #[inline]
    fn policy(&self) -> NodePolicy {
        self.policy
    }

    /// Replaces the stored policy; only compiled for tests and the `bench`
    /// feature.
    #[cfg(any(test, feature = "bench"))]
    fn set_policy(&mut self, policy: NodePolicy) {
        self.policy = policy;
    }

    /// Identifies this node as a sink.
    #[inline]
    fn node_kind(&self) -> NodeKind {
        NodeKind::Sink
    }

    /// Opens the wrapped sink.
    ///
    /// # Errors
    /// Any error from `Sink::open` is discarded and reported as
    /// `NodeError::external_unavailable()`.
    #[inline]
    fn initialize<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
    where
        T: Telemetry,
    {
        match self.sink.open() {
            Ok(()) => Ok(()),
            Err(_) => Err(NodeError::external_unavailable()),
        }
    }

    /// No start-up work is performed; always succeeds.
    #[inline]
    fn start<C, T>(&mut self, _c: &C, _t: &mut T) -> Result<(), NodeError>
    where
        T: Telemetry,
    {
        Ok(())
    }

    /// Hands one message to the wrapped sink.
    ///
    /// Returns `ProcessResult::Consumed` on success. The clock argument is
    /// not used.
    ///
    /// # Errors
    /// Any error from `Sink::consume` is discarded and reported as
    /// `NodeError::execution_failed()`.
    #[inline]
    fn process_message<C>(
        &mut self,
        msg: &Message<InP>,
        _sys_clock: &C,
    ) -> Result<ProcessResult<()>, NodeError>
    where
        C: PlatformClock + Sized,
    {
        match self.sink.consume(msg) {
            Ok(()) => Ok(ProcessResult::Consumed),
            Err(_) => Err(NodeError::execution_failed()),
        }
    }

    /// Processes at most one message from a port chosen by the sink.
    ///
    /// The occupancy of every input is read from the context and passed to
    /// `Sink::select_input`. When the sink selects no port the step returns
    /// `StepResult::NoInput`; otherwise the context pops from the chosen port
    /// and the message is given to `Sink::consume`.
    ///
    /// # Errors
    /// A failing `Sink::consume` is reported to the context as
    /// `NodeError::execution_failed()`; the final result is whatever
    /// `pop_and_process` returns.
    #[inline]
    fn step<'g, 't, 'ck, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        cx: &mut StepContext<'g, 't, 'ck, IN, 0, InP, (), InQ, OutQ, InM, OutM, C, Tel>,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<()>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        // One occupancy entry per input port, in port order.
        let occupancy: [EdgeOccupancy; IN] = core::array::from_fn(|slot| cx.in_occupancy(slot));

        if let Some(chosen) = self.sink.select_input(&occupancy) {
            let sink = &mut self.sink;
            cx.pop_and_process(chosen, |msg| match sink.consume(msg) {
                Ok(()) => Ok(ProcessResult::Consumed),
                Err(_) => Err(NodeError::execution_failed()),
            })
        } else {
            Ok(StepResult::NoInput)
        }
    }

    /// Processes a batch of messages from the first port that has one ready.
    ///
    /// Ports are probed in ascending order with `input_edge_has_batch` under
    /// the node's stored policy; `Sink::select_input` is not consulted here.
    /// When no port qualifies the step returns `StepResult::NoInput`. The
    /// batch size passed to the context is the policy's fixed batch size, or
    /// `1` when none is configured. Each message goes through
    /// [`Self::process_message`].
    ///
    /// # Errors
    /// Returns whatever `pop_batch_and_process` returns.
    #[inline]
    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        ctx: &mut StepContext<
            'graph,
            'telemetry,
            'clock,
            IN,
            0,
            InP,
            (),
            InQ,
            OutQ,
            InM,
            OutM,
            C,
            Tel,
        >,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<()>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        let active_policy = self.policy;
        let ready_port =
            (0..IN).find(|&candidate| ctx.input_edge_has_batch(candidate, &active_policy));

        if let Some(chosen) = ready_port {
            let batch_limit = active_policy.batching().fixed_n().unwrap_or(1);
            // Copy the clock handle out first so the closure does not need to
            // touch `ctx` while it is mutably borrowed by the call below.
            let clock = ctx.clock;
            ctx.pop_batch_and_process(chosen, batch_limit, &active_policy, |msg| {
                self.process_message(msg, clock)
            })
        } else {
            Ok(StepResult::NoInput)
        }
    }

    /// Responds to a watchdog timeout by returning
    /// `StepResult::YieldUntil` carrying the clock's current tick value.
    #[inline]
    fn on_watchdog_timeout<C, Tel>(
        &mut self,
        clock: &C,
        _t: &mut Tel,
    ) -> Result<StepResult, NodeError>
    where
        C: PlatformClock + Sized,
        Tel: Telemetry,
    {
        let now = clock.now_ticks();
        Ok(StepResult::YieldUntil(now))
    }

    /// No shutdown work is performed; always succeeds.
    #[inline]
    fn stop<C, Tel>(&mut self, _c: &C, _t: &mut Tel) -> Result<(), NodeError>
    where
        Tel: Telemetry,
    {
        Ok(())
    }
}