use super::*;
/// Failure reported by a telemetry sink operation.
///
/// The enum is `#[non_exhaustive]`, so further variants can be added later
/// without that being a breaking change for downstream matches.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum TelemetrySinkError {
    /// The sink could not format, write, or flush the data it was given.
    PushFailed,
}
/// Destination for telemetry events and graph metrics.
///
/// Every method has a default implementation that does nothing and returns
/// `Ok(())`, so an implementor only overrides the operations it supports.
pub trait TelemetrySink {
/// Accepts a single telemetry event.
///
/// The default implementation ignores the event and returns `Ok(())`.
#[inline]
fn push_event(&mut self, _event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
Ok(())
}
/// Accepts a metrics snapshot for a graph with at most `MAX_NODES` nodes
/// and `MAX_EDGES` edges (as encoded in the `GraphMetrics` type parameters).
///
/// The default implementation ignores the metrics and returns `Ok(())`.
#[inline]
fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
&mut self,
_graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
) -> Result<(), TelemetrySinkError> {
Ok(())
}
/// Gives the sink a chance to push out anything it has buffered.
///
/// The default implementation has nothing to flush and returns `Ok(())`.
#[inline]
fn flush(&mut self) -> Result<(), TelemetrySinkError> {
Ok(())
}
}
/// Maps a watermark state to its fixed text label.
///
/// Each label is spelled exactly like the corresponding variant name.
#[inline]
fn wm_str(wm: WatermarkState) -> &'static str {
    match wm {
        WatermarkState::AtOrAboveHard => "AtOrAboveHard",
        WatermarkState::BetweenSoftAndHard => "BetweenSoftAndHard",
        WatermarkState::BelowSoft => "BelowSoft",
    }
}
/// Writes `value` to `writer` as plain decimal digits, without allocating.
///
/// The digits are assembled right-to-left in a stack buffer and handed to
/// the writer in a single `write_str` call. No sign, padding, or separator
/// is emitted; zero is written as `"0"`.
///
/// # Errors
///
/// Returns whatever error `writer.write_str` reports.
#[inline]
pub fn write_u64<W: fmt::Write>(writer: &mut W, mut value: u64) -> fmt::Result {
    // `u64::MAX` has 20 decimal digits, so 20 bytes always suffice.
    let mut digits = [0u8; 20];
    let mut start = digits.len();
    // The body runs at least once, so zero yields the single digit "0".
    loop {
        start -= 1;
        digits[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }
    // Only ASCII digits were stored, so the slice is always valid UTF-8.
    let text = core::str::from_utf8(&digits[start..]).unwrap();
    writer.write_str(text)
}
pub fn fmt_event<W: fmt::Write>(w: &mut W, e: &TelemetryEvent) -> fmt::Result {
match e {
TelemetryEvent::Runtime(ev) => {
w.write_str("runtime id=")?;
write_u64(w, *ev.graph_id() as u64)?;
w.write_str(" ts=")?;
write_u64(w, *ev.timestamp_ns())?;
w.write_str(" kind=")?;
w.write_str(match ev.event_kind() {
RuntimeTelemetryEventKind::GraphStarted => "GraphStarted",
RuntimeTelemetryEventKind::GraphStopped => "GraphStopped",
RuntimeTelemetryEventKind::GraphPanicked => "GraphPanicked",
RuntimeTelemetryEventKind::SensorDisconnected => "SensorDisconnected",
RuntimeTelemetryEventKind::SensorRecovered => "SensorRecovered",
RuntimeTelemetryEventKind::ModelLoadFailed => "ModelLoadFailed",
RuntimeTelemetryEventKind::ModelRecovered => "ModelRecovered",
RuntimeTelemetryEventKind::MqttDisconnected => "MqttDisconnected",
RuntimeTelemetryEventKind::MqttRecovered => "MqttRecovered",
RuntimeTelemetryEventKind::DataGapDetected => "DataGapDetected",
RuntimeTelemetryEventKind::InvalidDataSeen => "InvalidDataSeen",
})?;
w.write_str(" msg=")?;
if let Some(msg) = ev.message() {
w.write_str(msg.as_str())?;
} else {
w.write_str("-")?;
}
w.write_str("\n")
}
TelemetryEvent::NodeStep(ev) => {
w.write_str("node-step gid=")?;
write_u64(w, *ev.graph_id() as u64)?;
w.write_str(" nin=")?;
write_u64(w, *ev.node_index().as_usize() as u64)?;
w.write_str(" ts_start=")?;
write_u64(w, *ev.timestamp_start_ns())?;
w.write_str(" ts_end=")?;
write_u64(w, *ev.timestamp_end_ns())?;
w.write_str(" dur=")?;
w.write_str(" msg_processed=")?;
write_u64(w, *ev.processed_count())?;
write_u64(w, *ev.duration_ns())?;
w.write_str(" dl=")?;
if let Some(d) = *ev.deadline_ns() {
write_u64(w, d)?;
} else {
w.write_str("-")?;
}
w.write_str(" miss=")?;
w.write_str(if *ev.deadline_missed() { "1" } else { "0" })?;
w.write_str(" err=")?;
if let Some(k) = ev.error_kind() {
w.write_str(match k {
NodeStepError::NoInput => "NoInput",
NodeStepError::Backpressured => "BackPressured",
NodeStepError::OverBudget => "OverBudget",
NodeStepError::ExternalUnavailable => "ExternalUnavailable",
NodeStepError::ExecutionFailed => "ExecutionFailed",
})?;
} else {
w.write_str("-")?;
}
w.write_str("\n")
}
TelemetryEvent::EdgeSnapshot(ev) => {
w.write_str("edge-snap gid=")?;
write_u64(w, *ev.graph_id() as u64)?;
w.write_str(" eid=")?;
write_u64(w, *ev.edge_index().as_usize() as u64)?;
w.write_str(" ts=")?;
write_u64(w, *ev.timestamp_ns())?;
w.write_str(" occ=")?;
write_u64(w, *ev.current_occupancy() as u64)?;
w.write_str(" wm=")?;
w.write_str(wm_str(*ev.watermark_state()))?;
w.write_str("\n")
}
}
}
/// Wrapper that owns a [`fmt::Write`] target.
///
/// The wrapped writer is only reachable read-only through
/// [`FmtLineWriter::inner`].
pub struct FmtLineWriter<W: fmt::Write> {
    inner: W,
}

impl<W: fmt::Write> FmtLineWriter<W> {
    /// Wraps `writer`, taking ownership of it.
    pub fn new(writer: W) -> Self {
        FmtLineWriter { inner: writer }
    }

    /// Borrows the wrapped writer, e.g. to inspect what has been written.
    #[inline]
    pub fn inner(&self) -> &W {
        let Self { inner } = self;
        inner
    }
}
/// Sink implementation that formats straight into the wrapped writer.
///
/// `flush` keeps the trait's default implementation.
impl<W: fmt::Write> TelemetrySink for FmtLineWriter<W> {
    /// Formats `e` with `fmt_event` into the wrapped writer.
    ///
    /// Any formatting error is reported as `TelemetrySinkError::PushFailed`.
    fn push_event(&mut self, e: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
        match fmt_event(&mut self.inner, e) {
            Ok(()) => Ok(()),
            Err(_) => Err(TelemetrySinkError::PushFailed),
        }
    }

    /// Formats `graph` via its `fmt` method into the wrapped writer.
    ///
    /// Any formatting error is reported as `TelemetrySinkError::PushFailed`.
    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
        &mut self,
        graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
    ) -> Result<(), TelemetrySinkError> {
        match graph.fmt(&mut self.inner) {
            Ok(()) => Ok(()),
            Err(_) => Err(TelemetrySinkError::PushFailed),
        }
    }
}
/// Cloning is available whenever the wrapped writer itself is `Clone`.
impl<W: fmt::Write + Clone> Clone for FmtLineWriter<W> {
    /// Produces a new wrapper around a clone of the inner writer.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
/// Fixed-capacity, inline text buffer of at most `N` bytes.
///
/// Content is only ever added through the [`fmt::Write`] implementation,
/// which appends whole `&str` values, so the used prefix is always valid
/// UTF-8.
#[derive(Clone, Copy)]
pub struct FixedBuffer<const N: usize> {
    buffer: [u8; N],
    length: usize,
}

impl<const N: usize> Default for FixedBuffer<N> {
    /// Same as [`FixedBuffer::new`]: an empty buffer.
    fn default() -> Self {
        FixedBuffer::new()
    }
}

impl<const N: usize> FixedBuffer<N> {
    /// Creates an empty buffer with zeroed storage.
    pub const fn new() -> Self {
        FixedBuffer {
            length: 0,
            buffer: [0; N],
        }
    }

    /// Total number of bytes the buffer can hold (always `N`).
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Number of bytes currently in use, returned by reference.
    #[inline]
    pub fn len(&self) -> &usize {
        let Self { length, .. } = self;
        length
    }

    /// Returns `true` when nothing has been written since creation or the
    /// last [`FixedBuffer::clear`].
    #[inline]
    pub const fn is_empty(&self) -> bool {
        0 == self.length
    }

    /// The bytes written so far.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        let used = self.length;
        &self.buffer[..used]
    }

    /// The text written so far.
    #[inline]
    pub fn as_str(&self) -> &str {
        let written = self.as_bytes();
        // Only complete `&str` values are ever copied in, so this cannot fail.
        core::str::from_utf8(written).unwrap()
    }

    /// Discards the content by resetting the used length; storage bytes are
    /// left as they are.
    #[inline]
    pub fn clear(&mut self) {
        self.length = 0;
    }
}

impl<const N: usize> fmt::Write for FixedBuffer<N> {
    /// Appends `s` if it fits entirely.
    ///
    /// When the remaining capacity is too small, nothing is written and
    /// `fmt::Error` is returned.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let start = self.length;
        let end = self.length + s.len();
        // `get_mut` yields `None` exactly when `end` exceeds the capacity.
        match self.buffer.get_mut(start..end) {
            Some(destination) => {
                destination.copy_from_slice(s.as_bytes());
                self.length = end;
                Ok(())
            }
            None => Err(fmt::Error),
        }
    }
}
/// Builds a [`FmtLineWriter`] backed by a fresh, empty [`FixedBuffer`] of
/// capacity `N`.
pub fn fixed_buffer_line_writer<const N: usize>() -> FmtLineWriter<FixedBuffer<N>> {
    let storage: FixedBuffer<N> = FixedBuffer::new();
    FmtLineWriter::new(storage)
}
/// Private `fmt::Write` adapter over a caller-provided byte slice.
///
/// Tracks how many bytes of the slice have been filled so far.
#[cfg(feature = "std")]
struct BufWriter<'a> {
    data: &'a mut [u8],
    len: usize,
}

#[cfg(feature = "std")]
impl<'a> BufWriter<'a> {
    /// Starts writing at the beginning of `storage`.
    fn new(storage: &'a mut [u8]) -> Self {
        BufWriter {
            len: 0,
            data: storage,
        }
    }

    /// The part of the slice that has been written so far.
    fn as_slice(&self) -> &[u8] {
        let filled = self.len;
        &self.data[..filled]
    }
}

#[cfg(feature = "std")]
impl<'a> fmt::Write for BufWriter<'a> {
    /// Appends `s` if it fits entirely in the remaining space.
    ///
    /// Otherwise nothing is written and `fmt::Error` is returned.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let start = self.len;
        let end = self.len + s.len();
        // `get_mut` yields `None` exactly when `end` runs past the slice.
        match self.data.get_mut(start..end) {
            Some(destination) => {
                destination.copy_from_slice(s.as_bytes());
                self.len = end;
                Ok(())
            }
            None => Err(fmt::Error),
        }
    }
}
/// Wrapper that owns a `std::io::Write` target. Only available with the
/// `std` feature.
#[cfg(feature = "std")]
pub struct IoLineWriter<W: std::io::Write> {
    inner: W,
}

#[cfg(feature = "std")]
impl<W: std::io::Write> IoLineWriter<W> {
    /// Wraps `writer`, taking ownership of it.
    pub fn new(writer: W) -> Self {
        IoLineWriter { inner: writer }
    }

    /// Creates a writer targeting the process's standard output.
    ///
    /// This is an associated function of the generic impl and its return
    /// type does not mention `W`, so callers have to name some `W`
    /// explicitly when calling it.
    pub fn stdout_writer() -> IoLineWriter<std::io::Stdout> {
        let stdout = std::io::stdout();
        IoLineWriter { inner: stdout }
    }

    /// Creates a writer targeting the file at `path`, opened with
    /// `std::fs::File::create`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `File::create` if the file cannot be
    /// opened.
    pub fn file_writer(path: &str) -> std::io::Result<IoLineWriter<std::fs::File>> {
        std::fs::File::create(path).map(|file| IoLineWriter { inner: file })
    }
}
/// Sink implementation that formats into memory first and then hands the
/// finished text to the wrapped `std::io::Write` in one `write_all` call.
#[cfg(feature = "std")]
impl<W: std::io::Write> TelemetrySink for IoLineWriter<W> {
    /// Formats `event` with `fmt_event` and writes the resulting line.
    ///
    /// A 256-byte stack buffer is tried first; if the line does not fit,
    /// it is formatted again into a heap `String`. Formatting and I/O
    /// failures are both reported as `TelemetrySinkError::PushFailed`.
    fn push_event(&mut self, event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
        let mut stack_storage = [0u8; 256];
        let mut stack_writer = BufWriter::new(&mut stack_storage);

        let outcome = if fmt_event(&mut stack_writer, event).is_ok() {
            self.inner.write_all(stack_writer.as_slice())
        } else {
            // Stack buffer was too small: redo the formatting on the heap.
            let mut line = String::new();
            if fmt_event(&mut line, event).is_err() {
                return Err(TelemetrySinkError::PushFailed);
            }
            self.inner.write_all(line.as_bytes())
        };

        outcome.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Formats `graph` via its `fmt` method and writes the resulting text.
    ///
    /// A 4096-byte stack buffer is tried first; if the text does not fit,
    /// it is formatted again into a heap `String`. Formatting and I/O
    /// failures are both reported as `TelemetrySinkError::PushFailed`.
    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
        &mut self,
        graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
    ) -> Result<(), TelemetrySinkError> {
        let mut stack_storage = [0u8; 4096];
        let mut stack_writer = BufWriter::new(&mut stack_storage);

        let outcome = if graph.fmt(&mut stack_writer).is_ok() {
            self.inner.write_all(stack_writer.as_slice())
        } else {
            // Stack buffer was too small: redo the formatting on the heap.
            let mut text = String::new();
            if graph.fmt(&mut text).is_err() {
                return Err(TelemetrySinkError::PushFailed);
            }
            self.inner.write_all(text.as_bytes())
        };

        outcome.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Flushes the wrapped writer; an I/O error is reported as
    /// `TelemetrySinkError::PushFailed`.
    fn flush(&mut self) -> Result<(), TelemetrySinkError> {
        match self.inner.flush() {
            Ok(()) => Ok(()),
            Err(_) => Err(TelemetrySinkError::PushFailed),
        }
    }
}
/// Cloning is available whenever the wrapped writer itself is `Clone`.
#[cfg(feature = "std")]
impl<W: std::io::Write + Clone> Clone for IoLineWriter<W> {
    /// Produces a new wrapper around a clone of the inner writer.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
/// The unit type acts as a sink that keeps every trait default: events and
/// metrics are ignored and each call returns `Ok(())`.
impl TelemetrySink for () {}