pub mod link;
pub mod model;
pub mod sink;
pub mod source;
#[cfg(any(test, feature = "bench"))]
pub mod bench;
#[cfg(any(test, feature = "bench"))]
pub mod contract_tests;
#[cfg(any(test, feature = "bench"))]
pub use contract_tests::*;
use crate::edge::{Edge, EdgeOccupancy, EnqueueResult};
use crate::errors::{NodeError, QueueError};
use crate::memory::PlacementAcceptance;
use crate::message::{payload::Payload, Message};
use crate::policy::{
AdmissionDecision, BatchingPolicy, EdgePolicy, NodePolicy, SlidingWindow, WindowKind,
};
use crate::prelude::{BatchMessageIter, MemoryManager, PlatformClock, TelemetryKey, TelemetryKind};
use crate::telemetry::Telemetry;
use crate::types::Ticks;
/// Coarse classification of a node's role in the processing graph.
///
/// `#[non_exhaustive]`: downstream `match`es need a wildcard arm, so new
/// kinds can be added without a breaking change.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    /// Produces messages into the graph (no graph inputs).
    Source,
    /// General processing stage.
    Process,
    /// Model-execution stage.
    Model,
    /// Fans one input out to several outputs.
    Split,
    /// Merges several inputs into one output.
    Join,
    /// Consumes messages (no graph outputs).
    Sink,
    /// Node coupled to something outside the graph — presumably the kind that
    /// yields `StepResult::WaitingOnExternal`; confirm against the scheduler.
    External,
}
/// Optional capabilities a node advertises via `Node::describe_capabilities`.
///
/// The flags are opaque here; their interpretation belongs to whatever
/// runtime component queries them.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct NodeCapabilities {
    // Whether the node can exploit device-side streams.
    device_streams: bool,
    // Whether the node supports degraded-quality tiers.
    degrade_tiers: bool,
}
impl NodeCapabilities {
    /// Builds a capability set from its two flags.
    #[inline]
    pub const fn new(device_streams: bool, degrade_tiers: bool) -> Self {
        Self {
            device_streams,
            degrade_tiers,
        }
    }
    /// Returns the `device_streams` flag.
    ///
    /// NOTE(review): returning `&bool` for a `Copy` field is unusual — kept
    /// as-is because callers may rely on the reference-returning signature.
    #[inline]
    pub fn device_streams(&self) -> &bool {
        &self.device_streams
    }
    /// Returns the `degrade_tiers` flag (same `&bool` note as above).
    #[inline]
    pub fn degrade_tiers(&self) -> &bool {
        &self.degrade_tiers
    }
}
/// Outcome of a single scheduling step of a node.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepResult {
    /// The node did useful work (consumed and/or produced at least one message).
    MadeProgress,
    /// No input was available to work on.
    NoInput,
    /// An output edge refused a message; try again later.
    Backpressured,
    /// Blocked on a dependency outside the graph.
    WaitingOnExternal,
    /// Do not step this node again before the given tick.
    YieldUntil(Ticks),
    /// The node is finished and will make no further progress.
    Terminal,
}
/// Result of applying a node's per-message processing callback.
#[non_exhaustive]
#[derive(Debug)]
pub enum ProcessResult<P: Payload> {
    /// Emit this message on the node's output (routed to output port 0 by
    /// the step helpers).
    Output(Message<P>),
    /// Input consumed with no output; counts as progress.
    Consumed,
    /// Input ignored; does not count as progress.
    Skip,
}
/// Borrowed view of a node's surroundings for one step: its input/output
/// edges, the memory managers backing them, per-edge policies, and shared
/// clock/telemetry handles.
///
/// `IN`/`OUT` are the port counts; `InP`/`OutP` the payload types;
/// `InQ`/`OutQ` the edge (queue) types; `InM`/`OutM` the memory managers;
/// `C` the clock; `T` the telemetry sink.
pub struct StepContext<
    'graph,
    'telemetry,
    'clock,
    const IN: usize,
    const OUT: usize,
    InP,
    OutP,
    InQ,
    OutQ,
    InM,
    OutM,
    C,
    T,
> where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // One edge per input / output port; arrays are index-aligned with the
    // manager, policy, and edge-id arrays below.
    inputs: [&'graph mut InQ; IN],
    outputs: [&'graph mut OutQ; OUT],
    in_managers: [&'graph mut InM; IN],
    out_managers: [&'graph mut OutM; OUT],
    in_policies: [EdgePolicy; IN],
    out_policies: [EdgePolicy; OUT],
    // Keys node-scoped telemetry counters.
    node_id: u32,
    // Key edge-scoped telemetry gauges (queue depth).
    in_edge_ids: [u32; IN],
    out_edge_ids: [u32; OUT],
    clock: &'clock C,
    telemetry: &'telemetry mut T,
    // Ties the otherwise-unused payload type parameters to the struct.
    _marker: core::marker::PhantomData<(InP, OutP)>,
}
impl<
        'graph,
        'telemetry,
        'clock,
        const IN: usize,
        const OUT: usize,
        InP,
        OutP,
        InQ,
        OutQ,
        InM,
        OutM,
        C,
        T,
    > StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
where
    InP: Payload,
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    /// Assembles a step context from pre-borrowed graph parts.
    ///
    /// The caller must keep the arrays index-aligned: `inputs[i]` is backed
    /// by `in_managers[i]`, governed by `in_policies[i]`, and identified by
    /// `in_edge_ids[i]` (likewise for the output arrays).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        inputs: [&'graph mut InQ; IN],
        outputs: [&'graph mut OutQ; OUT],
        in_managers: [&'graph mut InM; IN],
        out_managers: [&'graph mut OutM; OUT],
        in_policies: [EdgePolicy; IN],
        out_policies: [EdgePolicy; OUT],
        node_id: u32,
        in_edge_ids: [u32; IN],
        out_edge_ids: [u32; OUT],
        clock: &'clock C,
        telemetry: &'telemetry mut T,
    ) -> Self {
        Self {
            inputs,
            outputs,
            in_managers,
            out_managers,
            in_policies,
            out_policies,
            node_id,
            in_edge_ids,
            out_edge_ids,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        }
    }
}
impl<
'graph,
'telemetry,
'clock,
const IN: usize,
const OUT: usize,
InP,
OutP,
InQ,
OutQ,
InM,
OutM,
C,
T,
> StepContext<'graph, 'telemetry, 'clock, IN, OUT, InP, OutP, InQ, OutQ, InM, OutM, C, T>
where
InP: Payload,
OutP: Payload,
InQ: Edge,
OutQ: Edge,
InM: MemoryManager<InP>,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
T: Telemetry + Sized,
{
    /// Peeks at the header of the next message on input `i` without popping.
    ///
    /// NOTE(review): any manager-side peek failure is collapsed into
    /// `QueueError::Empty`, so callers cannot distinguish "no message" from
    /// a failed header read — confirm this is intended.
    #[inline]
    pub fn in_peek_header(&self, i: usize) -> Result<InM::HeaderGuard<'_>, QueueError> {
        debug_assert!(i < IN);
        let token = self.inputs[i].try_peek()?;
        self.in_managers[i]
            .peek_header(token)
            .map_err(|_| QueueError::Empty)
    }
pub fn pop_and_process<F>(&mut self, port: usize, f: F) -> Result<StepResult, NodeError>
where
F: FnOnce(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
{
debug_assert!(port < IN);
let token = match self.inputs[port].try_pop(&*self.in_managers[port]) {
Ok(t) => t,
Err(QueueError::Empty) => return Ok(StepResult::NoInput),
Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
return Err(NodeError::backpressured());
}
Err(QueueError::Poisoned) | Err(QueueError::Unsupported) => {
return Err(NodeError::execution_failed());
}
};
let guard = self.in_managers[port]
.read(token)
.map_err(|_| NodeError::execution_failed())?;
let result = f(&*guard)?;
drop(guard);
let _ = self.in_managers[port].free(token);
if T::METRICS_ENABLED {
self.telemetry.incr_counter(
TelemetryKey::node(self.node_id, TelemetryKind::IngressMsgs),
1,
);
let _ = self.in_occupancy(port);
}
match result {
ProcessResult::Output(out_msg) => self.push_output(0, out_msg),
ProcessResult::Consumed => Ok(StepResult::MadeProgress),
ProcessResult::Skip => Ok(StepResult::NoInput),
}
}
    /// Pops up to `nmax` messages from input `port` as one batch, applies
    /// `f` to each, and pushes any produced outputs to output port 0.
    ///
    /// Telemetry: ingress count and the input edge's queue-depth gauge are
    /// updated once per call (when metrics are compiled in).
    pub fn pop_batch_and_process<F>(
        &mut self,
        port: usize,
        nmax: usize,
        node_policy: &NodePolicy,
        mut f: F,
    ) -> Result<StepResult, NodeError>
    where
        F: FnMut(&Message<InP>) -> Result<ProcessResult<OutP>, NodeError>,
    {
        debug_assert!(port < IN);
        if nmax == 0 {
            return Err(NodeError::execution_failed());
        }
        // Clamp the node's batching policy so neither the fixed batch size
        // nor a sliding-window stride can exceed `nmax`.
        let requested_policy = {
            let nb = *node_policy.batching();
            BatchingPolicy::with_window(
                nb.fixed_n().map(|f_n| core::cmp::min(f_n, nmax)),
                *nb.max_delta_t(),
                match nb.window_kind() {
                    WindowKind::Disjoint => WindowKind::Disjoint,
                    WindowKind::Sliding(sw) => {
                        // Window size defaults to 1 when no fixed N is set.
                        let size = nb
                            .fixed_n()
                            .map(|f_n| core::cmp::min(f_n, nmax))
                            .unwrap_or(1);
                        let stride = core::cmp::min(*sw.stride(), size);
                        WindowKind::Sliding(SlidingWindow::new(stride))
                    }
                },
            )
        };
        // Disjoint windows consume the whole batch; sliding windows consume
        // only `stride` messages per step.
        let stride = match requested_policy.window_kind() {
            WindowKind::Disjoint => usize::MAX,
            WindowKind::Sliding(sw) => *sw.stride(),
        };
        // Snapshot occupancy before popping; used for the gauge below.
        let occ_before = self.inputs[port].occupancy(&self.in_policies[port]);
        let batch =
            match self.inputs[port].try_pop_batch(&requested_policy, &*self.in_managers[port]) {
                Ok(b) => b,
                Err(QueueError::Empty) => return Ok(StepResult::NoInput),
                Err(QueueError::Backpressured) | Err(QueueError::AtOrAboveHardCap) => {
                    return Err(NodeError::backpressured());
                }
                // Distinct codes so the two failure modes can be told apart.
                Err(QueueError::Poisoned) => {
                    return Err(NodeError::execution_failed().with_code(1));
                }
                Err(QueueError::Unsupported) => {
                    return Err(NodeError::execution_failed().with_code(2));
                }
            };
        let batch_len = batch.len();
        if batch_len == 0 {
            return Ok(StepResult::NoInput);
        }
        let actual_stride = core::cmp::min(stride, batch_len);
        let in_mgr: &mut InM = &mut *self.in_managers[port];
        // Mark batch boundaries in the headers of the messages this step
        // will consume.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                if let Ok(mut wg) = in_mgr.read_mut(token) {
                    if idx == 0 {
                        wg.header_mut().set_first_in_batch();
                    }
                    if idx == batch_len - 1 || batch_len == 1 {
                        wg.header_mut().set_last_in_batch();
                    }
                }
            }
        }
        let iter =
            BatchMessageIter::new(batch.as_slice().iter(), &*in_mgr, actual_stride, batch_len);
        // Split the borrow: build an output-only context so `iter` can keep
        // borrowing the input manager while results are pushed.
        let out_policies = self.out_policies;
        let out_edge_ids = self.out_edge_ids;
        let node_id = self.node_id;
        let clock = self.clock;
        let telemetry: &mut T = &mut *self.telemetry;
        let outputs = &mut self.outputs;
        let out_managers = &mut self.out_managers;
        let mut out = OutStepContext {
            outputs,
            out_managers,
            out_policies,
            out_edge_ids,
            node_id,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        };
        let mut any_made = false;
        let mut backpressured = false;
        for guard in iter {
            // After the first rejection, drain remaining guards without
            // calling `f`; their tokens are still freed below.
            if backpressured {
                drop(guard);
                continue;
            }
            match f(&*guard)? {
                ProcessResult::Output(out_msg) => {
                    drop(guard);
                    match out.out_try_push(0, out_msg) {
                        EnqueueResult::Enqueued => {
                            any_made = true;
                        }
                        EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                            backpressured = true;
                        }
                    }
                }
                ProcessResult::Consumed => {
                    drop(guard);
                    any_made = true;
                }
                ProcessResult::Skip => {
                    drop(guard);
                }
            }
        }
        // Free only the first `actual_stride` tokens; under a sliding window
        // the rest presumably remains owned by the edge for re-delivery —
        // TODO confirm against `try_pop_batch`'s contract.
        for (idx, &token) in batch.as_slice().iter().enumerate() {
            if idx < actual_stride {
                let _ = in_mgr.free(token);
            }
        }
        if T::METRICS_ENABLED {
            let telemetry = &mut *out.telemetry;
            telemetry.incr_counter(
                TelemetryKey::node(node_id, TelemetryKind::IngressMsgs),
                actual_stride as u64,
            );
            // Gauge derived from the pre-pop snapshot rather than re-queried,
            // since `out` still holds the mutable borrows.
            let after_items = occ_before.items().saturating_sub(actual_stride);
            telemetry.set_gauge(
                TelemetryKey::edge(self.in_edge_ids[port], TelemetryKind::QueueDepth),
                after_items as u64,
            );
        }
        if backpressured {
            Ok(StepResult::Backpressured)
        } else if any_made {
            Ok(StepResult::MadeProgress)
        } else {
            Ok(StepResult::NoInput)
        }
    }
#[inline]
pub fn in_occupancy(&mut self, i: usize) -> EdgeOccupancy {
debug_assert!(i < IN);
let occ = self.inputs[i].occupancy(&self.in_policies[i]);
if T::METRICS_ENABLED {
self.telemetry.set_gauge(
TelemetryKey::edge(self.in_edge_ids[i], TelemetryKind::QueueDepth),
*occ.items() as u64,
);
}
occ
}
#[inline]
pub fn in_policy(&mut self, i: usize) -> EdgePolicy {
debug_assert!(i < IN);
self.in_policies[i]
}
    /// Stores `m` and attempts to enqueue it on output edge `o`, applying
    /// the edge's admission policy (eviction, drop-newest, reject) first.
    ///
    /// The stored payload is freed again on every non-enqueued path so it
    /// cannot leak a pool slot.
    pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
        debug_assert!(o < OUT);
        // Store first: the admission decision below requires a stored token.
        let token = match self.out_managers[o].store(m) {
            Ok(t) => t,
            Err(_) => return EnqueueResult::Rejected,
        };
        let decision = self.outputs[o].get_admission_decision(
            &self.out_policies[o],
            token,
            &*self.out_managers[o],
        );
        match decision {
            // Make room by evicting up to `n` queued entries.
            AdmissionDecision::Evict(n) => {
                for _ in 0..n {
                    match self.outputs[o].try_pop(&*self.out_managers[o]) {
                        Ok(evicted) => {
                            let _ = self.out_managers[o].free(evicted);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            // Evict until occupancy falls below the hard cap.
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                if !self.out_policies[o]
                    .caps
                    .at_or_above_hard(*occ.items(), *occ.bytes())
                {
                    break;
                }
                match self.outputs[o].try_pop(&*self.out_managers[o]) {
                    Ok(evicted) => {
                        let _ = self.out_managers[o].free(evicted);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            // The new message itself is dropped.
            AdmissionDecision::DropNewest => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::DroppedNewest;
            }
            // Block is treated like Reject: this is a non-blocking push.
            AdmissionDecision::Reject | AdmissionDecision::Block => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::Rejected;
            }
            AdmissionDecision::Admit => {}
        }
        match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    // Refresh the queue-depth gauge.
                    let _ = self.out_occupancy(o);
                }
                EnqueueResult::Enqueued
            }
            // The edge may still refuse after admission; free the token.
            // A `DroppedNewest` from the edge is normalized to `Rejected`.
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                EnqueueResult::Rejected
            }
        }
    }
    /// Pushes `msg` to output `port`, applying the edge's admission policy
    /// and mapping the outcome to a `StepResult` for use by step functions.
    ///
    /// Unlike `out_try_push`, the admission decision is computed from the
    /// message itself *before* it is stored, so rejected messages never
    /// touch the output memory manager.
    pub fn push_output(
        &mut self,
        port: usize,
        msg: Message<OutP>,
    ) -> Result<StepResult, NodeError> {
        debug_assert!(port < OUT);
        let admission_decision =
            self.outputs[port].get_admission_decision_from_message(&self.out_policies[port], &msg);
        match admission_decision {
            // Make room by evicting up to `eviction_count` queued entries.
            AdmissionDecision::Evict(eviction_count) => {
                for _ in 0..eviction_count {
                    match self.outputs[port].try_pop(&*self.out_managers[port]) {
                        Ok(evicted_token) => {
                            let _ = self.out_managers[port].free(evicted_token);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            // Evict until occupancy drops below the hard cap.
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occupancy = self.outputs[port].occupancy(&self.out_policies[port]);
                if !self.out_policies[port]
                    .caps
                    .at_or_above_hard(*occupancy.items(), *occupancy.bytes())
                {
                    break;
                }
                match self.outputs[port].try_pop(&*self.out_managers[port]) {
                    Ok(evicted_token) => {
                        let _ = self.out_managers[port].free(evicted_token);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            // All three non-admit outcomes surface to the step as
            // backpressure; the message is never stored.
            AdmissionDecision::DropNewest
            | AdmissionDecision::Reject
            | AdmissionDecision::Block => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
            AdmissionDecision::Admit => {}
        }
        // Store only after admission; a failed store reports backpressure.
        let token = match self.out_managers[port].store(msg) {
            Ok(token) => token,
            Err(_) => {
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return Ok(StepResult::Backpressured);
            }
        };
        match self.outputs[port].try_push(
            token,
            &self.out_policies[port],
            &*self.out_managers[port],
        ) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    // Refresh the queue-depth gauge.
                    let _ = self.out_occupancy(port);
                }
                Ok(StepResult::MadeProgress)
            }
            // The edge may still refuse after admission; free the stored
            // token so it does not leak.
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[port].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                Ok(StepResult::Backpressured)
            }
        }
    }
#[inline]
pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
debug_assert!(o < OUT);
let occ = self.outputs[o].occupancy(&self.out_policies[o]);
if T::METRICS_ENABLED {
self.telemetry.set_gauge(
TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
*occ.items() as u64,
);
}
occ
}
#[inline]
pub fn out_policy(&mut self, i: usize) -> EdgePolicy {
debug_assert!(i < OUT);
self.out_policies[i]
}
    /// Shared platform clock.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }
    /// Mutable access to the telemetry sink.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }
    /// Current time in clock ticks.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }
    /// Current time converted to nanoseconds.
    #[inline]
    pub fn now_nanos(&self) -> u64 {
        self.clock.ticks_to_nanos(self.clock.now_ticks())
    }
    /// Converts a tick count to nanoseconds.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }
    /// Converts nanoseconds to a tick count.
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }
    /// Returns whether input `port` currently holds enough messages to form
    /// a batch under `policy`'s batching configuration.
    ///
    /// Side effect: refreshes the edge's queue-depth gauge (via
    /// `in_occupancy`).
    #[inline]
    pub fn input_edge_has_batch(&mut self, port: usize, policy: &NodePolicy) -> bool {
        debug_assert!(port < IN);
        let occ = self.in_occupancy(port);
        if occ.items() == &0 {
            return false;
        }
        let fixed_opt = *policy.batching().fixed_n();
        let delta_opt = *policy.batching().max_delta_t();
        match (fixed_opt, delta_opt) {
            // Fixed-size batching: need at least `fixed_n` queued items.
            (Some(fixed_n), None) => *occ.items() >= fixed_n,
            // Pure time-window batching: any non-empty queue qualifies.
            (None, Some(_max_delta_t)) => true,
            // Both constraints: need `fixed_n` items AND the first/last of
            // the prospective batch created within `max_delta_t` ticks.
            (Some(fixed_n), Some(max_delta_t)) => {
                if *occ.items() < fixed_n {
                    return false;
                }
                let first_token = match self.inputs[port].try_peek_at(0) {
                    Ok(t) => t,
                    Err(_) => return false,
                };
                let last_token = match self.inputs[port].try_peek_at(fixed_n - 1) {
                    Ok(t) => t,
                    Err(_) => return false,
                };
                let first_ticks = match self.in_managers[port].peek_header(first_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };
                let last_ticks = match self.in_managers[port].peek_header(last_token) {
                    Ok(h) => *h.creation_tick(),
                    Err(_) => return false,
                };
                // Saturating: tolerates out-of-order creation ticks.
                let span = last_ticks.saturating_sub(first_ticks);
                span <= max_delta_t
            }
            // No batching constraints: a single message is enough.
            (None, None) => true,
        }
    }
    /// Re-borrows the output half of this context as an `OutStepContext`.
    ///
    /// Currently unused (`dead_code`): `pop_batch_and_process` performs the
    /// same reborrow inline so it can split borrows around its batch
    /// iterator.
    #[allow(dead_code)]
    #[inline]
    fn as_out_step_context<'ctx>(
        &'ctx mut self,
    ) -> OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T>
    where
        EdgePolicy: Copy,
    {
        // Policies and ids are copied; queues, managers, and telemetry are
        // reborrowed mutably for 'ctx.
        let out_policies = self.out_policies;
        let out_edge_ids = self.out_edge_ids;
        let node_id = self.node_id;
        let clock = self.clock;
        let telemetry = &mut *self.telemetry;
        let outputs: &'ctx mut [&'graph mut OutQ; OUT] = &mut self.outputs;
        let out_managers: &'ctx mut [&'graph mut OutM; OUT] = &mut self.out_managers;
        OutStepContext {
            outputs,
            out_managers,
            out_policies,
            out_edge_ids,
            node_id,
            clock,
            telemetry,
            _marker: core::marker::PhantomData,
        }
    }
}
/// Output-only slice of a `StepContext`.
///
/// Borrowing just the output half allows batch processing to keep an
/// iterator alive over the input manager while pushing results.
pub struct OutStepContext<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
where
    OutP: Payload,
    C: PlatformClock + Sized,
    T: Telemetry + Sized,
{
    // Reborrowed (for 'ctx) output edges and their memory managers;
    // arrays are index-aligned with the policy/id arrays below.
    outputs: &'ctx mut [&'graph mut OutQ; OUT],
    out_managers: &'ctx mut [&'graph mut OutM; OUT],
    out_policies: [EdgePolicy; OUT],
    out_edge_ids: [u32; OUT],
    node_id: u32,
    clock: &'clock C,
    telemetry: &'ctx mut T,
    _marker: core::marker::PhantomData<OutP>,
}
impl<'graph, 'ctx, 'clock, const OUT: usize, OutP, OutQ, OutM, C, T>
OutStepContext<'graph, 'ctx, 'clock, OUT, OutP, OutQ, OutM, C, T>
where
OutP: Payload,
OutQ: Edge,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
T: Telemetry + Sized,
{
    /// Stores `m` and attempts to enqueue it on output edge `o`, applying
    /// the edge's admission policy first. Mirrors
    /// `StepContext::out_try_push`, with the queue-depth gauge updated
    /// inline on success.
    ///
    /// The stored payload is freed again on every non-enqueued path so it
    /// cannot leak a pool slot.
    pub fn out_try_push(&mut self, o: usize, m: Message<OutP>) -> EnqueueResult {
        debug_assert!(o < OUT);
        // Store first: the admission decision below requires a stored token.
        let token = match self.out_managers[o].store(m) {
            Ok(t) => t,
            Err(_) => return EnqueueResult::Rejected,
        };
        let decision = self.outputs[o].get_admission_decision(
            &self.out_policies[o],
            token,
            &*self.out_managers[o],
        );
        match decision {
            // Make room by evicting up to `n` queued entries.
            AdmissionDecision::Evict(n) => {
                for _ in 0..n {
                    match self.outputs[o].try_pop(&*self.out_managers[o]) {
                        Ok(evicted) => {
                            let _ = self.out_managers[o].free(evicted);
                            if T::METRICS_ENABLED {
                                self.telemetry.incr_counter(
                                    TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                    1,
                                );
                            }
                        }
                        Err(_) => break,
                    }
                }
            }
            // Evict until occupancy falls below the hard cap.
            AdmissionDecision::EvictUntilBelowHard => loop {
                let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                if !self.out_policies[o]
                    .caps
                    .at_or_above_hard(*occ.items(), *occ.bytes())
                {
                    break;
                }
                match self.outputs[o].try_pop(&*self.out_managers[o]) {
                    Ok(evicted) => {
                        let _ = self.out_managers[o].free(evicted);
                        if T::METRICS_ENABLED {
                            self.telemetry.incr_counter(
                                TelemetryKey::node(self.node_id, TelemetryKind::Dropped),
                                1,
                            );
                        }
                    }
                    Err(_) => break,
                }
            },
            // The new message itself is dropped.
            AdmissionDecision::DropNewest => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::DroppedNewest;
            }
            // Block is treated like Reject: this is a non-blocking push.
            AdmissionDecision::Reject | AdmissionDecision::Block => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                return EnqueueResult::Rejected;
            }
            AdmissionDecision::Admit => {}
        }
        match self.outputs[o].try_push(token, &self.out_policies[o], &*self.out_managers[o]) {
            EnqueueResult::Enqueued => {
                if T::METRICS_ENABLED {
                    self.telemetry.incr_counter(
                        TelemetryKey::node(self.node_id, TelemetryKind::EgressMsgs),
                        1,
                    );
                    // Gauge updated inline (no `out_occupancy` call) to avoid
                    // an extra method borrow of `self`.
                    let occ = self.outputs[o].occupancy(&self.out_policies[o]);
                    self.telemetry.set_gauge(
                        TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
                        *occ.items() as u64,
                    );
                }
                EnqueueResult::Enqueued
            }
            // The edge may still refuse after admission; free the token.
            // A `DroppedNewest` from the edge is normalized to `Rejected`.
            EnqueueResult::DroppedNewest | EnqueueResult::Rejected => {
                let _ = self.out_managers[o].free(token);
                if T::METRICS_ENABLED {
                    self.telemetry
                        .incr_counter(TelemetryKey::node(self.node_id, TelemetryKind::Dropped), 1);
                }
                EnqueueResult::Rejected
            }
        }
    }
#[inline]
pub fn out_occupancy(&mut self, o: usize) -> EdgeOccupancy {
debug_assert!(o < OUT);
let occ = self.outputs[o].occupancy(&self.out_policies[o]);
if T::METRICS_ENABLED {
self.telemetry.set_gauge(
TelemetryKey::edge(self.out_edge_ids[o], TelemetryKind::QueueDepth),
*occ.items() as u64,
);
}
occ
}
#[inline]
pub fn out_policy(&mut self, o: usize) -> EdgePolicy {
debug_assert!(o < OUT);
self.out_policies[o]
}
    /// Shared platform clock.
    #[inline]
    pub fn clock(&self) -> &C {
        self.clock
    }
    /// Mutable access to the telemetry sink.
    #[inline]
    pub fn telemetry_mut(&mut self) -> &mut T {
        self.telemetry
    }
    /// Current time in clock ticks.
    #[inline]
    pub fn now_ticks(&self) -> Ticks {
        self.clock.now_ticks()
    }
    /// Current time converted to nanoseconds.
    #[inline]
    pub fn now_nanos(&self) -> u64 {
        self.clock.ticks_to_nanos(self.clock.now_ticks())
    }
    /// Converts a tick count to nanoseconds.
    #[inline]
    pub fn ticks_to_nanos(&self, t: Ticks) -> u64 {
        self.clock.ticks_to_nanos(t)
    }
    /// Converts nanoseconds to a tick count.
    #[inline]
    pub fn nanos_to_ticks(&self, ns: u64) -> Ticks {
        self.clock.nanos_to_ticks(ns)
    }
}
/// A processing-graph node with `IN` input ports and `OUT` output ports,
/// consuming `InP` payloads and producing `OutP` payloads.
///
/// `step`/`step_batch` have default implementations built on
/// `process_message`; implementors typically supply only the required
/// methods.
pub trait Node<const IN: usize, const OUT: usize, InP, OutP>
where
    InP: Payload,
    OutP: Payload,
{
    /// Static capabilities this node advertises to the runtime.
    fn describe_capabilities(&self) -> NodeCapabilities;
    /// Placement acceptance for each input port.
    fn input_acceptance(&self) -> [PlacementAcceptance; IN];
    /// Placement acceptance for each output port.
    fn output_acceptance(&self) -> [PlacementAcceptance; OUT];
    /// The node-level policy (batching configuration, etc.).
    fn policy(&self) -> NodePolicy;
    /// Test/bench-only hook for swapping the policy at runtime.
    #[cfg(any(test, feature = "bench"))]
    fn set_policy(&mut self, policy: NodePolicy);
    /// Which role this node plays in the graph.
    fn node_kind(&self) -> NodeKind;
    /// One-time setup before the node is started.
    fn initialize<C, Tel>(&mut self, clock: &C, telemetry: &mut Tel) -> Result<(), NodeError>
    where
        Tel: Telemetry;
    /// Called when the node transitions to running.
    fn start<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
    where
        Tel: Telemetry;
    /// Core per-message transformation applied by the default step methods.
    fn process_message<C>(
        &mut self,
        msg: &Message<InP>,
        sys_clock: &C,
    ) -> Result<ProcessResult<OutP>, NodeError>
    where
        C: PlatformClock + Sized;
    /// Default single-message step: find the first input port with a ready
    /// batch, pop one message, and run `process_message` on it.
    fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        ctx: &mut StepContext<
            'graph,
            'telemetry,
            'clock,
            IN,
            OUT,
            InP,
            OutP,
            InQ,
            OutQ,
            InM,
            OutM,
            C,
            Tel,
        >,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<OutP>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        let node_policy = self.policy();
        let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
            Some(p) => p,
            None => return Ok(StepResult::NoInput),
        };
        // Copy the `&'clock C` out of the context before the call: reading
        // `ctx.clock` inside the closure would hold a shared borrow of `ctx`
        // across `pop_and_process`'s `&mut ctx` receiver (E0502).
        let clock = ctx.clock;
        ctx.pop_and_process(port, |msg| self.process_message(msg, clock))
    }
    /// Default batched step: like `step`, but pops up to `fixed_n` messages
    /// (defaulting to 1 when no fixed batch size is configured).
    fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, Tel>(
        &mut self,
        ctx: &mut StepContext<
            'graph,
            'telemetry,
            'clock,
            IN,
            OUT,
            InP,
            OutP,
            InQ,
            OutQ,
            InM,
            OutM,
            C,
            Tel,
        >,
    ) -> Result<StepResult, NodeError>
    where
        InQ: Edge,
        OutQ: Edge,
        InM: MemoryManager<InP>,
        OutM: MemoryManager<OutP>,
        C: PlatformClock + Sized,
        Tel: Telemetry + Sized,
    {
        let node_policy = self.policy();
        let port = match (0..IN).find(|&p| ctx.input_edge_has_batch(p, &node_policy)) {
            Some(p) => p,
            None => return Ok(StepResult::NoInput),
        };
        let nmax = node_policy.batching().fixed_n().unwrap_or(1);
        // Same borrow hoist as in `step`.
        let clock = ctx.clock;
        ctx.pop_batch_and_process(port, nmax, &node_policy, |msg| {
            self.process_message(msg, clock)
        })
    }
    /// Invoked when the runtime's watchdog fires for this node.
    fn on_watchdog_timeout<C, Tel>(
        &mut self,
        _clock: &C,
        _telemetry: &mut Tel,
    ) -> Result<StepResult, NodeError>
    where
        C: PlatformClock + Sized,
        Tel: Telemetry;
    /// Called when the node is shut down.
    fn stop<C, Tel>(&mut self, _clock: &C, _telemetry: &mut Tel) -> Result<(), NodeError>
    where
        Tel: Telemetry;
}