pub mod event_message;
pub mod graph_telemetry;
pub mod sink;
#[cfg(feature = "std")]
pub mod concurrent;
use core::fmt;
use crate::policy::WatermarkState;
use crate::types::{EdgeIndex, NodeIndex};
use event_message::EventMessage;
use sink::write_u64;
/// Backend that receives metrics and structured events from the runtime.
///
/// Implementors must provide the three metric entry points
/// (`incr_counter`, `set_gauge`, `record_latency_ns`); every other method has
/// a no-op default. The exact meaning of each required method is defined by
/// the implementation — the per-method notes below follow the method names.
pub trait Telemetry {
    /// Compile-time switch for metric collection. Defaults to `true`; the
    /// no-op backends in this module override it to `false`.
    // NOTE(review): presumably call sites test this constant to skip metric
    // bookkeeping entirely — confirm against the callers.
    const METRICS_ENABLED: bool = true;
    /// Compile-time switch for event emission. Defaults to `true`; note that
    /// the runtime check `events_enabled` still defaults to `false`, so an
    /// implementor must override that method as well to actually get events.
    const EVENTS_STATICALLY_ENABLED: bool = true;
    /// Counter update for `key`; `delta` is the increment.
    fn incr_counter(&mut self, key: TelemetryKey, delta: u64);
    /// Gauge update for `key`; `value` is the new reading.
    fn set_gauge(&mut self, key: TelemetryKey, value: u64);
    /// One latency sample for `key`, in nanoseconds.
    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64);
    /// Hook for publishing accumulated metrics. Default: does nothing.
    #[inline]
    fn push_metrics(&mut self) {}
    /// Runtime check for whether events should be produced.
    /// Default: `false` (events are opt-in per implementation).
    #[inline]
    fn events_enabled(&self) -> bool {
        false
    }
    /// Accepts one structured event. Default: the event is discarded.
    #[inline]
    fn push_event(&mut self, _event: TelemetryEvent) {}
    /// Hook for flushing buffered output. Default: does nothing.
    #[inline]
    fn flush(&mut self) {}
}
/// Identifies one metric stream: a namespace, an id inside that namespace,
/// and the kind of measurement.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TelemetryKey {
    ns: TelemetryNs,
    id: u32,
    kind: TelemetryKind,
}
impl TelemetryKey {
    /// Single place where keys are assembled from their three parts.
    #[inline]
    const fn compose(ns: TelemetryNs, id: u32, kind: TelemetryKind) -> Self {
        TelemetryKey { ns, id, kind }
    }

    /// Key for a per-node metric; `node_id` is stored unchanged.
    #[inline]
    pub const fn node(node_id: u32, kind: TelemetryKind) -> Self {
        Self::compose(TelemetryNs::Node, node_id, kind)
    }

    /// Key for a per-edge metric; `edge_id` is stored unchanged.
    #[inline]
    pub const fn edge(edge_id: u32, kind: TelemetryKind) -> Self {
        Self::compose(TelemetryNs::Edge, edge_id, kind)
    }

    /// Key for a runtime-wide metric; the id is always `0`.
    #[inline]
    pub const fn runtime(kind: TelemetryKind) -> Self {
        Self::compose(TelemetryNs::Runtime, 0, kind)
    }

    /// Key for a metric attached to one port of a node.
    ///
    /// The id packs three fields into 32 bits (most- to least-significant):
    /// the low 20 bits of `node_id`, one direction bit (`1` = output), and the
    /// low 11 bits of `port_index`.
    ///
    /// NOTE(review): higher bits of `node_id` / `port_index` are silently
    /// dropped, and the result lives in the `Node` namespace, so it can equal a
    /// plain [`TelemetryKey::node`] key (e.g. `node_port(0, 5, false, k)` ==
    /// `node(5, k)`). Sinks cannot tell the two apart.
    #[inline]
    pub const fn node_port(
        node_id: u32,
        port_index: u16,
        is_output: bool,
        kind: TelemetryKind,
    ) -> Self {
        const NODE_MASK: u32 = 0x000F_FFFF;
        const NODE_SHIFT: u32 = 12;
        const OUTPUT_BIT: u32 = 1 << 11;
        const PORT_MASK: u32 = 0x7FF;

        let node_bits = (node_id & NODE_MASK) << NODE_SHIFT;
        let direction_bit = if is_output { OUTPUT_BIT } else { 0 };
        let port_bits = (port_index as u32) & PORT_MASK;
        Self::compose(TelemetryNs::Node, node_bits | direction_bit | port_bits, kind)
    }

    /// Namespace the id belongs to.
    #[inline]
    pub const fn ns(&self) -> &TelemetryNs {
        &self.ns
    }

    /// Raw id (possibly a packed node/port id, see [`TelemetryKey::node_port`]).
    #[inline]
    pub const fn id(&self) -> &u32 {
        &self.id
    }

    /// Measurement kind.
    #[inline]
    pub const fn kind(&self) -> &TelemetryKind {
        &self.kind
    }
}
/// Namespace that a [`TelemetryKey`] id is interpreted in.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryNs {
    /// Node id, or a packed node/port id from [`TelemetryKey::node_port`].
    Node,
    /// Edge id.
    Edge,
    /// Runtime-wide; keys built by [`TelemetryKey::runtime`] use id `0`.
    Runtime,
}
/// Kind of measurement a [`TelemetryKey`] refers to.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryKind {
    Processed,
    Dropped,
    DeadlineMiss,
    QueueDepth,
    Latency,
    IngressMsgs,
    EgressMsgs,
}
/// Identifier of a graph instance; every event payload type in this module
/// carries one.
pub type GraphInstanceId = u32;
/// Reason a node step did not complete normally.
///
/// Derives `PartialEq`, `Eq` and `Hash` (like `TelemetryKind`) so callers can
/// compare error kinds with `==` or use them as map/set keys.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum NodeStepError {
    NoInput,
    Backpressured,
    OverBudget,
    ExternalUnavailable,
    ExecutionFailed,
}
/// Telemetry record for one execution step of one node.
///
/// Every field is taken verbatim from [`NodeStepTelemetry::new`]; nothing is
/// derived or validated here. In particular `duration_ns` is not computed from
/// the two timestamps, and `deadline_missed` is not checked against
/// `deadline_ns` — callers are responsible for keeping them consistent.
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub struct NodeStepTelemetry {
    graph_id: GraphInstanceId,
    node_index: NodeIndex,
    node_name: Option<&'static str>,
    // NOTE(review): timestamp source/clock is not visible here — presumably a
    // monotonic runtime clock in nanoseconds; confirm with the producer.
    timestamp_start_ns: u64,
    timestamp_end_ns: u64,
    duration_ns: u64,
    processed_count: u64,
    deadline_ns: Option<u64>,
    deadline_missed: bool,
    error_kind: Option<NodeStepError>,
}
impl NodeStepTelemetry {
    /// Builds a record; each argument is stored in the field of the same name.
    #[inline]
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        graph_id: GraphInstanceId,
        node_index: NodeIndex,
        node_name: Option<&'static str>,
        timestamp_start_ns: u64,
        timestamp_end_ns: u64,
        duration_ns: u64,
        processed_count: u64,
        deadline_ns: Option<u64>,
        deadline_missed: bool,
        error_kind: Option<NodeStepError>,
    ) -> Self {
        Self {
            graph_id,
            node_index,
            node_name,
            timestamp_start_ns,
            timestamp_end_ns,
            duration_ns,
            processed_count,
            deadline_ns,
            deadline_missed,
            error_kind,
        }
    }
    // Accessors below return references, matching the getter convention used
    // throughout this file.
    /// Accessor for the `graph_id` field.
    #[inline]
    pub const fn graph_id(&self) -> &GraphInstanceId {
        &self.graph_id
    }
    /// Accessor for the `node_index` field.
    #[inline]
    pub const fn node_index(&self) -> &NodeIndex {
        &self.node_index
    }
    /// Accessor for the optional static `node_name`.
    #[inline]
    pub const fn node_name(&self) -> &Option<&'static str> {
        &self.node_name
    }
    /// Accessor for the `timestamp_start_ns` field.
    #[inline]
    pub const fn timestamp_start_ns(&self) -> &u64 {
        &self.timestamp_start_ns
    }
    /// Accessor for the `timestamp_end_ns` field.
    #[inline]
    pub const fn timestamp_end_ns(&self) -> &u64 {
        &self.timestamp_end_ns
    }
    /// Accessor for the `duration_ns` field (stored as given, not derived).
    #[inline]
    pub const fn duration_ns(&self) -> &u64 {
        &self.duration_ns
    }
    /// Accessor for the `processed_count` field.
    #[inline]
    pub const fn processed_count(&self) -> &u64 {
        &self.processed_count
    }
    /// Accessor for the optional `deadline_ns`.
    #[inline]
    pub const fn deadline_ns(&self) -> &Option<u64> {
        &self.deadline_ns
    }
    /// Accessor for the `deadline_missed` flag (stored as given, not derived).
    #[inline]
    pub const fn deadline_missed(&self) -> &bool {
        &self.deadline_missed
    }
    /// Accessor for the optional `error_kind`.
    #[inline]
    pub const fn error_kind(&self) -> &Option<NodeStepError> {
        &self.error_kind
    }
}
/// Point-in-time telemetry record for one edge (single `timestamp_ns`).
///
/// Every field is taken verbatim from [`EdgeSnapshotTelemetry::new`]; in
/// particular `watermark_state` is stored as given and is not recomputed from
/// `current_occupancy` versus the soft/hard watermarks.
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub struct EdgeSnapshotTelemetry {
    graph_id: GraphInstanceId,
    edge_index: EdgeIndex,
    source_node_index: NodeIndex,
    target_node_index: NodeIndex,
    timestamp_ns: u64,
    current_occupancy: u32,
    soft_watermark: u32,
    hard_watermark: u32,
    watermark_state: WatermarkState,
}
impl EdgeSnapshotTelemetry {
    /// Builds a snapshot; each argument is stored in the field of the same name.
    #[inline]
    #[allow(clippy::too_many_arguments)]
    pub const fn new(
        graph_id: GraphInstanceId,
        edge_index: EdgeIndex,
        source_node_index: NodeIndex,
        target_node_index: NodeIndex,
        timestamp_ns: u64,
        current_occupancy: u32,
        soft_watermark: u32,
        hard_watermark: u32,
        watermark_state: WatermarkState,
    ) -> Self {
        Self {
            graph_id,
            edge_index,
            source_node_index,
            target_node_index,
            timestamp_ns,
            current_occupancy,
            soft_watermark,
            hard_watermark,
            watermark_state,
        }
    }
    // Accessors below return references, matching the getter convention used
    // throughout this file.
    /// Accessor for the `graph_id` field.
    #[inline]
    pub const fn graph_id(&self) -> &GraphInstanceId {
        &self.graph_id
    }
    /// Accessor for the `edge_index` field.
    #[inline]
    pub const fn edge_index(&self) -> &EdgeIndex {
        &self.edge_index
    }
    /// Accessor for the `source_node_index` field.
    #[inline]
    pub const fn source_node_index(&self) -> &NodeIndex {
        &self.source_node_index
    }
    /// Accessor for the `target_node_index` field.
    #[inline]
    pub const fn target_node_index(&self) -> &NodeIndex {
        &self.target_node_index
    }
    /// Accessor for the `timestamp_ns` field.
    #[inline]
    pub const fn timestamp_ns(&self) -> &u64 {
        &self.timestamp_ns
    }
    /// Accessor for the `current_occupancy` field.
    #[inline]
    pub const fn current_occupancy(&self) -> &u32 {
        &self.current_occupancy
    }
    /// Accessor for the `soft_watermark` field.
    #[inline]
    pub const fn soft_watermark(&self) -> &u32 {
        &self.soft_watermark
    }
    /// Accessor for the `hard_watermark` field.
    #[inline]
    pub const fn hard_watermark(&self) -> &u32 {
        &self.hard_watermark
    }
    /// Accessor for the `watermark_state` field (stored as given).
    #[inline]
    pub const fn watermark_state(&self) -> &WatermarkState {
        &self.watermark_state
    }
}
/// Kind of runtime-level event carried by [`RuntimeTelemetryEvent`].
///
/// Derives `PartialEq`, `Eq` and `Hash` (like `TelemetryKind`) so sinks can
/// filter, compare or count events by kind.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeTelemetryEventKind {
    GraphStarted,
    GraphStopped,
    GraphPanicked,
    SensorDisconnected,
    SensorRecovered,
    ModelLoadFailed,
    ModelRecovered,
    MqttDisconnected,
    MqttRecovered,
    DataGapDetected,
    InvalidDataSeen,
}
/// Runtime-level telemetry event: which graph, when, what kind, and an
/// optional attached message.
///
/// Every field is taken verbatim from [`RuntimeTelemetryEvent::new`].
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub struct RuntimeTelemetryEvent {
    graph_id: GraphInstanceId,
    timestamp_ns: u64,
    event_kind: RuntimeTelemetryEventKind,
    message: Option<EventMessage>,
}
impl RuntimeTelemetryEvent {
    /// Builds an event; each argument is stored in the field of the same name.
    #[inline]
    pub const fn new(
        graph_id: GraphInstanceId,
        timestamp_ns: u64,
        event_kind: RuntimeTelemetryEventKind,
        message: Option<EventMessage>,
    ) -> Self {
        Self {
            graph_id,
            timestamp_ns,
            event_kind,
            message,
        }
    }
    // Accessors below return references, matching the getter convention used
    // throughout this file.
    /// Accessor for the `graph_id` field.
    #[inline]
    pub const fn graph_id(&self) -> &GraphInstanceId {
        &self.graph_id
    }
    /// Accessor for the `timestamp_ns` field.
    #[inline]
    pub const fn timestamp_ns(&self) -> &u64 {
        &self.timestamp_ns
    }
    /// Accessor for the `event_kind` field.
    #[inline]
    pub const fn event_kind(&self) -> &RuntimeTelemetryEventKind {
        &self.event_kind
    }
    /// Accessor for the optional attached `message`.
    #[inline]
    pub const fn message(&self) -> &Option<EventMessage> {
        &self.message
    }
}
/// Structured telemetry event, the payload type of `Telemetry::push_event`.
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub enum TelemetryEvent {
    /// One node execution step.
    NodeStep(NodeStepTelemetry),
    /// Point-in-time view of one edge.
    EdgeSnapshot(EdgeSnapshotTelemetry),
    /// Runtime-level event.
    Runtime(RuntimeTelemetryEvent),
}
impl TelemetryEvent {
    /// Wraps a [`NodeStepTelemetry`] record as [`TelemetryEvent::NodeStep`].
    #[inline]
    pub const fn node_step(ev: NodeStepTelemetry) -> Self {
        Self::NodeStep(ev)
    }

    /// Wraps an [`EdgeSnapshotTelemetry`] record as [`TelemetryEvent::EdgeSnapshot`].
    #[inline]
    pub const fn edge_snapshot(ev: EdgeSnapshotTelemetry) -> Self {
        Self::EdgeSnapshot(ev)
    }

    /// Wraps a [`RuntimeTelemetryEvent`] as [`TelemetryEvent::Runtime`].
    #[inline]
    pub const fn runtime(ev: RuntimeTelemetryEvent) -> Self {
        Self::Runtime(ev)
    }
}
/// Per-node counters plus latency aggregates.
///
/// The `inc_*`, `dec_*`, `record_latency_ns` and `merge_from` methods saturate
/// at `0` / `u64::MAX` instead of wrapping; the `set_*` methods overwrite.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct NodeMetrics {
    processed: u64,
    dropped: u64,
    ingress: u64,
    egress: u64,
    // Latency aggregates over `record_latency_ns` samples (nanoseconds):
    // saturating sum, sample count, and largest sample.
    lat_sum: u64,
    lat_cnt: u64,
    lat_max: u64,
    deadline_miss_count: u64,
}
impl Default for NodeMetrics {
    /// Same as [`NodeMetrics::new`]: everything starts at zero.
    fn default() -> Self {
        Self::new()
    }
}
impl NodeMetrics {
    /// Creates metrics with every counter and aggregate at zero.
    #[inline]
    pub const fn new() -> Self {
        Self {
            processed: 0,
            dropped: 0,
            ingress: 0,
            egress: 0,
            lat_sum: 0,
            lat_cnt: 0,
            lat_max: 0,
            deadline_miss_count: 0,
        }
    }
    /// Current `processed` counter.
    #[inline]
    pub const fn processed(&self) -> &u64 {
        &self.processed
    }
    /// Current `dropped` counter.
    #[inline]
    pub const fn dropped(&self) -> &u64 {
        &self.dropped
    }
    /// Current `ingress` counter.
    #[inline]
    pub const fn ingress(&self) -> &u64 {
        &self.ingress
    }
    /// Current `egress` counter.
    #[inline]
    pub const fn egress(&self) -> &u64 {
        &self.egress
    }
    /// Saturating sum of all latency samples, in nanoseconds.
    #[inline]
    pub const fn lat_sum(&self) -> &u64 {
        &self.lat_sum
    }
    /// Number of latency samples recorded.
    #[inline]
    pub const fn lat_cnt(&self) -> &u64 {
        &self.lat_cnt
    }
    /// Largest latency sample seen, in nanoseconds (`0` if none).
    #[inline]
    pub const fn lat_max(&self) -> &u64 {
        &self.lat_max
    }
    /// Current `deadline_miss_count` counter.
    #[inline]
    pub const fn deadline_miss_count(&self) -> &u64 {
        &self.deadline_miss_count
    }
    /// Adds `delta` to `processed` (saturating).
    #[inline]
    pub fn inc_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_add(delta);
    }
    /// Subtracts `delta` from `processed` (saturating at 0).
    #[inline]
    pub fn dec_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_sub(delta);
    }
    /// Overwrites `processed`.
    #[inline]
    pub fn set_processed(&mut self, v: u64) {
        self.processed = v;
    }
    /// Adds `delta` to `dropped` (saturating).
    #[inline]
    pub fn inc_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_add(delta);
    }
    /// Subtracts `delta` from `dropped` (saturating at 0).
    #[inline]
    pub fn dec_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_sub(delta);
    }
    /// Overwrites `dropped`.
    #[inline]
    pub fn set_dropped(&mut self, v: u64) {
        self.dropped = v;
    }
    /// Adds `delta` to `ingress` (saturating).
    #[inline]
    pub fn inc_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_add(delta);
    }
    /// Subtracts `delta` from `ingress` (saturating at 0).
    #[inline]
    pub fn dec_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_sub(delta);
    }
    /// Overwrites `ingress`.
    #[inline]
    pub fn set_ingress(&mut self, v: u64) {
        self.ingress = v;
    }
    /// Adds `delta` to `egress` (saturating).
    #[inline]
    pub fn inc_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_add(delta);
    }
    /// Subtracts `delta` from `egress` (saturating at 0).
    #[inline]
    pub fn dec_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_sub(delta);
    }
    /// Overwrites `egress`.
    #[inline]
    pub fn set_egress(&mut self, v: u64) {
        self.egress = v;
    }
    /// Adds `delta` to `deadline_miss_count` (saturating).
    #[inline]
    pub fn inc_deadline_miss_count(&mut self, delta: u64) {
        self.deadline_miss_count = self.deadline_miss_count.saturating_add(delta);
    }
    /// Subtracts `delta` from `deadline_miss_count` (saturating at 0).
    ///
    /// Added so this counter has the same inc/dec/set trio as the others.
    #[inline]
    pub fn dec_deadline_miss_count(&mut self, delta: u64) {
        self.deadline_miss_count = self.deadline_miss_count.saturating_sub(delta);
    }
    /// Overwrites `deadline_miss_count`.
    ///
    /// Added so this counter has the same inc/dec/set trio as the others.
    #[inline]
    pub fn set_deadline_miss_count(&mut self, v: u64) {
        self.deadline_miss_count = v;
    }
    /// Records one latency sample: adds it to the sum (saturating), bumps the
    /// sample count (saturating) and raises the maximum if needed.
    #[inline]
    pub fn record_latency_ns(&mut self, value_ns: u64) {
        self.lat_sum = self.lat_sum.saturating_add(value_ns);
        self.lat_cnt = self.lat_cnt.saturating_add(1);
        self.lat_max = self.lat_max.max(value_ns);
    }
    /// Folds `other` into `self`: counters, latency sum and sample count are
    /// added (saturating); the latency maximum becomes the larger of the two.
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        self.processed = self.processed.saturating_add(other.processed);
        self.dropped = self.dropped.saturating_add(other.dropped);
        self.ingress = self.ingress.saturating_add(other.ingress);
        self.egress = self.egress.saturating_add(other.egress);
        self.lat_sum = self.lat_sum.saturating_add(other.lat_sum);
        self.lat_cnt = self.lat_cnt.saturating_add(other.lat_cnt);
        self.lat_max = self.lat_max.max(other.lat_max);
        self.deadline_miss_count = self
            .deadline_miss_count
            .saturating_add(other.deadline_miss_count);
    }
    /// Returns every counter and aggregate to zero.
    #[inline]
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
/// Per-edge gauge values.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct EdgeMetrics {
    queue_depth: u32,
}
impl Default for EdgeMetrics {
    /// Same as [`EdgeMetrics::new`]: queue depth starts at zero.
    fn default() -> Self {
        EdgeMetrics::new()
    }
}
impl EdgeMetrics {
    /// Creates metrics with a queue depth of zero.
    pub const fn new() -> Self {
        EdgeMetrics { queue_depth: 0 }
    }

    /// Last stored queue depth.
    #[inline]
    pub const fn queue_depth(&self) -> &u32 {
        &self.queue_depth
    }

    /// Overwrites the queue depth.
    #[inline]
    pub fn set_queue_depth(&mut self, v: u32) {
        self.queue_depth = v;
    }

    /// Raises the queue depth by `delta`, clamping at `u32::MAX`.
    #[inline]
    pub fn inc_queue_depth(&mut self, delta: u32) {
        self.queue_depth = u32::saturating_add(self.queue_depth, delta);
    }

    /// Lowers the queue depth by `delta`, clamping at `0`.
    #[inline]
    pub fn dec_queue_depth(&mut self, delta: u32) {
        self.queue_depth = u32::saturating_sub(self.queue_depth, delta);
    }

    /// Gauge merge: adopts `other`'s queue depth and discards this one
    /// (last value wins; nothing is summed).
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        let incoming = other.queue_depth;
        self.queue_depth = incoming;
    }

    /// Returns the queue depth to zero.
    #[inline]
    pub fn reset(&mut self) {
        *self = EdgeMetrics::new();
    }
}
/// Fixed-capacity metrics table for one graph: `MAX_NODES` [`NodeMetrics`]
/// entries and `MAX_EDGES` [`EdgeMetrics`] entries, stored inline.
///
/// The type is `Copy`, so passing it by value copies both arrays.
#[derive(Debug, Clone, Copy)]
pub struct GraphMetrics<const MAX_NODES: usize, const MAX_EDGES: usize> {
    id: u32,
    nodes: [NodeMetrics; MAX_NODES],
    edges: [EdgeMetrics; MAX_EDGES],
}
impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
    /// Creates a table tagged with `id` and with every entry zeroed.
    #[inline]
    pub const fn new(id: u32) -> Self {
        Self {
            id,
            nodes: [NodeMetrics::new(); MAX_NODES],
            edges: [EdgeMetrics::new(); MAX_EDGES],
        }
    }
    /// Graph id passed to [`GraphMetrics::new`].
    ///
    /// `const fn` like every other accessor in this module, so it is usable in
    /// const contexts.
    #[inline]
    pub const fn id(&self) -> &u32 {
        &self.id
    }
    /// All node entries, including ones never written.
    #[inline]
    pub const fn nodes(&self) -> &[NodeMetrics; MAX_NODES] {
        &self.nodes
    }
    /// All edge entries, including ones never written.
    #[inline]
    pub const fn edges(&self) -> &[EdgeMetrics; MAX_EDGES] {
        &self.edges
    }
}
impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
    /// Writes a plain-text dump of the whole table to `w`.
    ///
    /// The layout is:
    /// - one `graph id: <id>` header line;
    /// - one line per node entry, with its index and every counter in
    ///   declaration order;
    /// - one line per edge entry, with its index and `queue_depth`.
    ///
    /// All entries are written, including untouched ones.
    ///
    /// # Errors
    /// Returns the first error reported while writing. Output may be partial
    /// in that case.
    pub fn fmt<W: fmt::Write>(&self, w: &mut W) -> fmt::Result {
        // Writes `label` followed by `value` rendered via `write_u64`.
        fn labeled<O: fmt::Write>(out: &mut O, label: &str, value: u64) -> fmt::Result {
            out.write_str(label)?;
            write_u64(out, value)?;
            Ok(())
        }

        labeled(w, "graph id: ", u64::from(self.id))?;
        w.write_str("\n")?;

        for (slot, node) in self.nodes.iter().enumerate() {
            let columns = [
                (" node id: ", slot as u64),
                (" processed=", node.processed),
                (" dropped=", node.dropped),
                (" ingress=", node.ingress),
                (" egress=", node.egress),
                (" lat_sum=", node.lat_sum),
                (" lat_cnt=", node.lat_cnt),
                (" lat_max=", node.lat_max),
                (" deadline_miss_count=", node.deadline_miss_count),
            ];
            for &(label, value) in columns.iter() {
                labeled(w, label, value)?;
            }
            w.write_str("\n")?;
        }

        for (slot, edge) in self.edges.iter().enumerate() {
            labeled(w, " edge id: ", slot as u64)?;
            labeled(w, " queue_depth=", u64::from(edge.queue_depth))?;
            w.write_str("\n")?;
        }

        Ok(())
    }
}
/// Telemetry backend that discards everything.
///
/// Both compile-time switches are `false`, and every method is a no-op. The
/// trait's default `events_enabled` (`false`) is kept.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopTelemetry;
impl Telemetry for NoopTelemetry {
    const METRICS_ENABLED: bool = false;
    const EVENTS_STATICALLY_ENABLED: bool = false;

    #[inline]
    fn incr_counter(&mut self, _: TelemetryKey, _: u64) {}

    #[inline]
    fn set_gauge(&mut self, _: TelemetryKey, _: u64) {}

    #[inline]
    fn record_latency_ns(&mut self, _: TelemetryKey, _: u64) {}
}
// The unit type is a second discard-everything backend with the same switches
// and no-op methods as `NoopTelemetry`, so `()` can be passed where telemetry
// is not wanted.
impl Telemetry for () {
    const METRICS_ENABLED: bool = false;
    const EVENTS_STATICALLY_ENABLED: bool = false;

    #[inline]
    fn incr_counter(&mut self, _: TelemetryKey, _: u64) {}

    #[inline]
    fn set_gauge(&mut self, _: TelemetryKey, _: u64) {}

    #[inline]
    fn record_latency_ns(&mut self, _: TelemetryKey, _: u64) {}
}