Skip to main content

px_core/websocket/
ndjson.rs

1use serde::Serialize;
2use std::io::Write;
3use tokio::sync::mpsc;
4
5/// Channel-based async NDJSON writer that decouples the hot path from disk I/O.
6///
7/// Broadcast sites call `write_record()` which serializes to `Vec<u8>` and does
8/// a non-blocking `mpsc::send()` — no lock, no disk I/O on the hot path.
9/// A dedicated tokio task owns the `BufWriter` and flushes on a timer.
10pub struct NdjsonWriter {
11    tx: mpsc::UnboundedSender<Vec<u8>>,
12}
13
14impl NdjsonWriter {
15    /// Spawn a dedicated writer task that owns `writer` exclusively.
16    ///
17    /// Accepts `W: Write + Send + 'static` so callers can pass `File`,
18    /// `BufWriter<File>`, or a future rotation-aware writer without changing
19    /// any broadcast site code.
20    pub fn new<W: Write + Send + 'static>(writer: W) -> Self {
21        let (tx, rx) = mpsc::unbounded_channel();
22        tokio::spawn(Self::writer_task(writer, rx));
23        Self { tx }
24    }
25
26    /// Serialize `msg` to NDJSON and enqueue for writing. Non-blocking.
27    /// If the writer task has died (channel closed), the record is silently dropped.
28    #[inline]
29    pub fn write_record<T: Serialize>(&self, msg: &T) {
30        match serde_json::to_vec(msg) {
31            Ok(mut buf) => {
32                buf.push(b'\n');
33                // Non-blocking send — if channel is closed, record is dropped
34                if self.tx.send(buf).is_err() {
35                    tracing::warn!("ndjson writer channel closed, record dropped");
36                }
37            }
38            Err(e) => {
39                tracing::error!("ndjson serialization error: {e}");
40                metrics::counter!("openpx.ndjson.write_errors").increment(1);
41            }
42        }
43    }
44
45    /// Background task: drains the channel, writes to BufWriter, flushes every 250ms.
46    async fn writer_task<W: Write + Send + 'static>(
47        inner: W,
48        mut rx: mpsc::UnboundedReceiver<Vec<u8>>,
49    ) {
50        use std::io::BufWriter;
51
52        let mut writer = BufWriter::new(inner);
53        let mut flush_interval = tokio::time::interval(std::time::Duration::from_millis(250));
54
55        loop {
56            tokio::select! {
57                msg = rx.recv() => {
58                    match msg {
59                        Some(buf) => {
60                            if let Err(e) = writer.write_all(&buf) {
61                                tracing::error!("ndjson write error: {e}");
62                                metrics::counter!("openpx.ndjson.write_errors").increment(1);
63                            }
64                        }
65                        None => {
66                            // Channel closed — flush and exit
67                            let _ = writer.flush();
68                            return;
69                        }
70                    }
71                }
72                _ = flush_interval.tick() => {
73                    if let Err(e) = writer.flush() {
74                        tracing::error!("ndjson flush error: {e}");
75                        metrics::counter!("openpx.ndjson.write_errors").increment(1);
76                    }
77                }
78            }
79        }
80    }
81}