1use std::io::Write;
4
5use obs_proto::obs::v1::ObsEnvelope;
6use obs_types::{SamplingReason, Severity, Tier};
7use parking_lot::Mutex;
8
9use super::{
10 Sink,
11 writer::{ErasedWriter, MakeWriter, StdoutWriter},
12};
13use crate::registry::ScrubbedEnvelope;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
17#[non_exhaustive]
18pub enum FormatterStyle {
19 #[default]
22 Compact,
23 Full,
25 Pretty,
27 Json,
29}
30
31pub struct StdoutSink {
33 style: FormatterStyle,
34 writer: Mutex<ErasedWriterMaker>,
35 severity_floor: Severity,
36}
37
38impl std::fmt::Debug for StdoutSink {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("StdoutSink")
41 .field("style", &self.style)
42 .field("severity_floor", &self.severity_floor)
43 .finish_non_exhaustive()
44 }
45}
46
47struct ErasedWriterMaker {
51 make: Box<dyn FnMut(Severity) -> ErasedWriter + Send>,
52}
53
54impl StdoutSink {
55 #[must_use]
58 pub fn new(style: FormatterStyle) -> Self {
59 Self::with_make_writer(style, StdoutWriter)
60 }
61
62 pub fn with_make_writer<M: MakeWriter>(style: FormatterStyle, mw: M) -> Self {
66 let mw = std::sync::Arc::new(mw);
67 let make = Box::new(move |sev: Severity| {
68 let m = std::sync::Arc::clone(&mw);
69 ErasedWriter::new(m.make_writer_for(sev))
70 });
71 Self {
72 style,
73 writer: Mutex::new(ErasedWriterMaker { make }),
74 severity_floor: Severity::Trace,
75 }
76 }
77
78 #[must_use]
80 pub fn severity_floor(mut self, sev: Severity) -> Self {
81 self.severity_floor = sev;
82 self
83 }
84
85 pub fn with_writer<W: Write + Send + 'static>(style: FormatterStyle, writer: W) -> Self {
88 struct OneShot<W>(parking_lot::Mutex<Option<W>>);
89 impl<W: Write + Send + 'static> MakeWriter for OneShot<W> {
90 type Writer = ErasedWriter;
91 fn make_writer(&self) -> ErasedWriter {
92 if let Some(w) = self.0.lock().take() {
93 ErasedWriter::new(w)
94 } else {
95 ErasedWriter::new(std::io::sink())
98 }
99 }
100 }
101 let shared = std::sync::Arc::new(parking_lot::Mutex::new(Some(writer)));
105 struct Shared<W>(std::sync::Arc<parking_lot::Mutex<Option<W>>>);
106 impl<W: Write + Send + 'static> MakeWriter for Shared<W> {
107 type Writer = ErasedWriter;
108 fn make_writer(&self) -> ErasedWriter {
109 let mut g = self.0.lock();
110 if let Some(w) = g.take() {
111 ErasedWriter::new(SharedWriter {
112 slot: Some(w),
113 back: std::sync::Arc::clone(&self.0),
114 })
115 } else {
116 ErasedWriter::new(std::io::sink())
117 }
118 }
119 }
120 struct SharedWriter<W: Write> {
121 slot: Option<W>,
122 back: std::sync::Arc<parking_lot::Mutex<Option<W>>>,
123 }
124 impl<W: Write> Write for SharedWriter<W> {
125 fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
126 match self.slot.as_mut() {
127 Some(w) => w.write(b),
128 None => Ok(b.len()),
129 }
130 }
131 fn flush(&mut self) -> std::io::Result<()> {
132 match self.slot.as_mut() {
133 Some(w) => w.flush(),
134 None => Ok(()),
135 }
136 }
137 }
138 impl<W: Write> Drop for SharedWriter<W> {
139 fn drop(&mut self) {
140 if let Some(w) = self.slot.take() {
141 *self.back.lock() = Some(w);
142 }
143 }
144 }
145 let _ = std::any::type_name::<OneShot<()>>();
148 Self::with_make_writer(style, Shared(shared))
149 }
150}
151
152impl Default for StdoutSink {
153 fn default() -> Self {
154 Self::new(FormatterStyle::default())
155 }
156}
157
158impl Sink for StdoutSink {
159 fn deliver(&self, env: ScrubbedEnvelope<'_>) {
160 let envelope = env.envelope();
161 let sev = native_sev(envelope);
162 if sev < self.severity_floor {
163 return;
164 }
165 let mut maker = self.writer.lock();
166 let mut w = (maker.make)(sev);
167 match self.style {
168 FormatterStyle::Compact => render_compact(&mut w, envelope),
169 FormatterStyle::Full => render_full(&mut w, envelope, env.payload().len()),
170 FormatterStyle::Pretty => render_pretty(&mut w, envelope, env.payload().len()),
171 FormatterStyle::Json => render_json(&mut w, envelope),
172 }
173 }
174}
175
176fn native_sev(env: &ObsEnvelope) -> Severity {
177 match env.sev {
178 ::buffa::EnumValue::Known(s) => proto_sev_to_native(s),
179 ::buffa::EnumValue::Unknown(_) => Severity::Unspecified,
180 }
181}
182
183fn render_compact<W: Write>(w: &mut W, env: &ObsEnvelope) {
184 let iso = iso8601_utc(env.ts_ns);
196 let lvl = sev_upper(env);
197
198 let scope_leaf = env
203 .full_name
204 .rsplit_once('.')
205 .map(|(_, leaf)| leaf)
206 .unwrap_or(env.full_name.as_str());
207
208 let fields = tracing_style_fields(env);
209 let scope = if fields.is_empty() {
210 String::new()
211 } else {
212 format!("{scope_leaf}{{{fields}}}: ")
213 };
214
215 let target = &env.full_name;
219
220 if !env.trace_id.is_empty() || !env.span_id.is_empty() {
226 let _ = writeln!(
227 w,
228 "{iso} {lvl:>5} {scope}{target}: trace_id={} span_id={}",
229 dash_or(&env.trace_id),
230 dash_or(&env.span_id),
231 );
232 } else {
233 let _ = writeln!(w, "{iso} {lvl:>5} {scope}{target}");
234 }
235 let _ = w.flush();
236}
237
238fn tracing_style_fields(env: &ObsEnvelope) -> String {
242 if env.labels.is_empty() {
243 return String::new();
244 }
245 let mut keys: Vec<_> = env.labels.keys().collect();
246 keys.sort();
247 let mut s = String::with_capacity(env.labels.len() * 16);
248 for (i, k) in keys.iter().enumerate() {
249 if i > 0 {
250 s.push(' ');
251 }
252 if let Some(v) = env.labels.get(*k) {
253 s.push_str(k);
254 s.push('=');
255 if needs_quoting(v) {
256 s.push('"');
257 for ch in v.chars() {
260 if ch == '"' || ch == '\\' {
261 s.push('\\');
262 }
263 s.push(ch);
264 }
265 s.push('"');
266 } else {
267 s.push_str(v);
268 }
269 }
270 }
271 s
272}
273
274fn needs_quoting(v: &str) -> bool {
275 v.is_empty()
276 || v.chars()
277 .any(|c| c.is_whitespace() || c == '=' || c == '"' || c == '{' || c == '}')
278}
279
280fn sev_upper(env: &ObsEnvelope) -> &'static str {
281 match env.sev {
282 ::buffa::EnumValue::Known(s) => match proto_sev_to_native(s) {
283 Severity::Trace => "TRACE",
284 Severity::Debug => "DEBUG",
285 Severity::Info => "INFO",
286 Severity::Warn => "WARN",
287 Severity::Error => "ERROR",
288 Severity::Fatal => "FATAL",
289 _ => "?",
290 },
291 ::buffa::EnumValue::Unknown(_) => "?",
292 }
293}
294
295fn iso8601_utc(ts_ns: u64) -> String {
299 let secs = (ts_ns / 1_000_000_000) as i64;
303 let nanos = (ts_ns % 1_000_000_000) as u32;
304 let micros = nanos / 1_000;
305
306 let days = secs.div_euclid(86_400);
307 let sec_of_day = secs.rem_euclid(86_400);
308 let hour = (sec_of_day / 3600) as u32;
309 let minute = ((sec_of_day / 60) % 60) as u32;
310 let second = (sec_of_day % 60) as u32;
311
312 let (year, month, day) = civil_from_days(days);
313
314 format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{micros:06}Z")
315}
316
317fn civil_from_days(z: i64) -> (i32, u32, u32) {
321 let z = z + 719_468;
322 let era = z.div_euclid(146_097);
323 let doe = z.rem_euclid(146_097) as u64; let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
325 let y = (yoe as i64) + era * 400;
326 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
327 let mp = (5 * doy + 2) / 153;
328 let d = (doy - (153 * mp + 2) / 5 + 1) as u32;
329 let m = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
330 let y = if m <= 2 { y + 1 } else { y };
331 (y as i32, m, d)
332}
333
334fn render_full<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
335 let _ = writeln!(
336 w,
337 "[{ts:>10}.{ns:09} {sev:<5}] {tier:<6} {full_name}",
338 ts = env.ts_ns / 1_000_000_000,
339 ns = env.ts_ns % 1_000_000_000,
340 sev = sev_str(env),
341 tier = tier_str(env),
342 full_name = env.full_name,
343 );
344 let _ = writeln!(
345 w,
346 " service={} instance={} version={} reason={}",
347 dash_or(&env.service),
348 dash_or(&env.instance),
349 dash_or(&env.version),
350 sampling_reason_str(env),
351 );
352 if !env.trace_id.is_empty() || !env.span_id.is_empty() {
353 let _ = writeln!(
354 w,
355 " trace_id={} span_id={} parent={}",
356 dash_or(&env.trace_id),
357 dash_or(&env.span_id),
358 dash_or(&env.parent_span_id),
359 );
360 }
361 if !env.labels.is_empty() {
362 let mut keys: Vec<_> = env.labels.keys().collect();
363 keys.sort();
364 for k in keys {
365 if let Some(v) = env.labels.get(k) {
366 let _ = writeln!(w, " label.{k}={v}");
367 }
368 }
369 }
370 if payload_len > 0 {
371 let _ = writeln!(w, " payload_bytes={payload_len}");
372 }
373 let _ = w.flush();
374}
375
376fn render_pretty<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
377 let _ = writeln!(
378 w,
379 "─── {full_name} @ {ts}.{ns:09} {sev} {tier} ───",
380 full_name = env.full_name,
381 ts = env.ts_ns / 1_000_000_000,
382 ns = env.ts_ns % 1_000_000_000,
383 sev = sev_str(env),
384 tier = tier_str(env),
385 );
386 let _ = writeln!(
387 w,
388 " service: {} ({}) instance: {}",
389 env.service, env.version, env.instance
390 );
391 if !env.trace_id.is_empty() {
392 let _ = writeln!(
393 w,
394 " trace: {}/{} parent={}",
395 env.trace_id, env.span_id, env.parent_span_id
396 );
397 }
398 if !env.labels.is_empty() {
399 let _ = writeln!(w, " labels:");
400 let mut keys: Vec<_> = env.labels.keys().collect();
401 keys.sort();
402 for k in keys {
403 if let Some(v) = env.labels.get(k) {
404 let _ = writeln!(w, " {k} = {v}");
405 }
406 }
407 }
408 if payload_len > 0 {
409 let _ = writeln!(w, " payload: {payload_len} bytes");
410 }
411 let _ = w.flush();
412}
413
414fn render_json<W: Write>(w: &mut W, env: &ObsEnvelope) {
415 use serde_json::{Map, Value};
416 let mut root = Map::new();
417 root.insert("ts_ns".into(), Value::from(env.ts_ns));
418 root.insert("sev".into(), Value::from(sev_str(env)));
419 root.insert("tier".into(), Value::from(tier_str(env)));
420 root.insert("full_name".into(), Value::from(env.full_name.clone()));
421 if !env.service.is_empty() {
422 root.insert("service".into(), Value::from(env.service.clone()));
423 }
424 if !env.instance.is_empty() {
425 root.insert("instance".into(), Value::from(env.instance.clone()));
426 }
427 if !env.version.is_empty() {
428 root.insert("version".into(), Value::from(env.version.clone()));
429 }
430 if !env.trace_id.is_empty() {
431 root.insert("trace_id".into(), Value::from(env.trace_id.clone()));
432 }
433 if !env.span_id.is_empty() {
434 root.insert("span_id".into(), Value::from(env.span_id.clone()));
435 }
436 if !env.parent_span_id.is_empty() {
437 root.insert(
438 "parent_span_id".into(),
439 Value::from(env.parent_span_id.clone()),
440 );
441 }
442 if env.schema_hash != 0 {
443 root.insert("schema_hash".into(), Value::from(env.schema_hash));
444 }
445 if env.callsite_id != 0 {
446 root.insert("callsite_id".into(), Value::from(env.callsite_id));
447 }
448 if !env.labels.is_empty() {
449 let mut labels = Map::new();
450 for (k, v) in env.labels.iter() {
451 labels.insert(k.clone(), Value::from(v.clone()));
452 }
453 root.insert("labels".into(), Value::Object(labels));
454 }
455 let value = Value::Object(root);
456 let _ = writeln!(w, "{value}");
457 let _ = w.flush();
458}
459
460fn dash_or(s: &str) -> &str {
461 if s.is_empty() { "-" } else { s }
462}
463
464#[allow(dead_code)]
465fn compact_labels(env: &ObsEnvelope) -> String {
466 if env.labels.is_empty() {
467 return "{}".to_string();
468 }
469 let mut keys: Vec<_> = env.labels.keys().collect();
470 keys.sort();
471 let mut s = String::with_capacity(env.labels.len() * 16);
472 s.push('{');
473 for (i, k) in keys.iter().enumerate() {
474 if i > 0 {
475 s.push_str(", ");
476 }
477 if let Some(v) = env.labels.get(*k) {
478 s.push_str(k);
479 s.push('=');
480 s.push_str(v);
481 }
482 }
483 s.push('}');
484 s
485}
486
487fn sev_str(env: &ObsEnvelope) -> &'static str {
488 match env.sev {
489 ::buffa::EnumValue::Known(s) => proto_sev_to_native(s).as_str(),
490 ::buffa::EnumValue::Unknown(_) => Severity::Unspecified.as_str(),
491 }
492}
493
494fn tier_str(env: &ObsEnvelope) -> &'static str {
495 match env.tier {
496 ::buffa::EnumValue::Known(t) => proto_tier_to_native(t).as_str(),
497 ::buffa::EnumValue::Unknown(_) => Tier::Unspecified.as_str(),
498 }
499}
500
501fn sampling_reason_str(env: &ObsEnvelope) -> &'static str {
502 match env.sampling_reason {
503 ::buffa::EnumValue::Known(r) => proto_reason_to_native(r).as_str(),
504 ::buffa::EnumValue::Unknown(_) => SamplingReason::Unspecified.as_str(),
505 }
506}
507
508#[allow(non_snake_case, non_upper_case_globals)]
509fn proto_sev_to_native(s: obs_proto::obs::v1::Severity) -> Severity {
510 use obs_proto::obs::v1::Severity as P;
511 match s {
512 P::SEVERITY_UNSPECIFIED => Severity::Unspecified,
513 P::SEVERITY_TRACE => Severity::Trace,
514 P::SEVERITY_DEBUG => Severity::Debug,
515 P::SEVERITY_INFO => Severity::Info,
516 P::SEVERITY_WARN => Severity::Warn,
517 P::SEVERITY_ERROR => Severity::Error,
518 P::SEVERITY_FATAL => Severity::Fatal,
519 }
520}
521
522#[allow(non_snake_case, non_upper_case_globals)]
523fn proto_tier_to_native(t: obs_proto::obs::v1::Tier) -> Tier {
524 use obs_proto::obs::v1::Tier as P;
525 match t {
526 P::TIER_UNSPECIFIED => Tier::Unspecified,
527 P::TIER_LOG => Tier::Log,
528 P::TIER_METRIC => Tier::Metric,
529 P::TIER_TRACE => Tier::Trace,
530 P::TIER_AUDIT => Tier::Audit,
531 }
532}
533
534#[allow(non_snake_case, non_upper_case_globals)]
535fn proto_reason_to_native(r: obs_proto::obs::v1::SamplingReason) -> SamplingReason {
536 use obs_proto::obs::v1::SamplingReason as P;
537 match r {
538 P::SAMPLING_REASON_UNSPECIFIED => SamplingReason::Unspecified,
539 P::SAMPLING_REASON_HEAD_RATE => SamplingReason::HeadRate,
540 P::SAMPLING_REASON_TAIL_ERROR => SamplingReason::TailError,
541 P::SAMPLING_REASON_SLOW => SamplingReason::Slow,
542 P::SAMPLING_REASON_FORENSIC => SamplingReason::Forensic,
543 P::SAMPLING_REASON_AUDIT => SamplingReason::Audit,
544 P::SAMPLING_REASON_RUNTIME => SamplingReason::Runtime,
545 P::SAMPLING_REASON_OVERRIDE => SamplingReason::Override,
546 }
547}
548
549#[cfg(test)]
550mod tests {
551 use obs_proto::obs::v1::Severity as PSev;
552
553 use super::*;
554
555 fn env(full_name: &str, sev: PSev, ts_ns: u64) -> ObsEnvelope {
556 ObsEnvelope {
557 full_name: full_name.to_string(),
558 sev: ::buffa::EnumValue::Known(sev),
559 ts_ns,
560 ..Default::default()
561 }
562 }
563
564 const REF_TS_NS: u64 = 1_778_167_860_000_000_000 + 123_456_000;
568
569 #[test]
570 fn test_iso8601_utc_matches_tracing_fmt_shape() {
571 let s = iso8601_utc(REF_TS_NS);
572 assert_eq!(s, "2026-05-07T15:31:00.123456Z");
573 }
574
575 #[test]
576 fn test_render_compact_mirrors_tracing_fmt_compact() {
577 let mut e = env("my_crate.process_order", PSev::SEVERITY_INFO, REF_TS_NS);
581 e.labels.insert("id".to_string(), "42".to_string());
582 e.labels.insert("item".to_string(), "Rust Book".to_string());
583 let mut buf: Vec<u8> = Vec::new();
584 render_compact(&mut buf, &e);
585 let line = String::from_utf8(buf).expect("utf-8");
586 assert_eq!(
587 line,
588 "2026-05-07T15:31:00.123456Z INFO process_order{id=42 item=\"Rust Book\"}: \
589 my_crate.process_order\n"
590 );
591 }
592
593 #[test]
594 fn test_render_compact_appends_trace_context_when_present() {
595 let mut e = env("x.y", PSev::SEVERITY_INFO, REF_TS_NS);
596 e.trace_id = "0123456789abcdef0123456789abcdef".to_string();
597 e.span_id = "0123456789abcdef".to_string();
598 let mut buf: Vec<u8> = Vec::new();
599 render_compact(&mut buf, &e);
600 let line = String::from_utf8(buf).expect("utf-8");
601 assert_eq!(
602 line,
603 "2026-05-07T15:31:00.123456Z INFO x.y: trace_id=0123456789abcdef0123456789abcdef \
604 span_id=0123456789abcdef\n"
605 );
606 }
607
608 #[test]
609 fn test_render_compact_drops_scope_block_when_no_labels() {
610 let e = env("x.y.Z", PSev::SEVERITY_INFO, REF_TS_NS);
612 let mut buf: Vec<u8> = Vec::new();
613 render_compact(&mut buf, &e);
614 let line = String::from_utf8(buf).expect("utf-8");
615 assert_eq!(line, "2026-05-07T15:31:00.123456Z INFO x.y.Z\n");
616 }
617
618 #[test]
619 fn test_render_compact_pads_severity_to_five() {
620 let e = env("x.y", PSev::SEVERITY_WARN, 0);
621 let mut buf: Vec<u8> = Vec::new();
622 render_compact(&mut buf, &e);
623 let line = String::from_utf8(buf).expect("utf-8");
624 assert!(line.contains(" WARN "), "line: {line}");
627 }
628
629 #[test]
630 fn test_tracing_style_fields_quotes_when_needed() {
631 let mut e = env("x.y", PSev::SEVERITY_INFO, 0);
632 e.labels.insert("a".to_string(), "simple".to_string());
633 e.labels.insert("b".to_string(), "with space".to_string());
634 e.labels
635 .insert("c".to_string(), "with \"quote\"".to_string());
636 let s = tracing_style_fields(&e);
637 assert!(s.contains("a=simple"));
638 assert!(s.contains("b=\"with space\""));
639 assert!(s.contains(r#"c="with \"quote\"""#));
640 }
641
642 #[test]
643 fn test_civil_from_days_round_trip_recent_dates() {
644 assert_eq!(civil_from_days(0), (1970, 1, 1));
646 assert_eq!(civil_from_days(20_580), (2026, 5, 7));
648 assert_eq!(civil_from_days(19_782), (2024, 2, 29));
650 }
651}