1#[cfg(feature = "async")]
95use tokio::sync::mpsc::{self, error::TrySendError};
96
97#[cfg(feature = "accumulator")]
98use std::sync::{Arc, RwLock};
99
100use {
101 std::fmt,
102 tracing::{
103 Event, Subscriber,
104 field::{Field, Visit},
105 span,
106 },
107 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
108};
109
/// JSON object (`String` keys, arbitrary JSON values); each span handed to a span-aware
/// callback is described by one of these.
#[cfg(feature = "span")]
type JsonMap = serde_json::Map<String, serde_json::Value>;
112
113pub fn callback_layer<F>(callback: F) -> CallbackLayer
118where
119 F: Fn(&Event<'_>) + Send + Sync + 'static,
120{
121 CallbackLayer::new(callback)
122}
123
/// Builds a [`CallbackLayerWithSpan`] that hands every observed event, together with the
/// JSON description of its enclosing spans (if any), to `callback`.
#[cfg(feature = "span")]
pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
where
    F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
{
    CallbackLayerWithSpan {
        callback: Box::new(callback),
    }
}
135
/// Builds a [`CallbackLayer`] that converts every event into an [`OwnedEvent`] and pushes
/// it onto `tx` without blocking.
///
/// Sending uses `try_send`, so an event that cannot be queued is dropped. The failure is
/// reported on stderr, in the same way `channel_layer_with_spans` reports it, instead of
/// being discarded silently.
#[cfg(feature = "async")]
pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
    CallbackLayer::new(move |event: &Event<'_>| match tx.try_send(event.into()) {
        Ok(()) => {}
        Err(TrySendError::Full(dropped)) => {
            eprintln!("dropping tracing event: {:?}", dropped);
        }
        Err(TrySendError::Closed(dropped)) => {
            eprintln!("channel closed for tracing event: {:?}", dropped);
        }
    })
}
146
/// Builds a [`CallbackLayerWithSpan`] that packages every event and its span data into an
/// [`OwnedEventWithSpans`] and pushes it onto `tx` without blocking.
///
/// An event that cannot be queued is dropped and the failure is reported on stderr.
#[cfg(all(feature = "async", feature = "span"))]
pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
    CallbackLayerWithSpan::new(move |event, spans| {
        let payload = OwnedEventWithSpans::new(event, spans);
        match tx.try_send(payload) {
            Ok(()) => {}
            Err(TrySendError::Full(o)) => {
                eprintln!("dropping tracing event: {:?}", o);
            }
            Err(TrySendError::Closed(o)) => {
                eprintln!("channel closed for tracing event: {:?}", o);
            }
        }
    })
}
166
/// Builds a [`CallbackLayer`] whose events are processed by an asynchronous `callback`.
///
/// A bounded channel holding up to `buffer_size` events connects the layer to a task
/// started with `tokio::spawn`. The task awaits `callback` for one event at a time, in the
/// order the events were received, and ends once the channel is closed.
///
/// The future returned by `callback` only has to be `Send + 'static`. It is awaited inside
/// the spawned task and never shared between threads, so no `Sync` bound is required; this
/// matches `async_layer_with_spans`.
///
/// NOTE(review): because `tokio::spawn` and `mpsc::channel` are called directly, this
/// presumably must run inside a Tokio runtime and with a non-zero `buffer_size` — confirm
/// against the Tokio documentation.
#[cfg(feature = "async")]
pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
where
    F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(buffer_size);
    tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            callback(event).await;
        }
    });
    channel_layer(tx)
}
190
/// Builds a [`CallbackLayerWithSpan`] whose events (with span data) are processed by an
/// asynchronous `callback`.
///
/// A bounded channel of `buffer_size` entries feeds a task started with `tokio::spawn`;
/// the task awaits `callback` for each received item in turn and stops when the channel
/// is closed.
#[cfg(all(feature = "async", feature = "span"))]
pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
where
    F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let (sender, mut receiver) = mpsc::channel(buffer_size);
    tokio::spawn(async move {
        loop {
            // `None` means every sender is gone; nothing more will arrive.
            let Some(item) = receiver.recv().await else {
                break;
            };
            callback(item).await;
        }
    });
    channel_layer_with_spans(sender)
}
214
/// Shared, lock-protected list of owned events; the buffer type accepted by
/// [`accumulating_layer`].
#[cfg(feature = "accumulator")]
pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
217
/// Builds a [`CallbackLayer`] that appends an [`OwnedEvent`] to `log` for every event.
///
/// If the write lock cannot be acquired (the lock is poisoned) the event is skipped.
#[cfg(feature = "accumulator")]
pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
    CallbackLayer::new(move |event: &Event<'_>| {
        let Ok(mut entries) = log.write() else {
            return;
        };
        entries.push(OwnedEvent::from(event));
    })
}
231
/// Builds a [`CallbackLayerWithSpan`] that appends an [`OwnedEventWithSpans`] to `log` for
/// every event.
///
/// If the write lock cannot be acquired (the lock is poisoned) the event is skipped.
#[cfg(all(feature = "accumulator", feature = "span"))]
pub fn accumulating_layer_with_spans(
    log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
) -> CallbackLayerWithSpan {
    CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| match log.write() {
        Ok(mut entries) => entries.push(OwnedEventWithSpans::new(event, spans)),
        Err(_) => {}
    })
}
247
/// A [`Layer`] that forwards every event it observes to a boxed callback.
pub struct CallbackLayer {
    /// Called once per event from `on_event`; boxed so the layer type itself is not generic.
    callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
}
257
258impl CallbackLayer {
259 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
260 let callback = Box::new(callback);
261 Self { callback }
262 }
263}
264
265impl<S> Layer<S> for CallbackLayer
266where
267 S: Subscriber,
268 S: for<'lookup> LookupSpan<'lookup>,
269{
270 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
271 (self.callback)(event);
272 }
273}
274
/// A [`Layer`] that forwards every event, plus a JSON description of the spans enclosing
/// it, to a boxed callback.
#[cfg(feature = "span")]
pub struct CallbackLayerWithSpan {
    /// Called once per event from `on_event`; the second argument is `None` when no span
    /// scope could be resolved for the event.
    #[allow(clippy::type_complexity)]
    callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
}
286
#[cfg(feature = "span")]
impl CallbackLayerWithSpan {
    /// Wraps `callback` in a layer; the closure is boxed and called for every event along
    /// with the event's span data.
    pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
        callback: F,
    ) -> Self {
        Self {
            callback: Box::new(callback),
        }
    }
}
296
#[cfg(feature = "span")]
impl<S> Layer<S> for CallbackLayerWithSpan
where
    S: Subscriber,
    S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
    /// Captures the span's initial field values and stores them in the span's extensions
    /// so `on_event` can report them later.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        attrs.record(&mut visitor);
        if let Some(span) = ctx.span(id) {
            // Extensions belong to the span, not to the layer. `insert` panics when the
            // type is already stored, which happens as soon as two span-aware layers are
            // attached to the same registry. `replace` simply overwrites the earlier copy,
            // which was built from the same attributes.
            span.extensions_mut().replace(visitor);
        }
    }

    /// Merges values recorded after span creation into the stored fields, so fields that
    /// are filled in later are reported as well.
    fn on_record(&self, id: &tracing::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(id) {
            let mut extensions = span.extensions_mut();
            if let Some(visitor) = extensions.get_mut::<FieldVisitor>() {
                values.record(visitor);
            }
        }
    }

    /// Describes every span enclosing the event, root first, as a JSON map and passes the
    /// event together with that list to the callback.
    ///
    /// Each map holds `level`, `file`, `target`, `line`, `name` and `fields`; `fields` is
    /// JSON `null` when no field data was stored for the span. The list is `None` when the
    /// context yields no scope for the event.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
            scope
                .from_root()
                .map(|span| {
                    // Copy the fields out so the extensions lock is released immediately.
                    let fields: Option<JsonMap> = span
                        .extensions()
                        .get::<FieldVisitor>()
                        .map(|visitor| visitor.0.clone());

                    let meta = span.metadata();
                    let mut map = JsonMap::default();
                    map.insert("level".into(), meta.level().to_string().into());
                    map.insert("file".into(), meta.file().into());
                    map.insert("target".into(), meta.target().into());
                    map.insert("line".into(), meta.line().into());
                    map.insert("name".into(), span.name().into());
                    map.insert("fields".into(), fields.into());
                    map
                })
                .collect()
        });
        (self.callback)(event, spans);
    }
}
336
/// An owned, serializable copy of a tracing [`Event`], built with `OwnedEvent::from(&event)`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct OwnedEvent {
    /// Severity taken from the event's metadata.
    pub level: TracingLevel,
    /// Source file from the event's metadata, when present.
    pub file: Option<String>,
    /// Target from the event's metadata.
    pub target: String,
    /// Source line from the event's metadata, when present.
    pub line: Option<u32>,
    /// Name from the event's metadata.
    pub name: &'static str,
    /// The event's `message` field, when it was recorded as a string.
    pub message: Option<String>,
    /// The event's recorded fields as JSON values, keyed by field name.
    pub fields: serde_json::Map<String, serde_json::Value>,
}
353
/// An [`OwnedEvent`] paired with the JSON descriptions of the spans it was emitted in.
///
/// Gated on the `span` feature: the `JsonMap` alias used by `spans` only exists when that
/// feature is enabled, and every function that produces this type requires it too.
#[cfg(feature = "span")]
#[derive(Debug, Clone, serde::Serialize)]
pub struct OwnedEventWithSpans {
    /// The owned copy of the event itself.
    pub event: OwnedEvent,
    /// Span descriptions exactly as supplied to [`OwnedEventWithSpans::new`].
    pub spans: Option<Vec<JsonMap>>,
}

#[cfg(feature = "span")]
impl OwnedEventWithSpans {
    /// Converts `event` into an [`OwnedEvent`] and stores it alongside `spans`.
    pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
        OwnedEventWithSpans {
            event: event.into(),
            spans,
        }
    }
}
374
/// Serializable counterpart of [`tracing::Level`], with one variant per level and explicit
/// discriminants from `0` (`Trace`) to `4` (`Error`).
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
pub enum TracingLevel {
    /// Corresponds to `tracing::Level::TRACE`.
    Trace = 0,
    /// Corresponds to `tracing::Level::DEBUG`.
    Debug = 1,
    /// Corresponds to `tracing::Level::INFO`.
    Info = 2,
    /// Corresponds to `tracing::Level::WARN`.
    Warn = 3,
    /// Corresponds to `tracing::Level::ERROR`.
    Error = 4,
}
386
387impl From<&tracing::Level> for TracingLevel {
388 fn from(value: &tracing::Level) -> Self {
389 match *value {
390 tracing::Level::TRACE => TracingLevel::Trace,
391 tracing::Level::DEBUG => TracingLevel::Debug,
392 tracing::Level::INFO => TracingLevel::Info,
393 tracing::Level::WARN => TracingLevel::Warn,
394 tracing::Level::ERROR => TracingLevel::Error,
395 }
396 }
397}
398
399impl From<&Event<'_>> for OwnedEvent {
406 fn from(event: &Event<'_>) -> Self {
407 let mut visitor = FieldVisitor::default();
408 event.record(&mut visitor);
409
410 let message = visitor.0.remove("message").and_then(|v| {
413 if let serde_json::Value::String(s) = v {
414 Some(s)
415 } else {
416 None
417 }
418 });
419
420 let meta = event.metadata();
421 Self {
422 name: meta.name(),
423 target: meta.target().into(),
424 level: meta.level().into(),
425 file: meta.file().map(String::from),
426 line: meta.line(),
427 message,
428 fields: visitor.0,
429 }
430 }
431}
432
433#[cfg(feature = "span")]
439#[derive(Default, Clone)]
440struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
441
442impl Visit for FieldVisitor {
443 fn record_bool(&mut self, field: &Field, value: bool) {
444 self.0.insert(field.name().into(), value.into());
445 }
446
447 fn record_f64(&mut self, field: &Field, value: f64) {
448 self.0.insert(field.name().into(), value.into());
449 }
450 fn record_i64(&mut self, field: &Field, value: i64) {
451 self.0.insert(field.name().into(), value.into());
452 }
453 fn record_u64(&mut self, field: &Field, value: u64) {
454 self.0.insert(field.name().into(), value.into());
455 }
456 fn record_str(&mut self, field: &Field, value: &str) {
457 self.0.insert(field.name().into(), value.into());
458 }
459 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
460 let text = format!("{:?}", value);
461 self.0.insert(field.name().into(), text.into());
462 }
463}
464
/// Snapshot tests for every layer constructor.
///
/// Each test is gated on the same features as the function it exercises, so the suite
/// also builds when optional features are disabled.
#[cfg(test)]
mod tests {

    use {
        super::*,
        insta::{
            internals::{Content, ContentPath},
            *,
        },
        std::sync::{Arc, RwLock},
        tracing_subscriber::{EnvFilter, prelude::*},
    };

    // Only the channel tests need the channel type, and they are `async`-gated.
    #[cfg(feature = "async")]
    use tokio::sync::mpsc;

    /// Emits one event per level from inside an info span, giving every test the same input.
    fn run_trace_events() {
        let span = tracing::info_span!("root-info", recurse = 0);
        span.in_scope(|| {
            tracing::trace!(foo = 1, bar = 2, "this is a trace message");
            tracing::debug!(pi = 3.14159265, "this is a debug message");
            tracing::info!(job = "foo", "this is an info message");
            tracing::warn!(job = "foo", "this is a warning message");
            tracing::error!(job = "foo", "this is an error message");
        });
    }

    /// Returns a copy of everything collected so far.
    fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
        let events = logs.read().expect("could not read events");
        events.clone()
    }

    /// Replaces whatever follows the first `:` in a name with `<line>` so snapshots do not
    /// change when source lines move; names without a `:` are returned unchanged.
    fn redact_name(value: Content, _path: ContentPath) -> String {
        let s = value.as_str().unwrap_or_default();
        match s.split_once(':') {
            Some((prefix, _)) => format!("{prefix}:<line>"),
            None => s.to_string(),
        }
    }

    #[tokio::test]
    async fn test_callback_layer() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(callback_layer(move |event| {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event.into());
                }
            }))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("callback-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "span")]
    #[tokio::test]
    async fn test_callback_layer_with_spans() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(callback_layer_with_spans(move |event, spans| {
                if let Ok(mut events) = cb_events.write() {
                    events.push(OwnedEventWithSpans::new(event, spans));
                }
            }))
            .set_default();

        run_trace_events();

        assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_channel_layer() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let (tx, mut rx) = mpsc::channel(100);

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(channel_layer(tx))
            .set_default();

        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event);
                }
            }
        });

        run_trace_events();
        // Give the receiving task time to drain the channel before inspecting the log.
        sleep(Duration::from_millis(100)).await;
        handle.abort();

        assert_json_snapshot!("channel-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "async", feature = "span"))]
    #[tokio::test]
    async fn test_channel_layer_with_spans() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let (tx, mut rx) = mpsc::channel(100);

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(channel_layer_with_spans(tx))
            .set_default();

        let handle = tokio::spawn(async move {
            while let Some(event) = rx.recv().await {
                if let Ok(mut events) = cb_events.write() {
                    events.push(event);
                }
            }
        });

        run_trace_events();
        // Give the receiving task time to drain the channel before inspecting the log.
        sleep(Duration::from_millis(100)).await;
        handle.abort();

        assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_layer() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(async_layer(16, move |event| {
                let f_events = cb_events.clone();
                async move {
                    if let Ok(mut events) = f_events.write() {
                        events.push(event);
                    }
                }
            }))
            .set_default();

        run_trace_events();
        // Give the background task time to run the callback for every event.
        sleep(Duration::from_millis(100)).await;

        assert_json_snapshot!("async-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "async", feature = "span"))]
    #[tokio::test]
    async fn test_async_layer_with_spans() {
        use std::time::Duration;
        use tokio::time::sleep;

        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
        let cb_events = events.clone();

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(async_layer_with_spans(16, move |event_with_span| {
                let f_events = cb_events.clone();
                async move {
                    if let Ok(mut events) = f_events.write() {
                        events.push(event_with_span);
                    }
                }
            }))
            .set_default();

        run_trace_events();
        // Give the background task time to run the callback for every event.
        sleep(Duration::from_millis(100)).await;

        assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(feature = "accumulator")]
    #[tokio::test]
    async fn test_accumulating_layer() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(accumulating_layer(events.clone()))
            .set_default();

        // The accumulating layer pushes synchronously from `on_event`, so the log is
        // complete as soon as this returns; no settling delay is needed.
        run_trace_events();

        assert_json_snapshot!("accumulating-layer", extract_events(&events), {
            "[].line" => "<line>",
            "[].name" => dynamic_redaction(redact_name),
        });
    }

    #[cfg(all(feature = "accumulator", feature = "span"))]
    #[tokio::test]
    async fn test_accumulating_layer_with_spans() {
        let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));

        let _guard = tracing_subscriber::registry()
            .with(EnvFilter::new("tracing_async2=trace"))
            .with(accumulating_layer_with_spans(events.clone()))
            .set_default();

        // The accumulating layer pushes synchronously from `on_event`, so the log is
        // complete as soon as this returns; no settling delay is needed.
        run_trace_events();

        assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
            "[].event.line" => "<line>",
            "[].event.name" => dynamic_redaction(redact_name),
            "[].spans[].line" => "<line>",
            "[].spans[].name" => dynamic_redaction(redact_name),
        });
    }
}