1use std::io::Write;
4
5use obs_proto::obs::v1::{ObsEnvelope, SamplingReason, Severity, Tier};
6use parking_lot::Mutex;
7
8use super::{
9 Sink,
10 writer::{ErasedWriter, MakeWriter, StdoutWriter},
11};
12use crate::registry::ScrubbedEnvelope;
13
/// Output format a [`StdoutSink`] uses when rendering an envelope.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum FormatterStyle {
    /// One line per envelope: UTC timestamp, severity, an optional
    /// `leaf{labels}` scope, the full name, and trace/span ids when present.
    /// This is the default style.
    #[default]
    Compact,
    /// Multi-line text: a header with epoch seconds, severity, tier and name,
    /// followed by service metadata, trace context, labels and payload size.
    Full,
    /// Multi-line text introduced by a `───` banner, with labels listed one
    /// per line.
    Pretty,
    /// One JSON object per line.
    Json,
}
29
/// Sink that renders each delivered envelope as text and writes it through a
/// writer obtained per delivery.
pub struct StdoutSink {
    /// Rendering format selected at construction time.
    style: FormatterStyle,
    /// Factory that yields a writer for each delivery; the mutex serializes
    /// deliveries so output from concurrent callers does not interleave.
    writer: Mutex<ErasedWriterMaker>,
    /// Envelopes whose severity compares below this value are not rendered.
    severity_floor: Severity,
}
36
37impl std::fmt::Debug for StdoutSink {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("StdoutSink")
40 .field("style", &self.style)
41 .field("severity_floor", &self.severity_floor)
42 .finish_non_exhaustive()
43 }
44}
45
/// Type-erased writer factory held by [`StdoutSink`].
struct ErasedWriterMaker {
    /// Produces a writer for an envelope of the given severity. Boxed so the
    /// sink does not need to be generic over the concrete maker type.
    make: Box<dyn FnMut(Severity) -> ErasedWriter + Send>,
}
52
53impl StdoutSink {
54 #[must_use]
57 pub fn new(style: FormatterStyle) -> Self {
58 Self::with_make_writer(style, StdoutWriter)
59 }
60
61 pub fn with_make_writer<M: MakeWriter>(style: FormatterStyle, mw: M) -> Self {
65 let mw = std::sync::Arc::new(mw);
66 let make = Box::new(move |sev: Severity| {
67 let m = std::sync::Arc::clone(&mw);
68 ErasedWriter::new(m.make_writer_for(sev))
69 });
70 Self {
71 style,
72 writer: Mutex::new(ErasedWriterMaker { make }),
73 severity_floor: Severity::Trace,
74 }
75 }
76
77 #[must_use]
79 pub fn severity_floor(mut self, sev: Severity) -> Self {
80 self.severity_floor = sev;
81 self
82 }
83
84 pub fn with_writer<W: Write + Send + 'static>(style: FormatterStyle, writer: W) -> Self {
87 struct OneShot<W>(parking_lot::Mutex<Option<W>>);
88 impl<W: Write + Send + 'static> MakeWriter for OneShot<W> {
89 type Writer = ErasedWriter;
90 fn make_writer(&self) -> ErasedWriter {
91 if let Some(w) = self.0.lock().take() {
92 ErasedWriter::new(w)
93 } else {
94 ErasedWriter::new(std::io::sink())
97 }
98 }
99 }
100 let shared = std::sync::Arc::new(parking_lot::Mutex::new(Some(writer)));
104 struct Shared<W>(std::sync::Arc<parking_lot::Mutex<Option<W>>>);
105 impl<W: Write + Send + 'static> MakeWriter for Shared<W> {
106 type Writer = ErasedWriter;
107 fn make_writer(&self) -> ErasedWriter {
108 let mut g = self.0.lock();
109 if let Some(w) = g.take() {
110 ErasedWriter::new(SharedWriter {
111 slot: Some(w),
112 back: std::sync::Arc::clone(&self.0),
113 })
114 } else {
115 ErasedWriter::new(std::io::sink())
116 }
117 }
118 }
119 struct SharedWriter<W: Write> {
120 slot: Option<W>,
121 back: std::sync::Arc<parking_lot::Mutex<Option<W>>>,
122 }
123 impl<W: Write> Write for SharedWriter<W> {
124 fn write(&mut self, b: &[u8]) -> std::io::Result<usize> {
125 match self.slot.as_mut() {
126 Some(w) => w.write(b),
127 None => Ok(b.len()),
128 }
129 }
130 fn flush(&mut self) -> std::io::Result<()> {
131 match self.slot.as_mut() {
132 Some(w) => w.flush(),
133 None => Ok(()),
134 }
135 }
136 }
137 impl<W: Write> Drop for SharedWriter<W> {
138 fn drop(&mut self) {
139 if let Some(w) = self.slot.take() {
140 *self.back.lock() = Some(w);
141 }
142 }
143 }
144 let _ = std::any::type_name::<OneShot<()>>();
147 Self::with_make_writer(style, Shared(shared))
148 }
149}
150
151impl Default for StdoutSink {
152 fn default() -> Self {
153 Self::new(FormatterStyle::default())
154 }
155}
156
impl Sink for StdoutSink {
    /// Renders one envelope in the configured style, unless its severity is
    /// below the sink's floor.
    fn deliver(&self, env: ScrubbedEnvelope<'_>) {
        let envelope = env.envelope();
        let sev = native_sev(envelope);
        // Filter before taking the lock so skipped envelopes cost no contention.
        if sev < self.severity_floor {
            return;
        }
        // `maker` is declared before `w`, so `w` is dropped first: the writer
        // is released while the lock is still held.
        let mut maker = self.writer.lock();
        let mut w = (maker.make)(sev);
        match self.style {
            FormatterStyle::Compact => render_compact(&mut w, envelope),
            FormatterStyle::Full => render_full(&mut w, envelope, env.payload().len()),
            FormatterStyle::Pretty => render_pretty(&mut w, envelope, env.payload().len()),
            FormatterStyle::Json => render_json(&mut w, envelope),
        }
    }
}
174
175fn native_sev(env: &ObsEnvelope) -> Severity {
176 match env.sev {
177 ::buffa::EnumValue::Known(s) => s,
178 ::buffa::EnumValue::Unknown(_) => Severity::Unspecified,
179 }
180}
181
182fn render_compact<W: Write>(w: &mut W, env: &ObsEnvelope) {
183 let iso = iso8601_utc(env.ts_ns);
195 let lvl = sev_upper(env);
196
197 let scope_leaf = env
202 .full_name
203 .rsplit_once('.')
204 .map(|(_, leaf)| leaf)
205 .unwrap_or(env.full_name.as_str());
206
207 let fields = tracing_style_fields(env);
208 let scope = if fields.is_empty() {
209 String::new()
210 } else {
211 format!("{scope_leaf}{{{fields}}}: ")
212 };
213
214 let target = &env.full_name;
218
219 if !env.trace_id.is_empty() || !env.span_id.is_empty() {
225 let _ = writeln!(
226 w,
227 "{iso} {lvl:>5} {scope}{target}: trace_id={} span_id={}",
228 dash_or(&env.trace_id),
229 dash_or(&env.span_id),
230 );
231 } else {
232 let _ = writeln!(w, "{iso} {lvl:>5} {scope}{target}");
233 }
234 let _ = w.flush();
235}
236
237fn tracing_style_fields(env: &ObsEnvelope) -> String {
241 if env.labels.is_empty() {
242 return String::new();
243 }
244 let mut keys: Vec<_> = env.labels.keys().collect();
245 keys.sort();
246 let mut s = String::with_capacity(env.labels.len() * 16);
247 for (i, k) in keys.iter().enumerate() {
248 if i > 0 {
249 s.push(' ');
250 }
251 if let Some(v) = env.labels.get(*k) {
252 s.push_str(k);
253 s.push('=');
254 if needs_quoting(v) {
255 s.push('"');
256 for ch in v.chars() {
259 if ch == '"' || ch == '\\' {
260 s.push('\\');
261 }
262 s.push(ch);
263 }
264 s.push('"');
265 } else {
266 s.push_str(v);
267 }
268 }
269 }
270 s
271}
272
/// Reports whether a label value must be quoted: it is empty, or contains
/// whitespace or one of `=`, `"`, `{`, `}`.
fn needs_quoting(v: &str) -> bool {
    if v.is_empty() {
        return true;
    }
    v.chars()
        .any(|c| matches!(c, '=' | '"' | '{' | '}') || c.is_whitespace())
}
278
279fn sev_upper(env: &ObsEnvelope) -> &'static str {
280 match env.sev {
281 ::buffa::EnumValue::Known(s) => match s {
282 Severity::Trace => "TRACE",
283 Severity::Debug => "DEBUG",
284 Severity::Info => "INFO",
285 Severity::Warn => "WARN",
286 Severity::Error => "ERROR",
287 Severity::Fatal => "FATAL",
288 _ => "?",
289 },
290 ::buffa::EnumValue::Unknown(_) => "?",
291 }
292}
293
294fn iso8601_utc(ts_ns: u64) -> String {
298 let secs = (ts_ns / 1_000_000_000) as i64;
302 let nanos = (ts_ns % 1_000_000_000) as u32;
303 let micros = nanos / 1_000;
304
305 let days = secs.div_euclid(86_400);
306 let sec_of_day = secs.rem_euclid(86_400);
307 let hour = (sec_of_day / 3600) as u32;
308 let minute = ((sec_of_day / 60) % 60) as u32;
309 let second = (sec_of_day % 60) as u32;
310
311 let (year, month, day) = civil_from_days(days);
312
313 format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{micros:06}Z")
314}
315
/// Converts a day count relative to 1970-01-01 into a proleptic Gregorian
/// `(year, month, day)` triple.
///
/// Works on 400-year eras whose years start on 1 March, so the leap day falls
/// at the end of the shifted year.
fn civil_from_days(z: i64) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;
    // Days from 0000-03-01 to 1970-01-01.
    const EPOCH_SHIFT: i64 = 719_468;

    let shifted = z + EPOCH_SHIFT;
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA) as u64;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    // Month index counted from March (0) through February (11).
    let month_index = (5 * day_of_year + 2) / 153;
    let day = (day_of_year - (153 * month_index + 2) / 5 + 1) as u32;
    let month = (match month_index {
        0..=9 => month_index + 3,
        _ => month_index - 9,
    }) as u32;
    // January and February belong to the next civil year.
    let year = year_of_era as i64 + era * 400 + i64::from(month <= 2);
    (year as i32, month, day)
}
332
333fn render_full<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
334 let _ = writeln!(
335 w,
336 "[{ts:>10}.{ns:09} {sev:<5}] {tier:<6} {full_name}",
337 ts = env.ts_ns / 1_000_000_000,
338 ns = env.ts_ns % 1_000_000_000,
339 sev = sev_str(env),
340 tier = tier_str(env),
341 full_name = env.full_name,
342 );
343 let _ = writeln!(
344 w,
345 " service={} instance={} version={} reason={}",
346 dash_or(&env.service),
347 dash_or(&env.instance),
348 dash_or(&env.version),
349 sampling_reason_str(env),
350 );
351 if !env.trace_id.is_empty() || !env.span_id.is_empty() {
352 let _ = writeln!(
353 w,
354 " trace_id={} span_id={} parent={}",
355 dash_or(&env.trace_id),
356 dash_or(&env.span_id),
357 dash_or(&env.parent_span_id),
358 );
359 }
360 if !env.labels.is_empty() {
361 let mut keys: Vec<_> = env.labels.keys().collect();
362 keys.sort();
363 for k in keys {
364 if let Some(v) = env.labels.get(k) {
365 let _ = writeln!(w, " label.{k}={v}");
366 }
367 }
368 }
369 if payload_len > 0 {
370 let _ = writeln!(w, " payload_bytes={payload_len}");
371 }
372 let _ = w.flush();
373}
374
375fn render_pretty<W: Write>(w: &mut W, env: &ObsEnvelope, payload_len: usize) {
376 let _ = writeln!(
377 w,
378 "─── {full_name} @ {ts}.{ns:09} {sev} {tier} ───",
379 full_name = env.full_name,
380 ts = env.ts_ns / 1_000_000_000,
381 ns = env.ts_ns % 1_000_000_000,
382 sev = sev_str(env),
383 tier = tier_str(env),
384 );
385 let _ = writeln!(
386 w,
387 " service: {} ({}) instance: {}",
388 env.service, env.version, env.instance
389 );
390 if !env.trace_id.is_empty() {
391 let _ = writeln!(
392 w,
393 " trace: {}/{} parent={}",
394 env.trace_id, env.span_id, env.parent_span_id
395 );
396 }
397 if !env.labels.is_empty() {
398 let _ = writeln!(w, " labels:");
399 let mut keys: Vec<_> = env.labels.keys().collect();
400 keys.sort();
401 for k in keys {
402 if let Some(v) = env.labels.get(k) {
403 let _ = writeln!(w, " {k} = {v}");
404 }
405 }
406 }
407 if payload_len > 0 {
408 let _ = writeln!(w, " payload: {payload_len} bytes");
409 }
410 let _ = w.flush();
411}
412
413fn render_json<W: Write>(w: &mut W, env: &ObsEnvelope) {
414 use serde_json::{Map, Value};
415 let mut root = Map::new();
416 root.insert("ts_ns".into(), Value::from(env.ts_ns));
417 root.insert("sev".into(), Value::from(sev_str(env)));
418 root.insert("tier".into(), Value::from(tier_str(env)));
419 root.insert("full_name".into(), Value::from(env.full_name.clone()));
420 if !env.service.is_empty() {
421 root.insert("service".into(), Value::from(env.service.clone()));
422 }
423 if !env.instance.is_empty() {
424 root.insert("instance".into(), Value::from(env.instance.clone()));
425 }
426 if !env.version.is_empty() {
427 root.insert("version".into(), Value::from(env.version.clone()));
428 }
429 if !env.trace_id.is_empty() {
430 root.insert("trace_id".into(), Value::from(env.trace_id.clone()));
431 }
432 if !env.span_id.is_empty() {
433 root.insert("span_id".into(), Value::from(env.span_id.clone()));
434 }
435 if !env.parent_span_id.is_empty() {
436 root.insert(
437 "parent_span_id".into(),
438 Value::from(env.parent_span_id.clone()),
439 );
440 }
441 if env.schema_hash != 0 {
442 root.insert("schema_hash".into(), Value::from(env.schema_hash));
443 }
444 if env.callsite_id != 0 {
445 root.insert("callsite_id".into(), Value::from(env.callsite_id));
446 }
447 if !env.labels.is_empty() {
448 let mut labels = Map::new();
449 for (k, v) in env.labels.iter() {
450 labels.insert(k.clone(), Value::from(v.clone()));
451 }
452 root.insert("labels".into(), Value::Object(labels));
453 }
454 let value = Value::Object(root);
455 let _ = writeln!(w, "{value}");
456 let _ = w.flush();
457}
458
/// Returns `"-"` for an empty string and the string itself otherwise.
fn dash_or(s: &str) -> &str {
    match s {
        "" => "-",
        present => present,
    }
}
462
463#[allow(dead_code)]
464fn compact_labels(env: &ObsEnvelope) -> String {
465 if env.labels.is_empty() {
466 return "{}".to_string();
467 }
468 let mut keys: Vec<_> = env.labels.keys().collect();
469 keys.sort();
470 let mut s = String::with_capacity(env.labels.len() * 16);
471 s.push('{');
472 for (i, k) in keys.iter().enumerate() {
473 if i > 0 {
474 s.push_str(", ");
475 }
476 if let Some(v) = env.labels.get(*k) {
477 s.push_str(k);
478 s.push('=');
479 s.push_str(v);
480 }
481 }
482 s.push('}');
483 s
484}
485
486fn sev_str(env: &ObsEnvelope) -> &'static str {
487 match env.sev {
488 ::buffa::EnumValue::Known(s) => s.as_str(),
489 ::buffa::EnumValue::Unknown(_) => Severity::Unspecified.as_str(),
490 }
491}
492
493fn tier_str(env: &ObsEnvelope) -> &'static str {
494 match env.tier {
495 ::buffa::EnumValue::Known(t) => t.as_str(),
496 ::buffa::EnumValue::Unknown(_) => Tier::Unspecified.as_str(),
497 }
498}
499
500fn sampling_reason_str(env: &ObsEnvelope) -> &'static str {
501 match env.sampling_reason {
502 ::buffa::EnumValue::Known(r) => r.as_str(),
503 ::buffa::EnumValue::Unknown(_) => SamplingReason::Unspecified.as_str(),
504 }
505}
506
#[cfg(test)]
mod tests {
    use obs_proto::obs::v1::Severity as PSev;

    use super::*;

    /// Builds an envelope with only the name, severity and timestamp set.
    fn env(full_name: &str, sev: PSev, ts_ns: u64) -> ObsEnvelope {
        ObsEnvelope {
            full_name: full_name.to_string(),
            sev: ::buffa::EnumValue::Known(sev),
            ts_ns,
            ..Default::default()
        }
    }

    /// 2026-05-07T15:31:00.123456Z expressed as nanoseconds since the epoch.
    const REF_TS_NS: u64 = 1_778_167_860_000_000_000 + 123_456_000;

    #[test]
    fn test_iso8601_utc_matches_tracing_fmt_shape() {
        let s = iso8601_utc(REF_TS_NS);
        assert_eq!(s, "2026-05-07T15:31:00.123456Z");
    }

    #[test]
    fn test_render_compact_mirrors_tracing_fmt_compact() {
        let mut e = env("my_crate.process_order", PSev::SEVERITY_INFO, REF_TS_NS);
        e.labels.insert("id".to_string(), "42".to_string());
        e.labels.insert("item".to_string(), "Rust Book".to_string());
        let mut buf: Vec<u8> = Vec::new();
        render_compact(&mut buf, &e);
        let line = String::from_utf8(buf).expect("utf-8");
        // The level is right-aligned to five columns, so `INFO` gains one
        // padding space in addition to the separator.
        assert_eq!(
            line,
            "2026-05-07T15:31:00.123456Z  INFO process_order{id=42 item=\"Rust Book\"}: \
             my_crate.process_order\n"
        );
    }

    #[test]
    fn test_render_compact_appends_trace_context_when_present() {
        let mut e = env("x.y", PSev::SEVERITY_INFO, REF_TS_NS);
        e.trace_id = "0123456789abcdef0123456789abcdef".to_string();
        e.span_id = "0123456789abcdef".to_string();
        let mut buf: Vec<u8> = Vec::new();
        render_compact(&mut buf, &e);
        let line = String::from_utf8(buf).expect("utf-8");
        assert_eq!(
            line,
            "2026-05-07T15:31:00.123456Z  INFO x.y: trace_id=0123456789abcdef0123456789abcdef \
             span_id=0123456789abcdef\n"
        );
    }

    #[test]
    fn test_render_compact_drops_scope_block_when_no_labels() {
        let e = env("x.y.Z", PSev::SEVERITY_INFO, REF_TS_NS);
        let mut buf: Vec<u8> = Vec::new();
        render_compact(&mut buf, &e);
        let line = String::from_utf8(buf).expect("utf-8");
        assert_eq!(line, "2026-05-07T15:31:00.123456Z  INFO x.y.Z\n");
    }

    #[test]
    fn test_render_compact_pads_severity_to_five() {
        let e = env("x.y", PSev::SEVERITY_WARN, 0);
        let mut buf: Vec<u8> = Vec::new();
        render_compact(&mut buf, &e);
        let line = String::from_utf8(buf).expect("utf-8");
        // Separator space plus one padding space before the 4-letter level.
        assert!(line.contains("  WARN "), "line: {line}");
    }

    #[test]
    fn test_tracing_style_fields_quotes_when_needed() {
        let mut e = env("x.y", PSev::SEVERITY_INFO, 0);
        e.labels.insert("a".to_string(), "simple".to_string());
        e.labels.insert("b".to_string(), "with space".to_string());
        e.labels
            .insert("c".to_string(), "with \"quote\"".to_string());
        let s = tracing_style_fields(&e);
        assert!(s.contains("a=simple"));
        assert!(s.contains("b=\"with space\""));
        assert!(s.contains(r#"c="with \"quote\"""#));
    }

    #[test]
    fn test_civil_from_days_round_trip_recent_dates() {
        assert_eq!(civil_from_days(0), (1970, 1, 1));
        assert_eq!(civil_from_days(20_580), (2026, 5, 7));
        assert_eq!(civil_from_days(19_782), (2024, 2, 29));
    }
}