Skip to main content

obs_core/sink/
ndjson.rs

1//! `NdjsonFileSink` — writes one JSON object per line, on top of any
2//! `MakeWriter` (typically a `RollingFileWriter`). Spec 20 § 3.6.
3
4use std::{
5    io::Write,
6    sync::{
7        Arc,
8        atomic::{AtomicU64, Ordering},
9    },
10};
11
12use parking_lot::Mutex;
13
14use super::{
15    Sink,
16    writer::{ErasedWriter, MakeWriter, RollingFileWriter},
17};
18use crate::registry::ScrubbedEnvelope;
19
20/// File sink that writes envelopes as JSON lines to the underlying
21/// `MakeWriter`. Buffers a `Mutex<ErasedWriter>` per batch to avoid
22/// re-opening files for every event.
23pub struct NdjsonFileSink {
24    writer: Mutex<ErasedWriterMaker>,
25    written: AtomicU64,
26}
27
28impl std::fmt::Debug for NdjsonFileSink {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("NdjsonFileSink")
31            .field("written", &self.written.load(Ordering::Relaxed))
32            .finish_non_exhaustive()
33    }
34}
35
36struct ErasedWriterMaker {
37    make: Box<dyn FnMut() -> ErasedWriter + Send>,
38}
39
40impl NdjsonFileSink {
41    /// Build atop `RollingFileWriter`, the canonical file destination.
42    #[must_use]
43    pub fn new(rolling: RollingFileWriter) -> Self {
44        Self::with_make_writer(rolling)
45    }
46
47    /// Build atop any `MakeWriter` (test harness, custom destinations).
48    pub fn with_make_writer<M: MakeWriter>(mw: M) -> Self {
49        let mw = Arc::new(mw);
50        let make = Box::new(move || ErasedWriter::new(Arc::clone(&mw).make_writer()));
51        Self {
52            writer: Mutex::new(ErasedWriterMaker { make }),
53            written: AtomicU64::new(0),
54        }
55    }
56
57    /// Total events written successfully.
58    #[must_use]
59    pub fn written_total(&self) -> u64 {
60        self.written.load(Ordering::Relaxed)
61    }
62}
63
64impl Sink for NdjsonFileSink {
65    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
66        let mut maker = self.writer.lock();
67        let mut w = (maker.make)();
68        // Reuse the JSON formatter from StdoutSink for consistency.
69        // Spec 14 § 5 / spec 93 P0-8: render the *scrubbed* payload —
70        // the worker has already redacted classified fields, and
71        // `env.schema()`'s `render_json` walks the bytes here.
72        let envelope = env.envelope();
73        let value = render_json_value(envelope, env.payload(), env.schema());
74        let _ = writeln!(&mut w, "{value}");
75        let _ = w.flush();
76        self.written.fetch_add(1, Ordering::Relaxed);
77    }
78}
79
80fn render_json_value(
81    env: &obs_proto::obs::v1::ObsEnvelope,
82    payload: &[u8],
83    schema: Option<&'static dyn crate::EventSchemaErased>,
84) -> serde_json::Value {
85    use serde_json::{Map, Value};
86    let mut root = Map::new();
87    root.insert("ts_ns".into(), Value::from(env.ts_ns));
88    root.insert("full_name".into(), Value::from(env.full_name.clone()));
89    if env.schema_hash != 0 {
90        root.insert("schema_hash".into(), Value::from(env.schema_hash));
91    }
92    if env.callsite_id != 0 {
93        root.insert("callsite_id".into(), Value::from(env.callsite_id));
94    }
95    if !env.service.is_empty() {
96        root.insert("service".into(), Value::from(env.service.clone()));
97    }
98    if !env.instance.is_empty() {
99        root.insert("instance".into(), Value::from(env.instance.clone()));
100    }
101    if !env.version.is_empty() {
102        root.insert("version".into(), Value::from(env.version.clone()));
103    }
104    if !env.trace_id.is_empty() {
105        root.insert("trace_id".into(), Value::from(env.trace_id.clone()));
106    }
107    if !env.span_id.is_empty() {
108        root.insert("span_id".into(), Value::from(env.span_id.clone()));
109    }
110    if !env.parent_span_id.is_empty() {
111        root.insert(
112            "parent_span_id".into(),
113            Value::from(env.parent_span_id.clone()),
114        );
115    }
116    let mut labels = Map::new();
117    for (k, v) in env.labels.iter() {
118        labels.insert(k.clone(), Value::from(v.clone()));
119    }
120    if !labels.is_empty() {
121        root.insert("labels".into(), Value::Object(labels));
122    }
123    // Project the typed payload (spec 14 § 4.2) so consumers see the
124    // typed fields, not just the wire-bytes blob. Skipped when schema
125    // is unknown or the projection errors (truncation).
126    if !payload.is_empty()
127        && let Some(s) = schema
128    {
129        let mut payload_map = Map::new();
130        if s.render_json(payload, &mut payload_map).is_ok() && !payload_map.is_empty() {
131            root.insert("payload".into(), Value::Object(payload_map));
132        }
133    }
134    Value::Object(root)
135}