Skip to main content

limen_core/telemetry/
sink.rs

1//! Telemetry sink trait and implementations.
2
3use super::*;
4
/// Error type returned by telemetry sinks when a write fails.
///
/// Sinks use this to signal I/O failures or buffer exhaustion without tying
/// the core telemetry code to any particular underlying error type.
///
/// The enum is `#[non_exhaustive]`: new variants may be added later, so
/// `match` expressions in other crates need a wildcard arm.
///
/// `PartialEq`/`Eq` are derived so that results can be compared directly,
/// for example with `assert_eq!(sink.flush(), Ok(()))`.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TelemetrySinkError {
    /// The event could not be pushed to the underlying sink.
    PushFailed,
}
16
/// Write-only sink for structured telemetry events.
///
/// Sinks are responsible for transporting events to logs, sockets, buffers,
/// or any other destination. The trait is deliberately minimal and can be
/// implemented in both no_std and std environments.
///
/// Every method has a default body that does nothing and returns `Ok(())`,
/// so an implementation only has to override the operations it supports.
pub trait TelemetrySink {
    /// Write a single telemetry event to the underlying sink.
    ///
    /// The default implementation ignores the event and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Implementations return a [`TelemetrySinkError`] when the event could
    /// not be written.
    #[inline]
    fn push_event(&mut self, _event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
        Ok(())
    }

    /// Write a snapshot of the current graph metrics.
    ///
    /// Implementations that are only interested in events can ignore this
    /// by keeping the default no-op implementation.
    ///
    /// # Errors
    ///
    /// Implementations return a [`TelemetrySinkError`] when the snapshot
    /// could not be written.
    #[inline]
    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
        &mut self,
        _graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
    ) -> Result<(), TelemetrySinkError> {
        Ok(())
    }

    /// Flush any buffered events to the underlying sink.
    ///
    /// The default implementation is a no-op; implementations that buffer
    /// data should override this.
    ///
    /// # Errors
    ///
    /// Implementations return a [`TelemetrySinkError`] when buffered data
    /// could not be flushed.
    #[inline]
    fn flush(&mut self) -> Result<(), TelemetrySinkError> {
        Ok(())
    }
}
50
51/// Convert a watermark state into a stable string representation.
52///
53/// This helper is used by line based writers to render human readable
54/// representations of watermark transitions.
55#[inline]
56fn wm_str(wm: WatermarkState) -> &'static str {
57    match wm {
58        WatermarkState::BelowSoft => "BelowSoft",
59        WatermarkState::BetweenSoftAndHard => "BetweenSoftAndHard",
60        WatermarkState::AtOrAboveHard => "AtOrAboveHard",
61    }
62}
63
/// Write `value` in base ten to a `fmt::Write` target.
///
/// The digits are assembled in a stack buffer and handed to the writer in a
/// single `write_str` call, so the helper never allocates and is usable in
/// both no_std and std configurations.
///
/// # Errors
///
/// Returns whatever error the writer's `write_str` reports.
#[inline]
pub fn write_u64<W: fmt::Write>(writer: &mut W, mut value: u64) -> fmt::Result {
    // `u64::MAX` has 20 decimal digits, so 20 bytes always suffice.
    let mut digits = [b'0'; 20];
    let mut start = digits.len();

    // Fill from the back, least significant digit first. The loop body runs
    // at least once, which is what renders zero as "0".
    loop {
        start -= 1;
        digits[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }

    match core::str::from_utf8(&digits[start..]) {
        Ok(text) => writer.write_str(text),
        // Only ASCII digits were stored above.
        Err(_) => unreachable!("ASCII digits are always valid UTF-8"),
    }
}
88
89/// Format a telemetry event as a single line of text.
90///
91/// The exact format is stable and intended for machine parsing as well as
92/// human reading. It does not allocate and only uses `fmt::Write`.
93pub fn fmt_event<W: fmt::Write>(w: &mut W, e: &TelemetryEvent) -> fmt::Result {
94    match e {
95        TelemetryEvent::Runtime(ev) => {
96            w.write_str("runtime id=")?;
97            write_u64(w, *ev.graph_id() as u64)?;
98            w.write_str(" ts=")?;
99            write_u64(w, *ev.timestamp_ns())?;
100            w.write_str(" kind=")?;
101            w.write_str(match ev.event_kind() {
102                RuntimeTelemetryEventKind::GraphStarted => "GraphStarted",
103                RuntimeTelemetryEventKind::GraphStopped => "GraphStopped",
104                RuntimeTelemetryEventKind::GraphPanicked => "GraphPanicked",
105                RuntimeTelemetryEventKind::SensorDisconnected => "SensorDisconnected",
106                RuntimeTelemetryEventKind::SensorRecovered => "SensorRecovered",
107                RuntimeTelemetryEventKind::ModelLoadFailed => "ModelLoadFailed",
108                RuntimeTelemetryEventKind::ModelRecovered => "ModelRecovered",
109                RuntimeTelemetryEventKind::MqttDisconnected => "MqttDisconnected",
110                RuntimeTelemetryEventKind::MqttRecovered => "MqttRecovered",
111                RuntimeTelemetryEventKind::DataGapDetected => "DataGapDetected",
112                RuntimeTelemetryEventKind::InvalidDataSeen => "InvalidDataSeen",
113            })?;
114            w.write_str(" msg=")?;
115            if let Some(msg) = ev.message() {
116                w.write_str(msg.as_str())?;
117            } else {
118                w.write_str("-")?;
119            }
120            w.write_str("\n")
121        }
122        TelemetryEvent::NodeStep(ev) => {
123            w.write_str("node-step gid=")?;
124            write_u64(w, *ev.graph_id() as u64)?;
125            w.write_str(" nin=")?;
126            write_u64(w, *ev.node_index().as_usize() as u64)?;
127            w.write_str(" ts_start=")?;
128            write_u64(w, *ev.timestamp_start_ns())?;
129            w.write_str(" ts_end=")?;
130            write_u64(w, *ev.timestamp_end_ns())?;
131            w.write_str(" dur=")?;
132            w.write_str(" msg_processed=")?;
133            write_u64(w, *ev.processed_count())?;
134            write_u64(w, *ev.duration_ns())?;
135            w.write_str(" dl=")?;
136            if let Some(d) = *ev.deadline_ns() {
137                write_u64(w, d)?;
138            } else {
139                w.write_str("-")?;
140            }
141            w.write_str(" miss=")?;
142            w.write_str(if *ev.deadline_missed() { "1" } else { "0" })?;
143            w.write_str(" err=")?;
144            if let Some(k) = ev.error_kind() {
145                w.write_str(match k {
146                    NodeStepError::NoInput => "NoInput",
147                    NodeStepError::Backpressured => "BackPressured",
148                    NodeStepError::OverBudget => "OverBudget",
149                    NodeStepError::ExternalUnavailable => "ExternalUnavailable",
150                    NodeStepError::ExecutionFailed => "ExecutionFailed",
151                })?;
152            } else {
153                w.write_str("-")?;
154            }
155            w.write_str("\n")
156        }
157        TelemetryEvent::EdgeSnapshot(ev) => {
158            w.write_str("edge-snap gid=")?;
159            write_u64(w, *ev.graph_id() as u64)?;
160            w.write_str(" eid=")?;
161            write_u64(w, *ev.edge_index().as_usize() as u64)?;
162            w.write_str(" ts=")?;
163            write_u64(w, *ev.timestamp_ns())?;
164            w.write_str(" occ=")?;
165            write_u64(w, *ev.current_occupancy() as u64)?;
166            w.write_str(" wm=")?;
167            w.write_str(wm_str(*ev.watermark_state()))?;
168            w.write_str("\n")
169        }
170    }
171}
172
173// ---- no_std sink ----
174
/// Telemetry sink that renders events as text lines into a `core::fmt::Write`
/// target.
///
/// This is a no_std friendly sink that can write to any type implementing
/// `fmt::Write`, such as a ring buffer or fixed size string.
///
/// The wrapped writer can be inspected with [`FmtLineWriter::inner`],
/// mutated (for example cleared once it has been drained) with
/// [`FmtLineWriter::inner_mut`], and recovered with
/// [`FmtLineWriter::into_inner`].
pub struct FmtLineWriter<W: fmt::Write> {
    /// Inner writer that receives formatted event lines.
    inner: W,
}

impl<W: fmt::Write> FmtLineWriter<W> {
    /// Create a new line based event writer around the given writer.
    pub fn new(writer: W) -> Self {
        Self { inner: writer }
    }

    /// Shared access to the inner `fmt::Write` target.
    #[inline]
    pub fn inner(&self) -> &W {
        &self.inner
    }

    /// Mutable access to the inner `fmt::Write` target.
    ///
    /// Without this accessor a bounded target could never be emptied once it
    /// filled up, and every later push would fail.
    #[inline]
    pub fn inner_mut(&mut self) -> &mut W {
        &mut self.inner
    }

    /// Consume the sink and return the inner `fmt::Write` target.
    #[inline]
    pub fn into_inner(self) -> W {
        self.inner
    }
}
196
197impl<W: fmt::Write> TelemetrySink for FmtLineWriter<W> {
198    fn push_event(&mut self, e: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
199        fmt_event(&mut self.inner, e).map_err(|_| TelemetrySinkError::PushFailed)
200    }
201
202    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
203        &mut self,
204        graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
205    ) -> Result<(), TelemetrySinkError> {
206        graph
207            .fmt(&mut self.inner)
208            .map_err(|_| TelemetrySinkError::PushFailed)
209    }
210}
211
212impl<W: fmt::Write + Clone> Clone for FmtLineWriter<W> {
213    fn clone(&self) -> Self {
214        Self {
215            inner: self.inner.clone(),
216        }
217    }
218}
219
/// Fixed-capacity byte buffer, stored inline, that implements
/// `core::fmt::Write`.
///
/// - Usable in no_std: the storage is a `[u8; N]` held by value.
/// - The capacity is the const generic `N`.
/// - A write that does not fit is rejected with `Err(fmt::Error)` and leaves
///   the buffer untouched (no partial writes).
#[derive(Clone, Copy)]
pub struct FixedBuffer<const N: usize> {
    // Backing storage; only the first `length` bytes are meaningful.
    buffer: [u8; N],
    // Number of bytes written so far.
    length: usize,
}

impl<const N: usize> Default for FixedBuffer<N> {
    /// Same as [`FixedBuffer::new`]: an empty buffer.
    fn default() -> Self {
        Self::new()
    }
}

impl<const N: usize> FixedBuffer<N> {
    /// Create a new empty buffer with zeroed storage.
    pub const fn new() -> Self {
        Self {
            length: 0,
            buffer: [0u8; N],
        }
    }

    /// Maximum capacity in bytes (always `N`).
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Number of bytes currently written, returned by reference.
    #[inline]
    pub fn len(&self) -> &usize {
        let written = &self.length;
        written
    }

    /// Returns true if nothing has been written since creation or the last
    /// [`FixedBuffer::clear`].
    #[inline]
    pub const fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Access the written portion as bytes.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        let (written, _unused) = self.buffer.split_at(self.length);
        written
    }

    /// Access the written portion as `&str`.
    ///
    /// # Panics
    ///
    /// Panics if the written bytes are not valid UTF-8. The code shown here
    /// only ever appends whole `&str` values through `write_str`, so this is
    /// not expected to happen.
    #[inline]
    pub fn as_str(&self) -> &str {
        core::str::from_utf8(self.as_bytes()).expect("FixedBuffer only stores whole `&str` values")
    }

    /// Forget everything written so far.
    ///
    /// Only the length is reset; the storage bytes are left as they are.
    #[inline]
    pub fn clear(&mut self) {
        self.length = 0;
    }
}

impl<const N: usize> fmt::Write for FixedBuffer<N> {
    /// Append `s` if it fits entirely; otherwise fail and leave the buffer
    /// unchanged.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let incoming = s.as_bytes();
        let end = self.length + incoming.len();

        // `get_mut` yields `None` exactly when `end` exceeds the capacity.
        match self.buffer.get_mut(self.length..end) {
            Some(slot) => {
                slot.copy_from_slice(incoming);
                self.length = end;
                Ok(())
            }
            None => Err(fmt::Error),
        }
    }
}
299
300/// Convenience constructor for a line writer over a fixed owned buffer.
301///
302/// This is pure no_std: no heap, no std::io.
303pub fn fixed_buffer_line_writer<const N: usize>() -> FmtLineWriter<FixedBuffer<N>> {
304    FmtLineWriter::new(FixedBuffer::<N>::new())
305}
306
307// ---- std sink ----
308
/// Private `fmt::Write` adapter over a caller-provided byte slice.
///
/// The std sink uses it to format into stack storage first. A write that
/// does not fit fails with `fmt::Error` and leaves `len` unchanged.
#[cfg(feature = "std")]
struct BufWriter<'a> {
    // Borrowed storage that receives the bytes.
    data: &'a mut [u8],
    // Number of bytes of `data` written so far.
    len: usize,
}

#[cfg(feature = "std")]
impl<'a> BufWriter<'a> {
    /// Wrap `storage`, starting with nothing written.
    fn new(storage: &'a mut [u8]) -> Self {
        BufWriter {
            len: 0,
            data: storage,
        }
    }

    /// The bytes written so far.
    fn as_slice(&self) -> &[u8] {
        let (written, _unused) = self.data.split_at(self.len);
        written
    }
}

#[cfg(feature = "std")]
impl<'a> fmt::Write for BufWriter<'a> {
    /// Append `s` if it fits entirely in the remaining storage.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let incoming = s.as_bytes();
        let end = self.len + incoming.len();

        // `get_mut` yields `None` exactly when `end` runs past the storage.
        let slot = self.data.get_mut(self.len..end).ok_or(fmt::Error)?;
        slot.copy_from_slice(incoming);
        self.len = end;
        Ok(())
    }
}
341
/// Telemetry sink that renders events as text lines and writes them to a
/// `std::io::Write` target.
///
/// This is a std only sink suitable for files, sockets, or standard output.
/// Lines are formatted into stack storage first; a heap `String` is only
/// used when a line does not fit there.
#[cfg(feature = "std")]
pub struct IoLineWriter<W: std::io::Write> {
    /// Inner writer that receives encoded event bytes.
    inner: W,
}

#[cfg(feature = "std")]
impl<W: std::io::Write> IoLineWriter<W> {
    /// Create a new I/O backed event writer around the given writer.
    pub fn new(writer: W) -> Self {
        IoLineWriter { inner: writer }
    }

    /// Create an `IoLineWriter` that writes telemetry lines to `stdout`.
    ///
    /// The returned type does not depend on `W`, but because this function
    /// lives on the generic impl the call site must still be able to
    /// determine `W`, e.g. `IoLineWriter::<std::io::Stdout>::stdout_writer()`.
    pub fn stdout_writer() -> IoLineWriter<std::io::Stdout> {
        let stdout = std::io::stdout();
        IoLineWriter { inner: stdout }
    }

    /// Create an `IoLineWriter` that writes telemetry lines to a file at `path`.
    ///
    /// The file is opened with `std::fs::File::create`. As with
    /// [`IoLineWriter::stdout_writer`], the call site must be able to
    /// determine `W` even though the result is always backed by a `File`.
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` from `File::create` if the file cannot be
    /// created.
    pub fn file_writer(path: &str) -> std::io::Result<IoLineWriter<std::fs::File>> {
        std::fs::File::create(path).map(|file| IoLineWriter { inner: file })
    }
}
371
/// std sink behaviour: format into stack storage, fall back to a heap
/// `String` when the text does not fit, then hand the bytes to the inner
/// writer with a single `write_all`. Every failure is reported as
/// `TelemetrySinkError::PushFailed`.
#[cfg(feature = "std")]
impl<W: std::io::Write> TelemetrySink for IoLineWriter<W> {
    /// Write one event line, using 256 bytes of stack storage for the
    /// common case.
    fn push_event(&mut self, event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
        let mut stack_storage = [0u8; 256];
        let mut stack_writer = BufWriter::new(&mut stack_storage);

        let written = match fmt_event(&mut stack_writer, event) {
            Ok(()) => self.inner.write_all(stack_writer.as_slice()),
            Err(_) => {
                // Did not fit on the stack: format again into a growable
                // string.
                let mut line = String::new();
                fmt_event(&mut line, event).map_err(|_| TelemetrySinkError::PushFailed)?;
                self.inner.write_all(line.as_bytes())
            }
        };

        written.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Write a metrics snapshot, using 4096 bytes of stack storage for the
    /// common case.
    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
        &mut self,
        graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
    ) -> Result<(), TelemetrySinkError> {
        let mut stack_storage = [0u8; 4096];
        let mut stack_writer = BufWriter::new(&mut stack_storage);

        let written = match graph.fmt(&mut stack_writer) {
            Ok(()) => self.inner.write_all(stack_writer.as_slice()),
            Err(_) => {
                // Did not fit on the stack: format again into a growable
                // string.
                let mut text = String::new();
                graph
                    .fmt(&mut text)
                    .map_err(|_| TelemetrySinkError::PushFailed)?;
                self.inner.write_all(text.as_bytes())
            }
        };

        written.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Flush the inner writer.
    fn flush(&mut self) -> Result<(), TelemetrySinkError> {
        match self.inner.flush() {
            Ok(()) => Ok(()),
            Err(_) => Err(TelemetrySinkError::PushFailed),
        }
    }
}
419
/// An I/O line writer is cloneable whenever its inner writer is; the clone
/// wraps an independent clone of that writer.
#[cfg(feature = "std")]
impl<W: std::io::Write + Clone> Clone for IoLineWriter<W> {
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
428
/// The unit type acts as a sink that discards everything: it overrides
/// nothing, so every method is the trait's default no-op returning `Ok(())`.
impl TelemetrySink for () {}