Skip to main content

obs_core/sink/
stdout.rs

1//! `StdoutSink` — formatter-driven stdout / writer sink. Spec 20 § 3.6.
2
3use std::io::Write;
4
5use obs_proto::obs::v1::{ObsEnvelope, SamplingReason, Severity, Tier};
6use parking_lot::Mutex;
7
8use super::{
9    Sink,
10    writer::{ErasedWriter, MakeWriter, StdoutWriter},
11};
12use crate::registry::ScrubbedEnvelope;
13
14/// Output style for [`StdoutSink`]. See spec 20 § 3.6.
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
16#[non_exhaustive]
17pub enum FormatterStyle {
18    /// Single line; tracing-fmt-shaped. Default — readable under `tail
19    /// -f` and friendly to `grep`. Boundary-review § 4.6 + spec 20 § 3.6.
20    #[default]
21    Compact,
22    /// Single line; full envelope with explicit field names.
23    Full,
24    /// Multi-line; human-readable, dev-focused.
25    Pretty,
26    /// Newline-delimited JSON; production stdout.
27    Json,
28}
29
30/// Stdout / writer-backed sink.
31pub struct StdoutSink {
32    style: FormatterStyle,
33    writer: Mutex<ErasedWriterMaker>,
34    severity_floor: Severity,
35}
36
37impl std::fmt::Debug for StdoutSink {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("StdoutSink")
40            .field("style", &self.style)
41            .field("severity_floor", &self.severity_floor)
42            .finish_non_exhaustive()
43    }
44}
45
46/// Erases the `MakeWriter` factory behind a closure so `StdoutSink`
47/// can hold writers of arbitrary concrete types without leaking them
48/// into the public API.
49struct ErasedWriterMaker {
50    make: Box<dyn FnMut(Severity) -> ErasedWriter + Send>,
51}
52
53impl StdoutSink {
54    /// Construct a stdout sink with the given style; writes to
55    /// `std::io::stdout()`.
56    #[must_use]
57    pub fn new(style: FormatterStyle) -> Self {
58        Self::with_make_writer(style, StdoutWriter)
59    }
60
61    /// Construct with a caller-provided `MakeWriter`. Used to wire
62    /// `LevelSplitWriter`, `RollingFileWriter`, `NonBlockingWriter`,
63    /// or test harnesses.
64    pub fn with_make_writer<M: MakeWriter>(style: FormatterStyle, mw: M) -> Self {
65        let mw = std::sync::Arc::new(mw);
66        let make = Box::new(move |sev: Severity| {
67            let m = std::sync::Arc::clone(&mw);
68            ErasedWriter::new(m.make_writer_for(sev))
69        });
70        Self {
71            style,
72            writer: Mutex::new(ErasedWriterMaker { make }),
73            severity_floor: Severity::Trace,
74        }
75    }
76
77    /// Set a severity floor; envelopes below it are dropped.
78    #[must_use]
79    pub fn severity_floor(mut self, sev: Severity) -> Self {
80        self.severity_floor = sev;
81        self
82    }
83
84    /// Test helper: build a sink that writes into `writer` using
85    /// `FormatterStyle::Full`.
86    pub fn with_writer<W: Write + Send + 'static>(style: FormatterStyle, writer: W) -> Self {
87        struct OneShot<W>(parking_lot::Mutex<Option<W>>);
88        impl<W: Write + Send + 'static> MakeWriter for OneShot<W> {
89            type Writer = ErasedWriter;
90            fn make_writer(&self) -> ErasedWriter {
91                if let Some(w) = self.0.lock().take() {
92                    ErasedWriter::new(w)
93                } else {
94                    // Subsequent writes fall back to discarding to keep
95                    // the test contract simple.
96                    ErasedWriter::new(std::io::sink())
97                }
98            }
99        }
100        // OneShot can't actually own a single writer across batches;
101        // instead, share it through Arc<Mutex<...>>. Use a more
102        // realistic shared-vec test writer.
103        let shared = std::sync::Arc::new(parking_lot::Mutex::new(Some(writer)));
104        struct Shared<W>(std::sync::Arc<parking_lot::Mutex<Option<W>>>);
105        impl<W: Write + Send + 'static> MakeWriter for Shared<W> {
106            type Writer = ErasedWriter;
107            fn make_writer(&self) -> ErasedWriter {
108                let mut g = self.0.lock();
109                if let Some(w) = g.take() {
110                    ErasedWriter::new(SharedWriter {
111                        slot: Some(w),
112                        back: std::sync::Arc::clone(&self.0),
113                    })
114                } else {
115                    ErasedWriter::new(std::io::sink())
116                }
117            }
118        }
119        struct SharedWriter<W: Write> {
120            slot: Option<W>,
121            back: std::sync::Arc<parking_lot::Mutex<Option<W>>>,
122        }
123        impl<W: Write> Write for SharedWriter<W> {
124            fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
125                match self.slot.as_mut() {
126                    Some(w) => w.write(b),
127                    None => Ok(b.len()),
128                }
129            }
130            fn flush(&mut self) -> std::io::Result<()> {
131                match self.slot.as_mut() {
132                    Some(w) => w.flush(),
133                    None => Ok(()),
134                }
135            }
136        }
137        impl<W: Write> Drop for SharedWriter<W> {
138            fn drop(&mut self) {
139                if let Some(w) = self.slot.take() {
140                    *self.back.lock() = Some(w);
141                }
142            }
143        }
144        // Quiet the unused-OneShot warning: keep both for symmetry with
145        // earlier API but use Shared for actual functionality.
146        let _ = std::any::type_name::<OneShot<()>>();
147        Self::with_make_writer(style, Shared(shared))
148    }
149}
150
151impl Default for StdoutSink {
152    fn default() -> Self {
153        Self::new(FormatterStyle::default())
154    }
155}
156
157impl Sink for StdoutSink {
158    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
159        let envelope = env.envelope();
160        let sev = native_sev(envelope);
161        if sev < self.severity_floor {
162            return;
163        }
164        let mut maker = self.writer.lock();
165        let mut w = (maker.make)(sev);
166        match self.style {
167            FormatterStyle::Compact => render_compact(&mut w, envelope),
168            FormatterStyle::Full => render_full(&mut w, envelope, env.payload().len()),
169            FormatterStyle::Pretty => render_pretty(&mut w, envelope, env.payload().len()),
170            FormatterStyle::Json => render_json(&mut w, envelope),
171        }
172    }
173}
174
175fn native_sev(env: &ObsEnvelope) -> Severity {
176    match env.sev {
177        ::buffa::EnumValue::Known(s) => s,
178        ::buffa::EnumValue::Unknown(_) => Severity::Unspecified,
179    }
180}
181
182fn render_compact<W: Write>(w: &mut W, env: &ObsEnvelope) {
183    // Match tracing-subscriber's compact format:
184    //
185    //   2026-05-07T15:31:00.123456Z  INFO scope{k=v ...}: target: message
186    //
187    // Mapping from the obs envelope:
188    //   - timestamp     → RFC3339 UTC from `ts_ns`
189    //   - LEVEL         → `sev_str(env)` upper-cased, right-padded to 5
190    //   - scope{fields} → envelope `labels` when present (sorted)
191    //   - target        → `env.full_name`
192    //   - message       → trailing trace_id/span_id when present, empty otherwise (obs envelopes
193    //     are schema-driven; the schema name IS the message).
194    let iso = iso8601_utc(env.ts_ns);
195    let lvl = sev_upper(env);
196
197    // Scope is the labels block in tracing style: `name{k=v k=v}`.
198    // There's no separate "span name" on the envelope, so use the
199    // leaf of `full_name` — matches how `tracing::instrument` prints
200    // the function name.
201    let scope_leaf = env
202        .full_name
203        .rsplit_once('.')
204        .map(|(_, leaf)| leaf)
205        .unwrap_or(env.full_name.as_str());
206
207    let fields = tracing_style_fields(env);
208    let scope = if fields.is_empty() {
209        String::new()
210    } else {
211        format!("{scope_leaf}{{{fields}}}: ")
212    };
213
214    // Target: the full schema name. tracing-subscriber prints the
215    // crate::module path; the envelope's `full_name` is the analogue
216    // for schema-driven emits.
217    let target = &env.full_name;
218
219    // Message tail: trace correlation when present. Keeps noise off
220    // the common line while still surfacing the linkage for any emit
221    // inside an active scope. When both are empty (the common case for
222    // schema-only emits) the `: <tail>` suffix disappears entirely so
223    // the line ends at the target name — no trailing `: ` dangler.
224    if !env.trace_id.is_empty() || !env.span_id.is_empty() {
225        let _ = writeln!(
226            w,
227            "{iso} {lvl:>5} {scope}{target}: trace_id={} span_id={}",
228            dash_or(&env.trace_id),
229            dash_or(&env.span_id),
230        );
231    } else {
232        let _ = writeln!(w, "{iso} {lvl:>5} {scope}{target}");
233    }
234    let _ = w.flush();
235}
236
237/// Render `env.labels` as tracing-style `k=v k=v` — space-separated,
238/// keys sorted, string values quoted iff they contain spaces or
239/// `=`/`"` characters so trivial values stay unquoted.
240fn tracing_style_fields(env: &ObsEnvelope) -> String {
241    if env.labels.is_empty() {
242        return String::new();
243    }
244    let mut keys: Vec<_> = env.labels.keys().collect();
245    keys.sort();
246    let mut s = String::with_capacity(env.labels.len() * 16);
247    for (i, k) in keys.iter().enumerate() {
248        if i > 0 {
249            s.push(' ');
250        }
251        if let Some(v) = env.labels.get(*k) {
252            s.push_str(k);
253            s.push('=');
254            if needs_quoting(v) {
255                s.push('"');
256                // Escape embedded quotes + backslashes so the output
257                // stays parseable.
258                for ch in v.chars() {
259                    if ch == '"' || ch == '\\' {
260                        s.push('\\');
261                    }
262                    s.push(ch);
263                }
264                s.push('"');
265            } else {
266                s.push_str(v);
267            }
268        }
269    }
270    s
271}
272
273fn needs_quoting(v: &str) -> bool {
274    v.is_empty()
275        || v.chars()
276            .any(|c| c.is_whitespace() || c == '=' || c == '"' || c == '{' || c == '}')
277}
278
279fn sev_upper(env: &ObsEnvelope) -> &'static str {
280    match env.sev {
281        ::buffa::EnumValue::Known(s) => match s {
282            Severity::Trace => "TRACE",
283            Severity::Debug => "DEBUG",
284            Severity::Info => "INFO",
285            Severity::Warn => "WARN",
286            Severity::Error => "ERROR",
287            Severity::Fatal => "FATAL",
288            _ => "?",
289        },
290        ::buffa::EnumValue::Unknown(_) => "?",
291    }
292}
293
294/// Render `ts_ns` (Unix epoch nanoseconds) as RFC3339 UTC with
295/// microsecond resolution: `YYYY-MM-DDTHH:MM:SS.ffffffZ`. Matches
296/// tracing-subscriber's default timestamp format.
297fn iso8601_utc(ts_ns: u64) -> String {
298    // Unix day 0 is 1970-01-01 (Thursday). Use the civil-date algorithm
299    // from Howard Hinnant — division-only, no lookup tables. Valid for
300    // the entire range ts_ns can represent (through AD 2554).
301    let secs = (ts_ns / 1_000_000_000) as i64;
302    let nanos = (ts_ns % 1_000_000_000) as u32;
303    let micros = nanos / 1_000;
304
305    let days = secs.div_euclid(86_400);
306    let sec_of_day = secs.rem_euclid(86_400);
307    let hour = (sec_of_day / 3600) as u32;
308    let minute = ((sec_of_day / 60) % 60) as u32;
309    let second = (sec_of_day % 60) as u32;
310
311    let (year, month, day) = civil_from_days(days);
312
313    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{micros:06}Z")
314}
315
316/// Convert days-since-1970 to `(year, month, day)`. Howard Hinnant's
317/// [date algorithm](https://howardhinnant.github.io/date_algorithms.html)
318/// — integer-only, no lookup tables.
319fn civil_from_days(z: i64) -> (i32, u32, u32) {
320    let z = z + 719_468;
321    let era = z.div_euclid(146_097);
322    let doe = z.rem_euclid(146_097) as u64; // [0, 146096]
323    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
324    let y = (yoe as i64) + era * 400;
325    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
326    let mp = (5 * doy + 2) / 153;
327    let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
328    let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
329    let y = if m <= 2 { y + 1 } else { y };
330    (y as i32, m, d)
331}
332
333fn render_full<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
334    let _ = writeln!(
335        w,
336        "[{ts:>10}.{ns:09} {sev:<5}] {tier:<6} {full_name}",
337        ts = env.ts_ns / 1_000_000_000,
338        ns = env.ts_ns % 1_000_000_000,
339        sev = sev_str(env),
340        tier = tier_str(env),
341        full_name = env.full_name,
342    );
343    let _ = writeln!(
344        w,
345        "  service={} instance={} version={} reason={}",
346        dash_or(&env.service),
347        dash_or(&env.instance),
348        dash_or(&env.version),
349        sampling_reason_str(env),
350    );
351    if !env.trace_id.is_empty() || !env.span_id.is_empty() {
352        let _ = writeln!(
353            w,
354            "  trace_id={} span_id={} parent={}",
355            dash_or(&env.trace_id),
356            dash_or(&env.span_id),
357            dash_or(&env.parent_span_id),
358        );
359    }
360    if !env.labels.is_empty() {
361        let mut keys: Vec<_> = env.labels.keys().collect();
362        keys.sort();
363        for k in keys {
364            if let Some(v) = env.labels.get(k) {
365                let _ = writeln!(w, "  label.{k}={v}");
366            }
367        }
368    }
369    if payload_len > 0 {
370        let _ = writeln!(w, "  payload_bytes={payload_len}");
371    }
372    let _ = w.flush();
373}
374
375fn render_pretty<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
376    let _ = writeln!(
377        w,
378        "─── {full_name} @ {ts}.{ns:09} {sev} {tier} ───",
379        full_name = env.full_name,
380        ts = env.ts_ns / 1_000_000_000,
381        ns = env.ts_ns % 1_000_000_000,
382        sev = sev_str(env),
383        tier = tier_str(env),
384    );
385    let _ = writeln!(
386        w,
387        "    service: {} ({}) instance: {}",
388        env.service, env.version, env.instance
389    );
390    if !env.trace_id.is_empty() {
391        let _ = writeln!(
392            w,
393            "    trace:   {}/{} parent={}",
394            env.trace_id, env.span_id, env.parent_span_id
395        );
396    }
397    if !env.labels.is_empty() {
398        let _ = writeln!(w, "    labels:");
399        let mut keys: Vec<_> = env.labels.keys().collect();
400        keys.sort();
401        for k in keys {
402            if let Some(v) = env.labels.get(k) {
403                let _ = writeln!(w, "        {k} = {v}");
404            }
405        }
406    }
407    if payload_len > 0 {
408        let _ = writeln!(w, "    payload: {payload_len} bytes");
409    }
410    let _ = w.flush();
411}
412
413fn render_json<W: Write>(w: &mut W, env: &ObsEnvelope) {
414    use serde_json::{Map, Value};
415    let mut root = Map::new();
416    root.insert("ts_ns".into(), Value::from(env.ts_ns));
417    root.insert("sev".into(), Value::from(sev_str(env)));
418    root.insert("tier".into(), Value::from(tier_str(env)));
419    root.insert("full_name".into(), Value::from(env.full_name.clone()));
420    if !env.service.is_empty() {
421        root.insert("service".into(), Value::from(env.service.clone()));
422    }
423    if !env.instance.is_empty() {
424        root.insert("instance".into(), Value::from(env.instance.clone()));
425    }
426    if !env.version.is_empty() {
427        root.insert("version".into(), Value::from(env.version.clone()));
428    }
429    if !env.trace_id.is_empty() {
430        root.insert("trace_id".into(), Value::from(env.trace_id.clone()));
431    }
432    if !env.span_id.is_empty() {
433        root.insert("span_id".into(), Value::from(env.span_id.clone()));
434    }
435    if !env.parent_span_id.is_empty() {
436        root.insert(
437            "parent_span_id".into(),
438            Value::from(env.parent_span_id.clone()),
439        );
440    }
441    if env.schema_hash != 0 {
442        root.insert("schema_hash".into(), Value::from(env.schema_hash));
443    }
444    if env.callsite_id != 0 {
445        root.insert("callsite_id".into(), Value::from(env.callsite_id));
446    }
447    if !env.labels.is_empty() {
448        let mut labels = Map::new();
449        for (k, v) in env.labels.iter() {
450            labels.insert(k.clone(), Value::from(v.clone()));
451        }
452        root.insert("labels".into(), Value::Object(labels));
453    }
454    let value = Value::Object(root);
455    let _ = writeln!(w, "{value}");
456    let _ = w.flush();
457}
458
459fn dash_or(s: &str) -> &str {
460    if s.is_empty() { "-" } else { s }
461}
462
463#[allow(dead_code)]
464fn compact_labels(env: &ObsEnvelope) -> String {
465    if env.labels.is_empty() {
466        return "{}".to_string();
467    }
468    let mut keys: Vec<_> = env.labels.keys().collect();
469    keys.sort();
470    let mut s = String::with_capacity(env.labels.len() * 16);
471    s.push('{');
472    for (i, k) in keys.iter().enumerate() {
473        if i > 0 {
474            s.push_str(", ");
475        }
476        if let Some(v) = env.labels.get(*k) {
477            s.push_str(k);
478            s.push('=');
479            s.push_str(v);
480        }
481    }
482    s.push('}');
483    s
484}
485
486fn sev_str(env: &ObsEnvelope) -> &'static str {
487    match env.sev {
488        ::buffa::EnumValue::Known(s) => s.as_str(),
489        ::buffa::EnumValue::Unknown(_) => Severity::Unspecified.as_str(),
490    }
491}
492
493fn tier_str(env: &ObsEnvelope) -> &'static str {
494    match env.tier {
495        ::buffa::EnumValue::Known(t) => t.as_str(),
496        ::buffa::EnumValue::Unknown(_) => Tier::Unspecified.as_str(),
497    }
498}
499
500fn sampling_reason_str(env: &ObsEnvelope) -> &'static str {
501    match env.sampling_reason {
502        ::buffa::EnumValue::Known(r) => r.as_str(),
503        ::buffa::EnumValue::Unknown(_) => SamplingReason::Unspecified.as_str(),
504    }
505}
506
507#[cfg(test)]
508mod tests {
509    use obs_proto::obs::v1::Severity as PSev;
510
511    use super::*;
512
513    fn env(full_name: &str, sev: PSev, ts_ns: u64) -> ObsEnvelope {
514        ObsEnvelope {
515            full_name: full_name.to_string(),
516            sev: ::buffa::EnumValue::Known(sev),
517            ts_ns,
518            ..Default::default()
519        }
520    }
521
522    // 2026-05-07T15:31:00 UTC = 1778167860 seconds since epoch.
523    // Add 123_456 µs = 123_456_000 ns to get the exact timestamp
524    // from the tracing-fmt reference line.
525    const REF_TS_NS: u64 = 1_778_167_860_000_000_000 + 123_456_000;
526
527    #[test]
528    fn test_iso8601_utc_matches_tracing_fmt_shape() {
529        let s = iso8601_utc(REF_TS_NS);
530        assert_eq!(s, "2026-05-07T15:31:00.123456Z");
531    }
532
533    #[test]
534    fn test_render_compact_mirrors_tracing_fmt_compact() {
535        // Matches the shape:
536        //   2026-05-07T15:31:00.123456Z  INFO scope{k=v}: target
537        // No trailing `: ` when there's no trace context / message.
538        let mut e = env("my_crate.process_order", PSev::SEVERITY_INFO, REF_TS_NS);
539        e.labels.insert("id".to_string(), "42".to_string());
540        e.labels.insert("item".to_string(), "Rust Book".to_string());
541        let mut buf: Vec<u8> = Vec::new();
542        render_compact(&mut buf, &e);
543        let line = String::from_utf8(buf).expect("utf-8");
544        assert_eq!(
545            line,
546            "2026-05-07T15:31:00.123456Z  INFO process_order{id=42 item=\"Rust Book\"}: \
547             my_crate.process_order\n"
548        );
549    }
550
551    #[test]
552    fn test_render_compact_appends_trace_context_when_present() {
553        let mut e = env("x.y", PSev::SEVERITY_INFO, REF_TS_NS);
554        e.trace_id = "0123456789abcdef0123456789abcdef".to_string();
555        e.span_id = "0123456789abcdef".to_string();
556        let mut buf: Vec<u8> = Vec::new();
557        render_compact(&mut buf, &e);
558        let line = String::from_utf8(buf).expect("utf-8");
559        assert_eq!(
560            line,
561            "2026-05-07T15:31:00.123456Z  INFO x.y: trace_id=0123456789abcdef0123456789abcdef \
562             span_id=0123456789abcdef\n"
563        );
564    }
565
566    #[test]
567    fn test_render_compact_drops_scope_block_when_no_labels() {
568        // Empty labels → no `scope{...}` prefix, no trailing `: `.
569        let e = env("x.y.Z", PSev::SEVERITY_INFO, REF_TS_NS);
570        let mut buf: Vec<u8> = Vec::new();
571        render_compact(&mut buf, &e);
572        let line = String::from_utf8(buf).expect("utf-8");
573        assert_eq!(line, "2026-05-07T15:31:00.123456Z  INFO x.y.Z\n");
574    }
575
576    #[test]
577    fn test_render_compact_pads_severity_to_five() {
578        let e = env("x.y", PSev::SEVERITY_WARN, 0);
579        let mut buf: Vec<u8> = Vec::new();
580        render_compact(&mut buf, &e);
581        let line = String::from_utf8(buf).expect("utf-8");
582        // `  WARN` — two-space lead (from the format right-pad), then
583        // "WARN" (4 chars, padded to 5 = 1 trailing space).
584        assert!(line.contains(" WARN "), "line: {line}");
585    }
586
587    #[test]
588    fn test_tracing_style_fields_quotes_when_needed() {
589        let mut e = env("x.y", PSev::SEVERITY_INFO, 0);
590        e.labels.insert("a".to_string(), "simple".to_string());
591        e.labels.insert("b".to_string(), "with space".to_string());
592        e.labels
593            .insert("c".to_string(), "with \"quote\"".to_string());
594        let s = tracing_style_fields(&e);
595        assert!(s.contains("a=simple"));
596        assert!(s.contains("b=\"with space\""));
597        assert!(s.contains(r#"c="with \"quote\"""#));
598    }
599
600    #[test]
601    fn test_civil_from_days_round_trip_recent_dates() {
602        // Unix day 0 = 1970-01-01.
603        assert_eq!(civil_from_days(0), (1970, 1, 1));
604        // 2026-05-07 — 20,580 days since epoch.
605        assert_eq!(civil_from_days(20_580), (2026, 5, 7));
606        // Leap day: 2024-02-29 — 19,782 days since epoch.
607        assert_eq!(civil_from_days(19_782), (2024, 2, 29));
608    }
609}