1#[cfg(feature = "async")]
95use tokio::sync::mpsc::{self, error::TrySendError};
96
97#[cfg(feature = "accumulator")]
98use std::sync::{Arc, RwLock};
99
100use {
101 std::fmt::{self, Display},
102 tracing::{
103 Event, Subscriber,
104 field::{Field, Visit},
105 span,
106 },
107 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
108};
109
/// Shorthand for the JSON object type used to describe a span (its metadata
/// and recorded fields) when span support is enabled.
#[cfg(feature = "span")]
type JsonMap = serde_json::Map<String, serde_json::Value>;
112
113pub fn callback_layer<F>(callback: F) -> CallbackLayer
118where
119 F: Fn(&Event<'_>) + Send + Sync + 'static,
120{
121 CallbackLayer::new(callback)
122}
123
/// Builds a [`CallbackLayerWithSpan`] that calls `callback` with every event
/// together with the JSON description of the spans enclosing it.
///
/// The second callback argument is `None` when the event has no span scope;
/// otherwise it lists the spans from the root span down to the innermost one.
#[cfg(feature = "span")]
pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
where
    F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
{
    CallbackLayerWithSpan {
        callback: Box::new(callback),
    }
}
135
/// Builds a [`CallbackLayer`] that converts every event into an
/// [`OwnedEvent`] and pushes it onto `tx` without blocking.
///
/// Delivery is best-effort: if `try_send` fails (the channel is full or
/// closed) the event is discarded silently.
#[cfg(feature = "async")]
pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
    CallbackLayer::new(move |event: &Event<'_>| {
        let owned = OwnedEvent::from(event);
        // The send result is deliberately ignored; see the function docs.
        let _ = tx.try_send(owned);
    })
}
146
/// Builds a [`CallbackLayerWithSpan`] that packages every event and its span
/// descriptions into an [`OwnedEventWithSpans`] and pushes it onto `tx`
/// without blocking.
///
/// When the send fails, the rejected value is written to standard error with
/// a message that distinguishes a full channel from a closed one.
#[cfg(all(feature = "async", feature = "span"))]
pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
    CallbackLayerWithSpan::new(move |event, spans| {
        let owned = OwnedEventWithSpans::new(event, spans);
        match tx.try_send(owned) {
            Ok(()) => {}
            Err(TrySendError::Full(o)) => {
                eprintln!("dropping tracing event: {:?}", o);
            }
            Err(TrySendError::Closed(o)) => {
                eprintln!("channel closed for tracing event: {:?}", o);
            }
        }
    })
}
166
/// Builds a [`CallbackLayer`] whose events are handled by an asynchronous
/// callback running on a spawned Tokio task.
///
/// A bounded channel of capacity `buffer_size` connects the layer to the
/// task. The layer side behaves like [`channel_layer`]: events are converted
/// to [`OwnedEvent`] and sent without blocking, and are discarded if the send
/// fails. The task awaits `callback(event)` for each received event, one at a
/// time, and ends once the channel is closed.
///
/// The future returned by `callback` only needs to be `Send + 'static`, which
/// is what `tokio::spawn` requires. This matches the bounds of the span-aware
/// variant of this function.
///
/// # Panics
///
/// Per the Tokio documentation, `tokio::spawn` panics when called outside a
/// Tokio runtime, and `mpsc::channel` panics when `buffer_size` is `0`.
#[cfg(feature = "async")]
pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
where
    F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(buffer_size);
    // The join handle is dropped on purpose: the task is detached and stops
    // by itself when `recv` yields `None`.
    tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            callback(event).await;
        }
    });
    channel_layer(tx)
}
190
/// Builds a [`CallbackLayerWithSpan`] whose events (with span descriptions)
/// are handled by an asynchronous callback running on a spawned Tokio task.
///
/// A bounded channel of capacity `buffer_size` feeds the task; the layer side
/// is the one produced by [`channel_layer_with_spans`]. The task awaits
/// `callback` for each received item in turn and finishes when the channel
/// reports that it is closed.
#[cfg(all(feature = "async", feature = "span"))]
pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
where
    F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let (sender, mut receiver) = mpsc::channel(buffer_size);
    let consumer = async move {
        loop {
            match receiver.recv().await {
                Some(item) => callback(item).await,
                None => break,
            }
        }
    };
    // Detached task: the join handle is intentionally not kept.
    tokio::spawn(consumer);
    channel_layer_with_spans(sender)
}
214
/// Shared, lock-protected list of captured events, as consumed by
/// [`accumulating_layer`].
#[cfg(feature = "accumulator")]
pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
217
/// Builds a [`CallbackLayer`] that appends an [`OwnedEvent`] to `log` for
/// every event it observes.
///
/// If the write lock cannot be acquired because it is poisoned, the event is
/// skipped.
#[cfg(feature = "accumulator")]
pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
    CallbackLayer::new(move |event: &Event<'_>| {
        match log.write() {
            Ok(mut entries) => entries.push(OwnedEvent::from(event)),
            // Poisoned lock: nothing is recorded for this event.
            Err(_) => {}
        }
    })
}
231
/// Builds a [`CallbackLayerWithSpan`] that appends an
/// [`OwnedEventWithSpans`] to `log` for every event it observes.
///
/// If the write lock cannot be acquired because it is poisoned, the event is
/// skipped.
#[cfg(all(feature = "accumulator", feature = "span"))]
pub fn accumulating_layer_with_spans(
    log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
) -> CallbackLayerWithSpan {
    CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
        match log.write() {
            Ok(mut entries) => entries.push(OwnedEventWithSpans::new(event, spans)),
            // Poisoned lock: nothing is recorded for this event.
            Err(_) => {}
        }
    })
}
247
248pub struct CallbackLayer {
255 callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
256}
257
258impl CallbackLayer {
259 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
260 let callback = Box::new(callback);
261 Self { callback }
262 }
263}
264
265impl<S> Layer<S> for CallbackLayer
266where
267 S: Subscriber,
268 S: for<'lookup> LookupSpan<'lookup>,
269{
270 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
271 (self.callback)(event);
272 }
273}
274
/// A tracing layer that forwards every event, together with JSON
/// descriptions of its enclosing spans, to a boxed, type-erased callback.
#[cfg(feature = "span")]
pub struct CallbackLayerWithSpan {
    /// The user-supplied callback; its second argument carries the span
    /// descriptions, or `None` when the event has no span scope.
    #[allow(clippy::type_complexity)]
    callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
}

#[cfg(feature = "span")]
impl CallbackLayerWithSpan {
    /// Creates a layer that owns `callback` (boxed) and calls it for each
    /// event.
    pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
        callback: F,
    ) -> Self {
        Self {
            callback: Box::new(callback),
        }
    }
}
296
#[cfg(feature = "span")]
impl<S> Layer<S> for CallbackLayerWithSpan
where
    S: Subscriber,
    S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
    /// Captures the fields a span is created with and stores them in the
    /// span's extensions so `on_event` can report them later.
    ///
    /// Only creation-time attributes are captured here; this impl does not
    /// override `on_record`, so values recorded on the span afterwards are
    /// not reflected.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        attrs.record(&mut visitor);
        if let Some(span) = ctx.span(id) {
            // `replace` instead of `insert`: `insert` panics when a value of
            // the same type is already stored, which happens if more than one
            // span-aware layer is attached to the same registry. Each such
            // layer records the same attributes, so overwriting is harmless.
            span.extensions_mut().replace(visitor);
        }
    }

    /// Describes every span in the event's scope (root first) as a JSON map
    /// and passes the event plus those maps to the callback.
    ///
    /// Each map holds the span's `level`, `file`, `target`, `line`, `name`
    /// and `fields`; `fields` is JSON `null` when no captured fields are
    /// stored for the span. The callback receives `None` when the event has
    /// no span scope.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
            scope
                .from_root()
                .map(|span| {
                    // Clone only the inner map; the extensions guard is
                    // released at the end of this statement.
                    let fields: Option<JsonMap> = span
                        .extensions()
                        .get::<FieldVisitor>()
                        .map(|visitor| visitor.0.clone());

                    let meta = span.metadata();
                    let mut map = JsonMap::default();
                    map.insert("level".into(), meta.level().to_string().into());
                    map.insert("file".into(), meta.file().into());
                    map.insert("target".into(), meta.target().into());
                    map.insert("line".into(), meta.line().into());
                    map.insert("name".into(), span.name().into());
                    map.insert("fields".into(), fields.into());
                    map
                })
                .collect()
        });
        (self.callback)(event, spans);
    }
}
336
337#[derive(Debug, Clone, serde::Serialize)]
344pub struct OwnedEvent {
345 pub level: TracingLevel,
346 pub file: Option<String>,
347 pub target: String,
348 pub line: Option<u32>,
349 pub name: &'static str,
350 pub message: Option<String>,
351 pub fields: serde_json::Map<String, serde_json::Value>,
352}
353
354#[derive(Debug, Clone, serde::Serialize)]
361pub struct OwnedEventWithSpans {
362 pub event: OwnedEvent,
363 pub spans: Option<Vec<JsonMap>>,
364}
365
366impl OwnedEventWithSpans {
367 pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
368 OwnedEventWithSpans {
369 event: event.into(),
370 spans,
371 }
372 }
373}
374
375#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
379pub enum TracingLevel {
380 Trace = 0,
381 Debug = 1,
382 Info = 2,
383 Warn = 3,
384 Error = 4,
385}
386
387impl From<&tracing::Level> for TracingLevel {
388 fn from(value: &tracing::Level) -> Self {
389 match *value {
390 tracing::Level::TRACE => TracingLevel::Trace,
391 tracing::Level::DEBUG => TracingLevel::Debug,
392 tracing::Level::INFO => TracingLevel::Info,
393 tracing::Level::WARN => TracingLevel::Warn,
394 tracing::Level::ERROR => TracingLevel::Error,
395 }
396 }
397}
398
399impl AsRef<str> for TracingLevel {
400 fn as_ref(&self) -> &str {
401 use TracingLevel::*;
402 match self {
403 Trace => "TRACE",
404 Debug => "DEBUG",
405 Info => "INFO",
406 Warn => "WARN",
407 Error => "ERROR",
408 }
409 }
410}
411
412impl Display for TracingLevel {
413 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
414 write!(f, "{}", self.as_ref())
415 }
416}
417
418impl From<&Event<'_>> for OwnedEvent {
425 fn from(event: &Event<'_>) -> Self {
426 let mut visitor = FieldVisitor::default();
427 event.record(&mut visitor);
428
429 let message = visitor.0.remove("message").and_then(|v| {
432 if let serde_json::Value::String(s) = v {
433 Some(s)
434 } else {
435 None
436 }
437 });
438
439 let meta = event.metadata();
440 Self {
441 name: meta.name(),
442 target: meta.target().into(),
443 level: meta.level().into(),
444 file: meta.file().map(String::from),
445 line: meta.line(),
446 message,
447 fields: visitor.0,
448 }
449 }
450}
451
452#[cfg(feature = "span")]
458#[derive(Default, Clone)]
459struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
460
461impl Visit for FieldVisitor {
462 fn record_bool(&mut self, field: &Field, value: bool) {
463 self.0.insert(field.name().into(), value.into());
464 }
465
466 fn record_f64(&mut self, field: &Field, value: f64) {
467 self.0.insert(field.name().into(), value.into());
468 }
469 fn record_i64(&mut self, field: &Field, value: i64) {
470 self.0.insert(field.name().into(), value.into());
471 }
472 fn record_u64(&mut self, field: &Field, value: u64) {
473 self.0.insert(field.name().into(), value.into());
474 }
475 fn record_str(&mut self, field: &Field, value: &str) {
476 self.0.insert(field.name().into(), value.into());
477 }
478 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
479 let text = format!("{:?}", value);
480 self.0.insert(field.name().into(), text.into());
481 }
482}
483
/// Snapshot tests for every layer constructor in this file.
#[cfg(test)]
mod tests {

    // `Arc` and `RwLock` are needed by the ungated helpers and tests below,
    // so they are imported unconditionally; only the channel type depends on
    // the `async` feature.
    use std::sync::{Arc, RwLock};

    #[cfg(feature = "async")]
    use tokio::sync::mpsc;

    use {
        super::*,
        insta::{
            internals::{Content, ContentPath},
            *,
        },
        tracing_subscriber::{EnvFilter, prelude::*},
    };

    /// Emits one event at each level from inside an info-level root span.
    fn run_trace_events() {
        let span = tracing::info_span!("root-info", recurse = 0);
        span.in_scope(|| {
            tracing::trace!(foo = 1, bar = 2, "this is a trace message");
            tracing::debug!(pi = 3.14159265, "this is a debug message");
            tracing::info!(job = "foo", "this is an info message");
            tracing::warn!(job = "foo", "this is a warning message");
            tracing::error!(job = "foo", "this is an error message");
        });
    }

    /// Returns a copy of everything collected in `logs` so far.
    fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
        let events = logs.read().expect("could not read events");
        events.clone()
    }

    /// Snapshot redaction: replaces everything after the first `:` in a name
    /// with `<line>`, and leaves names without a `:` untouched.
    fn redact_name(value: Content, _path: ContentPath) -> String {
        let s = value.as_str().unwrap_or_default();
        match s.split_once(':') {
            Some((prefix, _)) => format!("{}:<line>", prefix),
            None => s.to_string(),
        }
    }

    #[test]
    fn test_callback_layer() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(callback_layer(move |event| {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event.into());
                }
            }))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("callback-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "span")]
    #[test]
    fn test_callback_layer_with_spans() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(callback_layer_with_spans(move |event, spans| {
                if let Ok(mut events) = cb_events.write() {
                    events.push(OwnedEventWithSpans::new(event, spans));
                }
            }))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_channel_layer() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let (tx, mut rx) = mpsc::channel(100);

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(channel_layer(tx))
            .set_default();

        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event);
                }
            }
        });

        run_trace_events();
        // Yield so the receiver task can drain the channel before it is
        // aborted.
        sleep(Duration::from_millis(100)).await;
        handle.abort();

        assert_json_snapshot!("channel-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "async", feature = "span"))]
    #[tokio::test]
    async fn test_channel_layer_with_spans() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let (tx, mut rx) = mpsc::channel(100);

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(channel_layer_with_spans(tx))
            .set_default();

        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event);
                }
            }
        });

        run_trace_events();
        // Yield so the receiver task can drain the channel before it is
        // aborted.
        sleep(Duration::from_millis(100)).await;
        handle.abort();

        assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_layer() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(async_layer(16, move |event| {
                let f_events = cb_events.clone();
                async move {
                    if let Ok(mut events) = f_events.write() {
                        events.push(event);
                    }
                }
            }))
            .set_default();

        run_trace_events();
        // Yield so the task spawned by `async_layer` can process the events.
        sleep(Duration::from_millis(100)).await;

        assert_json_snapshot!("async-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "async", feature = "span"))]
    #[tokio::test]
    async fn test_async_layer_with_spans() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(async_layer_with_spans(16, move |event_with_span| {
                let f_events = cb_events.clone();
                async move {
                    if let Ok(mut events) = f_events.write() {
                        events.push(event_with_span);
                    }
                }
            }))
            .set_default();

        run_trace_events();
        // Yield so the task spawned by `async_layer_with_spans` can process
        // the events.
        sleep(Duration::from_millis(100)).await;

        assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "accumulator")]
    #[test]
    fn test_accumulating_layer() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(accumulating_layer(events.clone()))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("accumulating-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "span", feature = "accumulator"))]
    #[test]
    fn test_accumulating_layer_with_spans() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(accumulating_layer_with_spans(events.clone()))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }
}