1use std::{
5 io::Write,
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11
12use parking_lot::Mutex;
13
14use super::{
15 Sink,
16 writer::{ErasedWriter, MakeWriter, RollingFileWriter},
17};
18use crate::registry::ScrubbedEnvelope;
19
20pub struct NdjsonFileSink {
24 writer: Mutex<ErasedWriterMaker>,
25 written: AtomicU64,
26}
27
28impl std::fmt::Debug for NdjsonFileSink {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 f.debug_struct("NdjsonFileSink")
31 .field("written", &self.written.load(Ordering::Relaxed))
32 .finish_non_exhaustive()
33 }
34}
35
36struct ErasedWriterMaker {
37 make: Box<dyn FnMut() -> ErasedWriter + Send>,
38}
39
40impl NdjsonFileSink {
41 #[must_use]
43 pub fn new(rolling: RollingFileWriter) -> Self {
44 Self::with_make_writer(rolling)
45 }
46
47 pub fn with_make_writer<M: MakeWriter>(mw: M) -> Self {
49 let mw = Arc::new(mw);
50 let make = Box::new(move || ErasedWriter::new(Arc::clone(&mw).make_writer()));
51 Self {
52 writer: Mutex::new(ErasedWriterMaker { make }),
53 written: AtomicU64::new(0),
54 }
55 }
56
57 #[must_use]
59 pub fn written_total(&self) -> u64 {
60 self.written.load(Ordering::Relaxed)
61 }
62}
63
64impl Sink for NdjsonFileSink {
65 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
66 let mut maker = self.writer.lock();
67 let mut w = (maker.make)();
68 let envelope = env.envelope();
73 let value = render_json_value(envelope, env.payload(), env.schema());
74 let _ = writeln!(&mut w, "{value}");
75 let _ = w.flush();
76 self.written.fetch_add(1, Ordering::Relaxed);
77 }
78}
79
80fn render_json_value(
81 env: &obs_proto::obs::v1::ObsEnvelope,
82 payload: &[u8],
83 schema: Option<&'static dyn crate::EventSchemaErased>,
84) -> serde_json::Value {
85 use serde_json::{Map, Value};
86 let mut root = Map::new();
87 root.insert("ts_ns".into(), Value::from(env.ts_ns));
88 root.insert("full_name".into(), Value::from(env.full_name.clone()));
89 if env.schema_hash != 0 {
90 root.insert("schema_hash".into(), Value::from(env.schema_hash));
91 }
92 if env.callsite_id != 0 {
93 root.insert("callsite_id".into(), Value::from(env.callsite_id));
94 }
95 if !env.service.is_empty() {
96 root.insert("service".into(), Value::from(env.service.clone()));
97 }
98 if !env.instance.is_empty() {
99 root.insert("instance".into(), Value::from(env.instance.clone()));
100 }
101 if !env.version.is_empty() {
102 root.insert("version".into(), Value::from(env.version.clone()));
103 }
104 if !env.trace_id.is_empty() {
105 root.insert("trace_id".into(), Value::from(env.trace_id.clone()));
106 }
107 if !env.span_id.is_empty() {
108 root.insert("span_id".into(), Value::from(env.span_id.clone()));
109 }
110 if !env.parent_span_id.is_empty() {
111 root.insert(
112 "parent_span_id".into(),
113 Value::from(env.parent_span_id.clone()),
114 );
115 }
116 let mut labels = Map::new();
117 for (k, v) in env.labels.iter() {
118 labels.insert(k.clone(), Value::from(v.clone()));
119 }
120 if !labels.is_empty() {
121 root.insert("labels".into(), Value::Object(labels));
122 }
123 if !payload.is_empty()
127 && let Some(s) = schema
128 {
129 let mut payload_map = Map::new();
130 if s.render_json(payload, &mut payload_map).is_ok() && !payload_map.is_empty() {
131 root.insert("payload".into(), Value::Object(payload_map));
132 }
133 }
134 Value::Object(root)
135}