limen_core/telemetry/
sink.rs1use super::*;
4
/// Error type returned by [`TelemetrySink`] methods.
///
/// The sinks in this file report every formatting, write and flush failure
/// as [`TelemetrySinkError::PushFailed`]. The enum is `#[non_exhaustive]`,
/// so code outside the defining crate must keep a wildcard arm when matching.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum TelemetrySinkError {
    /// The sink could not format or write the data it was handed.
    PushFailed,
}
16
17pub trait TelemetrySink {
23 #[inline]
25 fn push_event(&mut self, _event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
26 Ok(())
27 }
28
29 #[inline]
34 fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
35 &mut self,
36 _graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
37 ) -> Result<(), TelemetrySinkError> {
38 Ok(())
39 }
40
41 #[inline]
46 fn flush(&mut self) -> Result<(), TelemetrySinkError> {
47 Ok(())
48 }
49}
50
51#[inline]
56fn wm_str(wm: WatermarkState) -> &'static str {
57 match wm {
58 WatermarkState::BelowSoft => "BelowSoft",
59 WatermarkState::BetweenSoftAndHard => "BetweenSoftAndHard",
60 WatermarkState::AtOrAboveHard => "AtOrAboveHard",
61 }
62}
63
/// Writes `value` to `writer` as unsigned base-10 ASCII digits.
///
/// The digits are assembled back-to-front in a 20-byte stack buffer
/// (`u64::MAX` has 20 decimal digits) and emitted with a single
/// `write_str` call, so no heap allocation is involved.
///
/// # Errors
///
/// Returns the error reported by `writer.write_str`, if any.
#[inline]
pub fn write_u64<W: fmt::Write>(writer: &mut W, mut value: u64) -> fmt::Result {
    let mut digits = [0u8; 20];
    let mut start = digits.len();

    // The body runs at least once, so zero is rendered as "0" without a
    // dedicated branch.
    loop {
        start -= 1;
        digits[start] = b'0' + (value % 10) as u8;
        value /= 10;
        if value == 0 {
            break;
        }
    }

    // Only ASCII digits were stored, so the slice is always valid UTF-8.
    let text = core::str::from_utf8(&digits[start..]).unwrap();
    writer.write_str(text)
}
88
89pub fn fmt_event<W: fmt::Write>(w: &mut W, e: &TelemetryEvent) -> fmt::Result {
94 match e {
95 TelemetryEvent::Runtime(ev) => {
96 w.write_str("runtime id=")?;
97 write_u64(w, *ev.graph_id() as u64)?;
98 w.write_str(" ts=")?;
99 write_u64(w, *ev.timestamp_ns())?;
100 w.write_str(" kind=")?;
101 w.write_str(match ev.event_kind() {
102 RuntimeTelemetryEventKind::GraphStarted => "GraphStarted",
103 RuntimeTelemetryEventKind::GraphStopped => "GraphStopped",
104 RuntimeTelemetryEventKind::GraphPanicked => "GraphPanicked",
105 RuntimeTelemetryEventKind::SensorDisconnected => "SensorDisconnected",
106 RuntimeTelemetryEventKind::SensorRecovered => "SensorRecovered",
107 RuntimeTelemetryEventKind::ModelLoadFailed => "ModelLoadFailed",
108 RuntimeTelemetryEventKind::ModelRecovered => "ModelRecovered",
109 RuntimeTelemetryEventKind::MqttDisconnected => "MqttDisconnected",
110 RuntimeTelemetryEventKind::MqttRecovered => "MqttRecovered",
111 RuntimeTelemetryEventKind::DataGapDetected => "DataGapDetected",
112 RuntimeTelemetryEventKind::InvalidDataSeen => "InvalidDataSeen",
113 })?;
114 w.write_str(" msg=")?;
115 if let Some(msg) = ev.message() {
116 w.write_str(msg.as_str())?;
117 } else {
118 w.write_str("-")?;
119 }
120 w.write_str("\n")
121 }
122 TelemetryEvent::NodeStep(ev) => {
123 w.write_str("node-step gid=")?;
124 write_u64(w, *ev.graph_id() as u64)?;
125 w.write_str(" nin=")?;
126 write_u64(w, *ev.node_index().as_usize() as u64)?;
127 w.write_str(" ts_start=")?;
128 write_u64(w, *ev.timestamp_start_ns())?;
129 w.write_str(" ts_end=")?;
130 write_u64(w, *ev.timestamp_end_ns())?;
131 w.write_str(" dur=")?;
132 w.write_str(" msg_processed=")?;
133 write_u64(w, *ev.processed_count())?;
134 write_u64(w, *ev.duration_ns())?;
135 w.write_str(" dl=")?;
136 if let Some(d) = *ev.deadline_ns() {
137 write_u64(w, d)?;
138 } else {
139 w.write_str("-")?;
140 }
141 w.write_str(" miss=")?;
142 w.write_str(if *ev.deadline_missed() { "1" } else { "0" })?;
143 w.write_str(" err=")?;
144 if let Some(k) = ev.error_kind() {
145 w.write_str(match k {
146 NodeStepError::NoInput => "NoInput",
147 NodeStepError::Backpressured => "BackPressured",
148 NodeStepError::OverBudget => "OverBudget",
149 NodeStepError::ExternalUnavailable => "ExternalUnavailable",
150 NodeStepError::ExecutionFailed => "ExecutionFailed",
151 })?;
152 } else {
153 w.write_str("-")?;
154 }
155 w.write_str("\n")
156 }
157 TelemetryEvent::EdgeSnapshot(ev) => {
158 w.write_str("edge-snap gid=")?;
159 write_u64(w, *ev.graph_id() as u64)?;
160 w.write_str(" eid=")?;
161 write_u64(w, *ev.edge_index().as_usize() as u64)?;
162 w.write_str(" ts=")?;
163 write_u64(w, *ev.timestamp_ns())?;
164 w.write_str(" occ=")?;
165 write_u64(w, *ev.current_occupancy() as u64)?;
166 w.write_str(" wm=")?;
167 w.write_str(wm_str(*ev.watermark_state()))?;
168 w.write_str("\n")
169 }
170 }
171}
172
/// Telemetry sink adapter that owns a [`fmt::Write`] target and writes
/// formatted telemetry text into it.
pub struct FmtLineWriter<W: fmt::Write> {
    /// The wrapped text writer that receives all formatted output.
    inner: W,
}

impl<W: fmt::Write> FmtLineWriter<W> {
    /// Wraps `writer`, taking ownership of it.
    pub fn new(writer: W) -> Self {
        FmtLineWriter { inner: writer }
    }

    /// Borrows the wrapped writer, e.g. to read back what has been written.
    #[inline]
    pub fn inner(&self) -> &W {
        let FmtLineWriter { inner } = self;
        inner
    }
}
196
197impl<W: fmt::Write> TelemetrySink for FmtLineWriter<W> {
198 fn push_event(&mut self, e: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
199 fmt_event(&mut self.inner, e).map_err(|_| TelemetrySinkError::PushFailed)
200 }
201
202 fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
203 &mut self,
204 graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
205 ) -> Result<(), TelemetrySinkError> {
206 graph
207 .fmt(&mut self.inner)
208 .map_err(|_| TelemetrySinkError::PushFailed)
209 }
210}
211
212impl<W: fmt::Write + Clone> Clone for FmtLineWriter<W> {
213 fn clone(&self) -> Self {
214 Self {
215 inner: self.inner.clone(),
216 }
217 }
218}
219
/// Fixed-capacity text buffer of `N` bytes that implements [`fmt::Write`].
///
/// The bytes written so far live in `buffer[..length]`. A write that does not
/// fit is rejected as a whole, so the stored bytes are always a concatenation
/// of complete `&str` values.
#[derive(Copy, Clone)]
pub struct FixedBuffer<const N: usize> {
    /// Backing storage; only the first `length` bytes are meaningful.
    buffer: [u8; N],
    /// Number of bytes currently in use.
    length: usize,
}

impl<const N: usize> Default for FixedBuffer<N> {
    /// Same as [`FixedBuffer::new`]: an empty buffer.
    fn default() -> Self {
        FixedBuffer::new()
    }
}

impl<const N: usize> FixedBuffer<N> {
    /// Creates an empty buffer with zeroed storage.
    pub const fn new() -> Self {
        FixedBuffer {
            length: 0,
            buffer: [0; N],
        }
    }

    /// Total number of bytes the buffer can hold (always `N`).
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Number of bytes currently stored.
    ///
    /// Note that this returns a reference; dereference it to get the value.
    #[inline]
    pub fn len(&self) -> &usize {
        &self.length
    }

    /// `true` when nothing has been written since creation or the last
    /// [`clear`](FixedBuffer::clear).
    #[inline]
    pub const fn is_empty(&self) -> bool {
        matches!(self.length, 0)
    }

    /// The stored bytes, without the unused tail of the storage.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        let used = self.length;
        &self.buffer[..used]
    }

    /// The stored bytes viewed as text.
    ///
    /// # Panics
    ///
    /// Panics if the stored bytes are not valid UTF-8. Bytes only enter the
    /// buffer through `write_str`, which copies whole `&str` values.
    #[inline]
    pub fn as_str(&self) -> &str {
        core::str::from_utf8(&self.buffer[..self.length]).unwrap()
    }

    /// Resets the length to zero. The storage bytes are left as they are.
    #[inline]
    pub fn clear(&mut self) {
        self.length = 0;
    }
}

impl<const N: usize> fmt::Write for FixedBuffer<N> {
    /// Appends `s` if it fits completely; otherwise returns `fmt::Error`
    /// and leaves the buffer untouched.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let start = self.length;
        let end = start + s.len();

        // `get_mut` yields `None` exactly when `end` exceeds the capacity.
        match self.buffer.get_mut(start..end) {
            Some(free) => {
                free.copy_from_slice(s.as_bytes());
                self.length = end;
                Ok(())
            }
            None => Err(fmt::Error),
        }
    }
}
299
300pub fn fixed_buffer_line_writer<const N: usize>() -> FmtLineWriter<FixedBuffer<N>> {
304 FmtLineWriter::new(FixedBuffer::<N>::new())
305}
306
/// Private [`fmt::Write`] cursor over a caller-provided byte slice.
///
/// Tracks how many bytes of `data` have been filled so far.
#[cfg(feature = "std")]
struct BufWriter<'a> {
    /// Borrowed destination storage.
    data: &'a mut [u8],
    /// Number of bytes of `data` already written.
    len: usize,
}

#[cfg(feature = "std")]
impl<'a> BufWriter<'a> {
    /// Starts writing at the beginning of `storage`.
    fn new(storage: &'a mut [u8]) -> Self {
        BufWriter {
            len: 0,
            data: storage,
        }
    }

    /// The bytes written so far.
    fn as_slice(&self) -> &[u8] {
        let filled = self.len;
        &self.data[..filled]
    }
}

#[cfg(feature = "std")]
impl<'a> fmt::Write for BufWriter<'a> {
    /// Appends `s` if it fits in the remaining space; otherwise returns
    /// `fmt::Error` without copying anything.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        let start = self.len;
        let end = start + s.len();

        // `get_mut` yields `None` exactly when `end` runs past the storage.
        match self.data.get_mut(start..end) {
            Some(free) => {
                free.copy_from_slice(s.as_bytes());
                self.len = end;
                Ok(())
            }
            None => Err(fmt::Error),
        }
    }
}
341
/// Telemetry sink adapter that owns a [`std::io::Write`] target
/// (only compiled with the `std` feature).
#[cfg(feature = "std")]
pub struct IoLineWriter<W: std::io::Write> {
    /// The wrapped byte writer that receives all output.
    inner: W,
}

#[cfg(feature = "std")]
impl<W: std::io::Write> IoLineWriter<W> {
    /// Wraps `writer`, taking ownership of it.
    pub fn new(writer: W) -> Self {
        IoLineWriter { inner: writer }
    }

    /// Creates a writer that targets the process's standard output.
    ///
    /// This lives in the generic `impl`, so the call site still has to pin
    /// down a `W`, e.g. `IoLineWriter::<std::io::Stdout>::stdout_writer()`.
    pub fn stdout_writer() -> IoLineWriter<std::io::Stdout> {
        let out = std::io::stdout();
        IoLineWriter::new(out)
    }

    /// Creates a writer that targets the file at `path`, opened with
    /// `std::fs::File::create`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `File::create` unchanged.
    pub fn file_writer(path: &str) -> std::io::Result<IoLineWriter<std::fs::File>> {
        std::fs::File::create(path).map(IoLineWriter::new)
    }
}
371
/// Sink implementation that formats into a scratch buffer first and then
/// hands the finished text to the wrapped writer in one `write_all` call.
///
/// Every failure (formatting, writing, flushing) is reported as
/// `TelemetrySinkError::PushFailed`.
#[cfg(feature = "std")]
impl<W: std::io::Write> TelemetrySink for IoLineWriter<W> {
    /// Formats `event` into a 256-byte stack buffer. If the text does not
    /// fit, it is formatted again into a heap `String`.
    fn push_event(&mut self, event: &TelemetryEvent) -> Result<(), TelemetrySinkError> {
        let mut stack = [0u8; 256];
        let mut scratch = BufWriter::new(&mut stack);

        let written = match fmt_event(&mut scratch, event) {
            Ok(()) => self.inner.write_all(scratch.as_slice()),
            Err(_) => {
                // Stack buffer overflowed: redo the formatting on the heap.
                let mut spill = String::new();
                if fmt_event(&mut spill, event).is_err() {
                    return Err(TelemetrySinkError::PushFailed);
                }
                self.inner.write_all(spill.as_bytes())
            }
        };
        written.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Renders `graph` via its `fmt` method into a 4096-byte stack buffer.
    /// If the text does not fit, it is rendered again into a heap `String`.
    fn push_metrics<const MAX_NODES: usize, const MAX_EDGES: usize>(
        &mut self,
        graph: &GraphMetrics<MAX_NODES, MAX_EDGES>,
    ) -> Result<(), TelemetrySinkError> {
        let mut stack = [0u8; 4096];
        let mut scratch = BufWriter::new(&mut stack);

        let written = if graph.fmt(&mut scratch).is_ok() {
            self.inner.write_all(scratch.as_slice())
        } else {
            // Stack buffer overflowed: redo the rendering on the heap.
            let mut spill = String::new();
            if graph.fmt(&mut spill).is_err() {
                return Err(TelemetrySinkError::PushFailed);
            }
            self.inner.write_all(spill.as_bytes())
        };
        written.map_err(|_| TelemetrySinkError::PushFailed)
    }

    /// Flushes the wrapped writer.
    fn flush(&mut self) -> Result<(), TelemetrySinkError> {
        match self.inner.flush() {
            Ok(()) => Ok(()),
            Err(_) => Err(TelemetrySinkError::PushFailed),
        }
    }
}
419
/// Cloning duplicates the wrapped writer; only available when `W: Clone`.
#[cfg(feature = "std")]
impl<W: std::io::Write + Clone> Clone for IoLineWriter<W> {
    fn clone(&self) -> Self {
        let inner = W::clone(&self.inner);
        IoLineWriter { inner }
    }
}
428
429impl TelemetrySink for () {}