Skip to main content

entrenar/monitor/inference/collector/
stream.rs

1//! StreamCollector - Write-through for persistent logging
2
3use super::super::path::DecisionPath;
4use super::super::trace::DecisionTrace;
5use super::traits::TraceCollector;
6use serde::{Deserialize, Serialize};
7use std::io::Write;
8
9/// Trace format for serialization
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11pub enum StreamFormat {
12    /// Binary format (compact, fast)
13    Binary,
14    /// JSON format (human-readable)
15    Json,
16    /// JSON Lines (one JSON object per line)
17    JsonLines,
18}
19
20/// Stream collector for persistent logging
21///
22/// Target: <1µs per trace
23///
24/// # Features
25/// - Write-through to any `Write` impl
26/// - Supports binary and JSON formats
27/// - Buffered writes for efficiency
28///
29/// # Example
30///
31/// ```ignore
32/// use entrenar::monitor::inference::{StreamCollector, LinearPath, StreamFormat};
33/// use std::fs::File;
34///
35/// let file = File::create("traces.jsonl")?;
36/// let mut collector = StreamCollector::<LinearPath, _>::new(file, StreamFormat::JsonLines);
37/// collector.record(trace);
38/// collector.flush()?;
39/// ```
40pub struct StreamCollector<P: DecisionPath, W: Write + Send> {
41    writer: W,
42    format: StreamFormat,
43    buffer: Vec<DecisionTrace<P>>,
44    flush_threshold: usize,
45    count: usize,
46}
47
48impl<P: DecisionPath + Serialize, W: Write + Send + Sync> StreamCollector<P, W> {
49    /// Create a new stream collector
50    pub fn new(writer: W, format: StreamFormat) -> Self {
51        Self { writer, format, buffer: Vec::with_capacity(100), flush_threshold: 100, count: 0 }
52    }
53
54    /// Set the flush threshold (number of traces before auto-flush)
55    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
56        self.flush_threshold = threshold;
57        self
58    }
59
60    /// Get reference to the underlying writer
61    pub fn writer(&self) -> &W {
62        &self.writer
63    }
64
65    /// Get mutable reference to the underlying writer
66    pub fn writer_mut(&mut self) -> &mut W {
67        &mut self.writer
68    }
69
70    /// Write a single trace
71    fn write_trace(&mut self, trace: &DecisionTrace<P>) -> std::io::Result<()> {
72        match self.format {
73            StreamFormat::Binary => {
74                let bytes = trace.to_bytes();
75                // Write length prefix
76                self.writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
77                self.writer.write_all(&bytes)?;
78            }
79            StreamFormat::Json => {
80                serde_json::to_writer(&mut self.writer, trace)?;
81            }
82            StreamFormat::JsonLines => {
83                serde_json::to_writer(&mut self.writer, trace)?;
84                self.writer.write_all(b"\n")?;
85            }
86        }
87        Ok(())
88    }
89}
90
91impl<P: DecisionPath + Serialize, W: Write + Send + Sync> TraceCollector<P>
92    for StreamCollector<P, W>
93{
94    fn record(&mut self, trace: DecisionTrace<P>) {
95        self.buffer.push(trace);
96        self.count += 1;
97
98        if self.buffer.len() >= self.flush_threshold {
99            let _ = self.flush();
100        }
101    }
102
103    fn flush(&mut self) -> std::io::Result<()> {
104        let traces: Vec<_> = self.buffer.drain(..).collect();
105        for trace in traces {
106            self.write_trace(&trace)?;
107        }
108        self.writer.flush()
109    }
110
111    fn len(&self) -> usize {
112        self.count
113    }
114}