entrenar/monitor/inference/collector/
stream.rs1use super::super::path::DecisionPath;
4use super::super::trace::DecisionTrace;
5use super::traits::TraceCollector;
6use serde::{Deserialize, Serialize};
7use std::io::Write;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11pub enum StreamFormat {
12 Binary,
14 Json,
16 JsonLines,
18}
19
20pub struct StreamCollector<P: DecisionPath, W: Write + Send> {
41 writer: W,
42 format: StreamFormat,
43 buffer: Vec<DecisionTrace<P>>,
44 flush_threshold: usize,
45 count: usize,
46}
47
48impl<P: DecisionPath + Serialize, W: Write + Send + Sync> StreamCollector<P, W> {
49 pub fn new(writer: W, format: StreamFormat) -> Self {
51 Self { writer, format, buffer: Vec::with_capacity(100), flush_threshold: 100, count: 0 }
52 }
53
54 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
56 self.flush_threshold = threshold;
57 self
58 }
59
60 pub fn writer(&self) -> &W {
62 &self.writer
63 }
64
65 pub fn writer_mut(&mut self) -> &mut W {
67 &mut self.writer
68 }
69
70 fn write_trace(&mut self, trace: &DecisionTrace<P>) -> std::io::Result<()> {
72 match self.format {
73 StreamFormat::Binary => {
74 let bytes = trace.to_bytes();
75 self.writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
77 self.writer.write_all(&bytes)?;
78 }
79 StreamFormat::Json => {
80 serde_json::to_writer(&mut self.writer, trace)?;
81 }
82 StreamFormat::JsonLines => {
83 serde_json::to_writer(&mut self.writer, trace)?;
84 self.writer.write_all(b"\n")?;
85 }
86 }
87 Ok(())
88 }
89}
90
91impl<P: DecisionPath + Serialize, W: Write + Send + Sync> TraceCollector<P>
92 for StreamCollector<P, W>
93{
94 fn record(&mut self, trace: DecisionTrace<P>) {
95 self.buffer.push(trace);
96 self.count += 1;
97
98 if self.buffer.len() >= self.flush_threshold {
99 let _ = self.flush();
100 }
101 }
102
103 fn flush(&mut self) -> std::io::Result<()> {
104 let traces: Vec<_> = self.buffer.drain(..).collect();
105 for trace in traces {
106 self.write_trace(&trace)?;
107 }
108 self.writer.flush()
109 }
110
111 fn len(&self) -> usize {
112 self.count
113 }
114}