px_core/websocket/ndjson.rs
1use serde::Serialize;
2use std::io::Write;
3use tokio::sync::mpsc;
4
5/// Channel-based async NDJSON writer that decouples the hot path from disk I/O.
6///
7/// Broadcast sites call `write_record()` which serializes to `Vec<u8>` and does
8/// a non-blocking `mpsc::send()` — no lock, no disk I/O on the hot path.
9/// A dedicated tokio task owns the `BufWriter` and flushes on a timer.
10pub struct NdjsonWriter {
11 tx: mpsc::UnboundedSender<Vec<u8>>,
12}
13
14impl NdjsonWriter {
15 /// Spawn a dedicated writer task that owns `writer` exclusively.
16 ///
17 /// Accepts `W: Write + Send + 'static` so callers can pass `File`,
18 /// `BufWriter<File>`, or a future rotation-aware writer without changing
19 /// any broadcast site code.
20 pub fn new<W: Write + Send + 'static>(writer: W) -> Self {
21 let (tx, rx) = mpsc::unbounded_channel();
22 tokio::spawn(Self::writer_task(writer, rx));
23 Self { tx }
24 }
25
26 /// Serialize `msg` to NDJSON and enqueue for writing. Non-blocking.
27 /// If the writer task has died (channel closed), the record is silently dropped.
28 #[inline]
29 pub fn write_record<T: Serialize>(&self, msg: &T) {
30 match serde_json::to_vec(msg) {
31 Ok(mut buf) => {
32 buf.push(b'\n');
33 // Non-blocking send — if channel is closed, record is dropped
34 if self.tx.send(buf).is_err() {
35 tracing::warn!("ndjson writer channel closed, record dropped");
36 }
37 }
38 Err(e) => {
39 tracing::error!("ndjson serialization error: {e}");
40 metrics::counter!("openpx.ndjson.write_errors").increment(1);
41 }
42 }
43 }
44
45 /// Background task: drains the channel, writes to BufWriter, flushes every 250ms.
46 async fn writer_task<W: Write + Send + 'static>(
47 inner: W,
48 mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
49 ) {
50 use std::io::BufWriter;
51
52 let mut writer = BufWriter::new(inner);
53 let mut flush_interval = tokio::time::interval(std::time::Duration::from_millis(250));
54
55 loop {
56 tokio::select! {
57 msg = rx.recv() => {
58 match msg {
59 Some(buf) => {
60 if let Err(e) = writer.write_all(&buf) {
61 tracing::error!("ndjson write error: {e}");
62 metrics::counter!("openpx.ndjson.write_errors").increment(1);
63 }
64 }
65 None => {
66 // Channel closed — flush and exit
67 let _ = writer.flush();
68 return;
69 }
70 }
71 }
72 _ = flush_interval.tick() => {
73 if let Err(e) = writer.flush() {
74 tracing::error!("ndjson flush error: {e}");
75 metrics::counter!("openpx.ndjson.write_errors").increment(1);
76 }
77 }
78 }
79 }
80 }
81}