use serde::Serialize;
use std::io::Write;
use tokio::sync::mpsc;
/// Handle for emitting newline-delimited JSON (NDJSON) records.
///
/// `write_record` serializes a value and queues the bytes on an unbounded
/// channel; a task spawned by `new` drains that channel into the underlying
/// `std::io::Write` sink.
pub struct NdjsonWriter {
/// Sending half of the queue. Each message is one serialized record,
/// already terminated with `\n`.
tx: mpsc::UnboundedSender<Vec<u8>>,
}
impl NdjsonWriter {
    /// Creates a writer handle and spawns the background task that owns `writer`.
    ///
    /// The task wraps `writer` in a `BufWriter`, appends every queued record to
    /// it, and flushes on a 250 ms timer and once more when the channel closes.
    ///
    /// # Panics
    ///
    /// Panics if called outside of a Tokio runtime context, because it calls
    /// `tokio::spawn`.
    pub fn new<W: Write + Send + 'static>(writer: W) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        tokio::spawn(Self::writer_task(writer, rx));
        Self { tx }
    }

    /// Serializes `msg` as one JSON line and queues it for the writer task.
    ///
    /// This never blocks and never returns an error to the caller:
    /// - a serialization failure is logged at `error` level and counted in
    ///   `openpx.ndjson.write_errors`;
    /// - if the writer task's receiver is gone, the record is dropped and a
    ///   warning is logged.
    ///
    /// `T` may be unsized (e.g. `str` or a slice), matching what
    /// `serde_json::to_vec` accepts.
    #[inline]
    pub fn write_record<T: Serialize + ?Sized>(&self, msg: &T) {
        match serde_json::to_vec(msg) {
            Ok(mut buf) => {
                // NDJSON framing: exactly one record per line.
                buf.push(b'\n');
                if self.tx.send(buf).is_err() {
                    tracing::warn!("ndjson writer channel closed, record dropped");
                }
            }
            Err(e) => {
                tracing::error!("ndjson serialization error: {e}");
                metrics::counter!("openpx.ndjson.write_errors").increment(1);
            }
        }
    }

    /// Background loop: drains `rx` into a buffered `inner` and flushes periodically.
    ///
    /// Runs until every sender has been dropped, then performs a final flush
    /// and returns. Write and flush failures are logged and counted in
    /// `openpx.ndjson.write_errors`; the loop keeps running after them.
    ///
    /// NOTE(review): `inner` is a blocking `std::io::Write`, so a slow sink
    /// stalls the executor thread this task runs on. Consider a dedicated
    /// thread if the sink can block for long.
    async fn writer_task<W: Write + Send + 'static>(
        inner: W,
        mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
    ) {
        use std::io::BufWriter;

        let mut writer = BufWriter::new(inner);
        let mut flush_interval = tokio::time::interval(std::time::Duration::from_millis(250));
        // If a write stalls the task past one or more ticks, resume the regular
        // cadence from the late tick instead of firing a burst of catch-up
        // flushes (the default `Burst` behaviour).
        flush_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                msg = rx.recv() => {
                    match msg {
                        Some(buf) => {
                            if let Err(e) = writer.write_all(&buf) {
                                tracing::error!("ndjson write error: {e}");
                                metrics::counter!("openpx.ndjson.write_errors").increment(1);
                            }
                        }
                        None => {
                            // All senders are gone. This is the last chance to
                            // persist buffered records, so report a failure
                            // instead of discarding it: `BufWriter`'s `Drop`
                            // would swallow the error silently.
                            if let Err(e) = writer.flush() {
                                tracing::error!("ndjson final flush error: {e}");
                                metrics::counter!("openpx.ndjson.write_errors").increment(1);
                            }
                            return;
                        }
                    }
                }
                _ = flush_interval.tick() => {
                    if let Err(e) = writer.flush() {
                        tracing::error!("ndjson flush error: {e}");
                        metrics::counter!("openpx.ndjson.write_errors").increment(1);
                    }
                }
            }
        }
    }
}