use crate::{
edge::Edge,
errors::{NodeError, NodeErrorKind},
memory::PlacementAcceptance,
message::{payload::Payload, Message},
node::{Node, NodeCapabilities, NodeKind, ProcessResult, StepContext, StepResult},
policy::NodePolicy,
prelude::{
MemoryManager, NodeStepError, NodeStepTelemetry, PlatformClock, Telemetry, TelemetryEvent,
TelemetryKey, TelemetryKind,
},
types::{NodeIndex, PortId, PortIndex},
};
/// Wrapper that pairs a node implementation `N` with a [`NodeIndex`] and an
/// optional static name.
///
/// `IN` and `OUT` are the number of input and output ports of the wrapped
/// node; `InP` and `OutP` are the payload types it consumes and produces.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeLink<N, const IN: usize, const OUT: usize, InP, OutP>
where
InP: Payload,
OutP: Payload,
N: Node<IN, OUT, InP, OutP>,
{
// The wrapped node implementation that all calls are forwarded to.
node: N,
// Index identifying this node; also used to build port ids and telemetry keys.
id: NodeIndex,
// Optional human-readable name, reported in descriptors and telemetry events.
name: Option<&'static str>,
// Zero-sized marker tying the payload type parameters to the struct.
_payload_marker: core::marker::PhantomData<(InP, OutP)>,
}
impl<N, const IN: usize, const OUT: usize, InP, OutP> NodeLink<N, IN, OUT, InP, OutP>
where
InP: Payload,
OutP: Payload,
N: Node<IN, OUT, InP, OutP>,
{
pub fn new(node: N, id: NodeIndex, name: Option<&'static str>) -> Self {
Self {
node,
id,
name,
_payload_marker: core::marker::PhantomData,
}
}
#[inline]
pub fn node(&self) -> &N {
&self.node
}
#[inline]
pub fn node_mut(&mut self) -> &mut N {
&mut self.node
}
#[inline]
pub fn id(&self) -> NodeIndex {
self.id
}
#[inline]
pub fn input_port_ids(&self) -> [PortId; IN] {
core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
}
#[inline]
pub fn output_port_ids(&self) -> [PortId; OUT] {
core::array::from_fn(|i| PortId::new(self.id, PortIndex::new(i)))
}
pub fn policy(&self) -> NodePolicy {
self.node.policy()
}
#[inline]
pub fn name(&self) -> Option<&'static str> {
self.name
}
#[inline]
pub fn descriptor(&self) -> NodeDescriptor {
NodeDescriptor {
id: self.id(),
kind: self.node.node_kind(),
in_ports: IN as u16,
out_ports: OUT as u16,
name: self.name(),
}
}
}
impl<N, const IN: usize, const OUT: usize, InP, OutP> Node<IN, OUT, InP, OutP>
for NodeLink<N, IN, OUT, InP, OutP>
where
InP: Payload,
OutP: Payload,
N: Node<IN, OUT, InP, OutP>,
{
#[inline]
fn describe_capabilities(&self) -> NodeCapabilities {
self.node.describe_capabilities()
}
#[inline]
fn input_acceptance(&self) -> [PlacementAcceptance; IN] {
self.node.input_acceptance()
}
#[inline]
fn output_acceptance(&self) -> [PlacementAcceptance; OUT] {
self.node.output_acceptance()
}
#[inline]
fn policy(&self) -> NodePolicy {
self.node.policy()
}
#[cfg(any(test, feature = "bench"))]
fn set_policy(&mut self, policy: NodePolicy) {
self.node.set_policy(policy);
}
#[inline]
fn node_kind(&self) -> NodeKind {
self.node.node_kind()
}
#[inline]
fn initialize<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
where
T: Telemetry,
{
self.node.initialize(clock, telemetry)
}
#[inline]
fn start<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
where
T: Telemetry,
{
self.node.start(clock, telemetry)
}
fn process_message<C>(
&mut self,
msg: &Message<InP>,
sys_clock: &C,
) -> Result<ProcessResult<OutP>, NodeError>
where
C: PlatformClock + Sized,
{
self.node.process_message(msg, sys_clock)
}
#[inline]
fn step<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
&mut self,
ctx: &mut StepContext<
'graph,
'telemetry,
'clock,
IN,
OUT,
InP,
OutP,
InQ,
OutQ,
InM,
OutM,
C,
T,
>,
) -> Result<StepResult, NodeError>
where
InQ: Edge,
OutQ: Edge,
InM: MemoryManager<InP>,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
T: Telemetry + Sized,
{
let policy = self.node.policy();
let batching_enabled = {
let nb = policy.batching();
(nb.fixed_n().unwrap_or(1) > 1) || nb.max_delta_t().is_some()
};
if !T::METRICS_ENABLED {
if batching_enabled {
return self.node.step_batch(ctx);
} else {
return self.node.step(ctx);
}
}
const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
let policy = self.node.policy();
let budget_policy = policy.budget();
let deadline_policy = policy.deadline();
let timestamp_start_ns = ctx.now_nanos();
let result = if batching_enabled {
self.node.step_batch(ctx)
} else {
self.node.step(ctx)
};
let timestamp_end_ns = ctx.now_nanos();
let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
let mut budget_ns_opt: Option<u64> = None;
if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
budget_ns_opt = Some(*default_deadline_ns.as_u64());
} else if let Some(tick_budget) = budget_policy.tick_budget() {
let budget_ns = ctx.ticks_to_nanos(*tick_budget);
budget_ns_opt = Some(budget_ns);
}
let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
Some(slack) => *slack.as_u64(),
None => 0,
};
let mut deadline_ns: Option<u64> = None;
let mut deadline_missed = false;
if let Some(budget_ns) = budget_ns_opt {
deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
if duration_ns > budget_ns.saturating_add(slack_ns) {
deadline_missed = true;
}
}
let telemetry = ctx.telemetry_mut();
telemetry.record_latency_ns(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
duration_ns,
);
if deadline_missed {
telemetry.incr_counter(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
1,
);
}
if let Ok(step_result) = &result {
use crate::node::StepResult::*;
match step_result {
MadeProgress | Terminal | YieldUntil(_) => {
telemetry.incr_counter(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
policy.batching().fixed_n().unwrap_or(1) as u64,
);
}
NoInput | Backpressured | WaitingOnExternal => {
}
}
}
if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
let error_kind = match &result {
Ok(step_result) => {
use crate::node::StepResult::*;
match step_result {
NoInput => Some(NodeStepError::NoInput),
Backpressured => Some(NodeStepError::Backpressured),
WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
MadeProgress | Terminal | YieldUntil(_) => {
if deadline_missed {
Some(NodeStepError::OverBudget)
} else {
None
}
}
}
}
Err(error) => {
Some(match error.kind() {
NodeErrorKind::NoInput => NodeStepError::NoInput,
NodeErrorKind::Backpressured => NodeStepError::Backpressured,
_ => NodeStepError::ExecutionFailed,
})
}
};
let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
GRAPH_ID,
self.id,
self.name,
timestamp_start_ns,
timestamp_end_ns,
duration_ns,
policy.batching().fixed_n().unwrap_or(1) as u64,
deadline_ns,
deadline_missed,
error_kind,
));
telemetry.push_event(event);
}
result
}
fn step_batch<'graph, 'telemetry, 'clock, InQ, OutQ, InM, OutM, C, T>(
&mut self,
ctx: &mut StepContext<
'graph,
'telemetry,
'clock,
IN,
OUT,
InP,
OutP,
InQ,
OutQ,
InM,
OutM,
C,
T,
>,
) -> Result<StepResult, NodeError>
where
InQ: Edge,
OutQ: Edge,
InM: MemoryManager<InP>,
OutM: MemoryManager<OutP>,
C: PlatformClock + Sized,
T: Telemetry + Sized,
{
if !T::METRICS_ENABLED {
return self.node.step_batch(ctx);
}
const GRAPH_ID: crate::telemetry::GraphInstanceId = 0;
let policy = self.node.policy();
let budget_policy = policy.budget();
let deadline_policy = policy.deadline();
let timestamp_start_ns = ctx.now_nanos();
let result = self.node.step_batch(ctx);
let timestamp_end_ns = ctx.now_nanos();
let duration_ns = timestamp_end_ns.saturating_sub(timestamp_start_ns);
let mut budget_ns_opt: Option<u64> = None;
if let Some(default_deadline_ns) = deadline_policy.default_deadline_ns() {
budget_ns_opt = Some(*default_deadline_ns.as_u64());
} else if let Some(tick_budget) = budget_policy.tick_budget() {
let budget_ns = ctx.ticks_to_nanos(*tick_budget);
budget_ns_opt = Some(budget_ns);
}
let slack_ns: u64 = match deadline_policy.slack_tolerance_ns() {
Some(slack) => *slack.as_u64(),
None => 0,
};
let mut deadline_ns: Option<u64> = None;
let mut deadline_missed = false;
if let Some(budget_ns) = budget_ns_opt {
deadline_ns = Some(timestamp_start_ns.saturating_add(budget_ns));
if duration_ns > budget_ns.saturating_add(slack_ns) {
deadline_missed = true;
}
}
let telemetry = ctx.telemetry_mut();
telemetry.record_latency_ns(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Latency),
duration_ns,
);
if deadline_missed {
telemetry.incr_counter(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::DeadlineMiss),
1,
);
}
if let Ok(step_result) = &result {
use crate::node::StepResult::*;
match step_result {
MadeProgress | Terminal | YieldUntil(_) => {
telemetry.incr_counter(
TelemetryKey::node(*self.id.as_usize() as u32, TelemetryKind::Processed),
policy.batching().fixed_n().unwrap_or(1) as u64,
);
}
NoInput | Backpressured | WaitingOnExternal => {
}
}
}
if T::EVENTS_STATICALLY_ENABLED && telemetry.events_enabled() {
let error_kind = match &result {
Ok(step_result) => {
use crate::node::StepResult::*;
match step_result {
NoInput => Some(NodeStepError::NoInput),
Backpressured => Some(NodeStepError::Backpressured),
WaitingOnExternal => Some(NodeStepError::ExternalUnavailable),
MadeProgress | Terminal | YieldUntil(_) => {
if deadline_missed {
Some(NodeStepError::OverBudget)
} else {
None
}
}
}
}
Err(error) => {
Some(match error.kind() {
NodeErrorKind::NoInput => NodeStepError::NoInput,
NodeErrorKind::Backpressured => NodeStepError::Backpressured,
_ => NodeStepError::ExecutionFailed,
})
}
};
let event = TelemetryEvent::node_step(NodeStepTelemetry::new(
GRAPH_ID,
self.id,
self.name,
timestamp_start_ns,
timestamp_end_ns,
duration_ns,
policy.batching().fixed_n().unwrap_or(1) as u64,
deadline_ns,
deadline_missed,
error_kind,
));
telemetry.push_event(event);
}
result
}
#[inline]
fn on_watchdog_timeout<C, T>(
&mut self,
clock: &C,
telemetry: &mut T,
) -> Result<StepResult, NodeError>
where
C: PlatformClock + Sized,
T: Telemetry,
{
self.node.on_watchdog_timeout(clock, telemetry)
}
#[inline]
fn stop<C, T>(&mut self, clock: &C, telemetry: &mut T) -> Result<(), NodeError>
where
T: Telemetry,
{
self.node.stop(clock, telemetry)
}
}
/// Plain-data summary of a node: its index, kind, port counts and optional
/// static name.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NodeDescriptor {
    /// Index of the described node.
    id: NodeIndex,
    /// Kind reported by the described node.
    kind: NodeKind,
    /// Number of input ports.
    in_ports: u16,
    /// Number of output ports.
    out_ports: u16,
    /// Optional static name of the node.
    name: Option<&'static str>,
}

impl NodeDescriptor {
    /// Assembles a descriptor from its individual parts.
    #[inline]
    pub fn new(
        id: NodeIndex,
        kind: NodeKind,
        in_ports: u16,
        out_ports: u16,
        name: Option<&'static str>,
    ) -> Self {
        NodeDescriptor {
            id,
            kind,
            in_ports,
            out_ports,
            name,
        }
    }

    /// Borrowed index of the described node.
    #[inline]
    pub fn id(&self) -> &NodeIndex {
        let NodeDescriptor { id, .. } = self;
        id
    }

    /// Borrowed kind of the described node.
    #[inline]
    pub fn kind(&self) -> &NodeKind {
        let NodeDescriptor { kind, .. } = self;
        kind
    }

    /// Borrowed input-port count.
    #[inline]
    pub fn in_ports(&self) -> &u16 {
        let NodeDescriptor { in_ports, .. } = self;
        in_ports
    }

    /// Borrowed output-port count.
    #[inline]
    pub fn out_ports(&self) -> &u16 {
        let NodeDescriptor { out_ports, .. } = self;
        out_ports
    }

    /// Optional static name of the described node.
    #[inline]
    pub fn name(&self) -> Option<&'static str> {
        self.name
    }
}