1#[cfg(feature = "async")]
142use tokio::sync::mpsc::{self, error::TrySendError};
143
144#[cfg(feature = "accumulator")]
145use std::sync::{Arc, RwLock};
146
147use {
148 std::fmt::{self, Display},
149 tracing::{
150 Event, Subscriber,
151 field::{Field, Visit},
152 span,
153 },
154 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
155};
156
157#[cfg(feature = "macros")]
158pub use tracing_async2_macros::*;
159
160#[cfg(feature = "span")]
161type JsonMap = serde_json::Map<String, serde_json::Value>;
162
163pub fn callback_layer<F>(callback: F) -> CallbackLayer
168where
169 F: Fn(&Event<'_>) + Send + Sync + 'static,
170{
171 CallbackLayer::new(callback)
172}
173
174#[cfg(feature = "span")]
179pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
180where
181 F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
182{
183 CallbackLayerWithSpan::new(callback)
184}
185
186#[cfg(feature = "async")]
191pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
192 CallbackLayer::new(move |event: &Event<'_>| {
193 tx.try_send(event.into()).ok();
194 })
195}
196
197#[cfg(all(feature = "async", feature = "span"))]
202pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
203 CallbackLayerWithSpan::new(move |event, spans| {
204 if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
205 match e {
206 TrySendError::Full(o) => {
207 eprintln!("dropping tracing event: {:?}", o);
208 }
209 TrySendError::Closed(o) => {
210 eprintln!("channel closed for tracing event: {:?}", o);
211 }
212 }
213 }
214 })
215}
216
217#[cfg(feature = "async")]
227pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
228where
229 F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
230 Fut: Future<Output = ()> + Send + Sync + 'static,
231{
232 let (tx, mut rx) = mpsc::channel(buffer_size);
233 tokio::spawn(async move {
234 while let Some(event) = rx.recv().await {
235 callback(event).await;
236 }
237 });
238 channel_layer(tx)
239}
240
241#[cfg(all(feature = "async", feature = "span"))]
251pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
252where
253 F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
254 Fut: Future<Output = ()> + Send + 'static,
255{
256 let (tx, mut rx) = mpsc::channel(buffer_size);
257 tokio::spawn(async move {
258 while let Some(owned_with_span) = rx.recv().await {
259 callback(owned_with_span).await;
260 }
261 });
262 channel_layer_with_spans(tx)
263}
264
265#[cfg(feature = "accumulator")]
266pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
267
268#[cfg(feature = "accumulator")]
274pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
275 CallbackLayer::new(move |event: &Event<'_>| {
276 if let Ok(mut log) = log.write() {
277 log.push(event.into());
278 }
279 })
280}
281
282#[cfg(all(feature = "accumulator", feature = "span"))]
288pub fn accumulating_layer_with_spans(
289 log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
290) -> CallbackLayerWithSpan {
291 CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
292 if let Ok(mut log) = log.write() {
293 log.push(OwnedEventWithSpans::new(event, spans));
294 }
295 })
296}
297
298pub struct CallbackLayer {
305 callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
306}
307
308impl CallbackLayer {
309 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
310 let callback = Box::new(callback);
311 Self { callback }
312 }
313}
314
315impl<S> Layer<S> for CallbackLayer
316where
317 S: Subscriber,
318 S: for<'lookup> LookupSpan<'lookup>,
319{
320 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
321 (self.callback)(event);
322 }
323}
324
325#[cfg(feature = "span")]
332pub struct CallbackLayerWithSpan {
333 #[allow(clippy::type_complexity)]
334 callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
335}
336
337#[cfg(feature = "span")]
338impl CallbackLayerWithSpan {
339 pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
340 callback: F,
341 ) -> Self {
342 let callback = Box::new(callback);
343 Self { callback }
344 }
345}
346
347#[cfg(feature = "span")]
348impl<S> Layer<S> for CallbackLayerWithSpan
349where
350 S: Subscriber,
351 S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
352{
353 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
354 let mut visitor = FieldVisitor::default();
355 attrs.record(&mut visitor);
356 if let Some(span) = ctx.span(id) {
357 span.extensions_mut().insert(visitor);
358 }
359 }
360 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
361 let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
362 scope
363 .from_root()
364 .map(|span| {
365 let fields: Option<JsonMap> = span
366 .extensions()
367 .get::<FieldVisitor>()
368 .cloned()
369 .map(|FieldVisitor(json_map)| json_map);
370
371 let meta = span.metadata();
372 let mut map = JsonMap::default();
373 map.insert("level".into(), format!("{}", meta.level()).into());
374 map.insert("file".into(), meta.file().into());
375 map.insert("target".into(), meta.target().into());
376 map.insert("line".into(), meta.line().into());
377 map.insert("name".into(), span.name().into());
378 map.insert("fields".into(), fields.into());
379 map
380 })
381 .collect()
382 });
383 (self.callback)(event, spans);
384 }
385}
386
387#[derive(Debug, Clone, serde::Serialize)]
394pub struct OwnedEvent {
395 pub level: TracingLevel,
396 pub file: Option<String>,
397 pub target: String,
398 pub line: Option<u32>,
399 pub name: &'static str,
400 pub message: Option<String>,
401 pub fields: serde_json::Map<String, serde_json::Value>,
402}
403
404#[derive(Debug, Clone, serde::Serialize)]
411pub struct OwnedEventWithSpans {
412 pub event: OwnedEvent,
413 pub spans: Option<Vec<JsonMap>>,
414}
415
416impl OwnedEventWithSpans {
417 pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
418 OwnedEventWithSpans {
419 event: event.into(),
420 spans,
421 }
422 }
423}
424
425#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
429pub enum TracingLevel {
430 Trace = 0,
431 Debug = 1,
432 Info = 2,
433 Warn = 3,
434 Error = 4,
435}
436
437impl From<&tracing::Level> for TracingLevel {
438 fn from(value: &tracing::Level) -> Self {
439 match *value {
440 tracing::Level::TRACE => TracingLevel::Trace,
441 tracing::Level::DEBUG => TracingLevel::Debug,
442 tracing::Level::INFO => TracingLevel::Info,
443 tracing::Level::WARN => TracingLevel::Warn,
444 tracing::Level::ERROR => TracingLevel::Error,
445 }
446 }
447}
448
449impl AsRef<str> for TracingLevel {
450 fn as_ref(&self) -> &str {
451 use TracingLevel::*;
452 match self {
453 Trace => "TRACE",
454 Debug => "DEBUG",
455 Info => "INFO",
456 Warn => "WARN",
457 Error => "ERROR",
458 }
459 }
460}
461
462impl Display for TracingLevel {
463 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
464 write!(f, "{}", self.as_ref())
465 }
466}
467
468impl From<&Event<'_>> for OwnedEvent {
475 fn from(event: &Event<'_>) -> Self {
476 let mut visitor = FieldVisitor::default();
477 event.record(&mut visitor);
478
479 let message = visitor.0.remove("message").and_then(|v| {
482 if let serde_json::Value::String(s) = v {
483 Some(s)
484 } else {
485 None
486 }
487 });
488
489 let meta = event.metadata();
490 Self {
491 name: meta.name(),
492 target: meta.target().into(),
493 level: meta.level().into(),
494 file: meta.file().map(String::from),
495 line: meta.line(),
496 message,
497 fields: visitor.0,
498 }
499 }
500}
501
502#[cfg(feature = "span")]
508#[derive(Default, Clone)]
509struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
510
511impl Visit for FieldVisitor {
512 fn record_bool(&mut self, field: &Field, value: bool) {
513 self.0.insert(field.name().into(), value.into());
514 }
515
516 fn record_f64(&mut self, field: &Field, value: f64) {
517 self.0.insert(field.name().into(), value.into());
518 }
519 fn record_i64(&mut self, field: &Field, value: i64) {
520 self.0.insert(field.name().into(), value.into());
521 }
522 fn record_u64(&mut self, field: &Field, value: u64) {
523 self.0.insert(field.name().into(), value.into());
524 }
525 fn record_str(&mut self, field: &Field, value: &str) {
526 self.0.insert(field.name().into(), value.into());
527 }
528 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
529 let text = format!("{:?}", value);
530 self.0.insert(field.name().into(), text.into());
531 }
532}
533
534#[cfg(test)]
541mod tests {
542
543 #[cfg(feature = "async")]
544 use {
545 std::sync::{Arc, RwLock},
546 tokio::sync::mpsc,
547 };
548
549 use {
550 super::*,
551 insta::{
552 internals::{Content, ContentPath},
553 *,
554 },
555 tracing_subscriber::{EnvFilter, prelude::*},
556 };
557
558 fn run_trace_events() {
563 let span = tracing::info_span!("root-info", recurse = 0);
564 span.in_scope(|| {
565 tracing::trace!(foo = 1, bar = 2, "this is a trace message");
566 tracing::debug!(pi = 3.14159265, "this is a debug message");
567 tracing::info!(job = "foo", "this is an info message");
568 tracing::warn!(job = "foo", "this is a warning message");
569 tracing::error!(job = "foo", "this is an error message");
570 });
571 }
572
573 fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
574 let events = logs.read().expect("could not read events");
575 events.clone()
576 }
577
578 fn redact_name(value: Content, _path: ContentPath) -> String {
579 let s = value.as_str().unwrap_or_default();
580 if s.contains(":") {
581 s.split_once(":")
582 .map(|p| format!("{}:<line>", p.0))
583 .unwrap_or_default()
584 } else {
585 s.to_string()
586 }
587 }
588
589 #[test]
594 fn test_callback_layer() {
595 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
598 let cb_events = events.clone();
599
600 let _guard = tracing_subscriber::registry()
601 .with(EnvFilter::new("tracing_async2=trace"))
602 .with(callback_layer(move |event| {
603 if let Ok(mut events) = cb_events.write() {
604 events.push(event.into());
605 }
606 }))
607 .set_default();
608
609 run_trace_events();
610
611 assert_json_snapshot!("callback-layer", extract_events(&events), {
612 "[].line" => "<line>",
613 "[].name" => dynamic_redaction(redact_name),
614 });
615 }
616
617 #[cfg(feature = "span")]
618 #[test]
619 fn test_callback_layer_with_spans() {
620 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
623 let cb_events = events.clone();
624
625 let _guard = tracing_subscriber::registry()
626 .with(EnvFilter::new("tracing_async2=trace"))
627 .with(callback_layer_with_spans(move |event, spans| {
628 if let Ok(mut events) = cb_events.write() {
629 events.push(OwnedEventWithSpans::new(event, spans));
630 }
631 }))
632 .set_default();
633
634 run_trace_events();
635
636 assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
637 "[].event.line" => "<line>",
638 "[].event.name" => dynamic_redaction(redact_name),
639 "[].spans[].line" => "<line>",
640 "[].spans[].name" => dynamic_redaction(redact_name),
641 });
642 }
643
644 #[cfg(feature = "async")]
645 #[tokio::test]
646 async fn test_channel_layer() {
647 use std::time::Duration;
649 use tokio::time::sleep;
650
651 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
654 let cb_events = events.clone();
655
656 let (tx, mut rx) = mpsc::channel(100);
657
658 let _guard = tracing_subscriber::registry()
659 .with(EnvFilter::new("tracing_async2=trace"))
660 .with(channel_layer(tx))
661 .set_default();
662
663 let handle = tokio::spawn(async move {
664 while let Some(event) = rx.recv().await {
665 if let Ok(mut events) = cb_events.write() {
666 events.push(event);
667 }
668 }
669 });
670
671 run_trace_events();
672 sleep(Duration::from_millis(100)).await;
673 handle.abort();
674
675 assert_json_snapshot!("channel-layer", extract_events(&events), {
676 "[].line" => "<line>",
677 "[].name" => dynamic_redaction(redact_name),
678 });
679 }
680
681 #[cfg(all(feature = "async", feature = "span"))]
682 #[tokio::test]
683 async fn test_channel_layer_with_spans() {
684 use std::time::Duration;
686 use tokio::time::sleep;
687
688 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
691 let cb_events = events.clone();
692
693 let (tx, mut rx) = mpsc::channel(100);
694
695 let _guard = tracing_subscriber::registry()
696 .with(EnvFilter::new("tracing_async2=trace"))
697 .with(channel_layer_with_spans(tx))
698 .set_default();
699
700 let handle = tokio::spawn(async move {
701 while let Some(event) = rx.recv().await {
702 if let Ok(mut events) = cb_events.write() {
703 events.push(event);
704 }
705 }
706 });
707
708 run_trace_events();
709 sleep(Duration::from_millis(100)).await;
710 handle.abort();
711
712 assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
713 "[].event.line" => "<line>",
714 "[].event.name" => dynamic_redaction(redact_name),
715 "[].spans[].line" => "<line>",
716 "[].spans[].name" => dynamic_redaction(redact_name),
717 });
718 }
719
720 #[cfg(feature = "async")]
721 #[tokio::test]
722 async fn test_async_layer() {
723 use std::time::Duration;
725 use tokio::time::sleep;
726
727 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
730 let cb_events = events.clone();
731
732 let _guard = tracing_subscriber::registry()
733 .with(EnvFilter::new("tracing_async2=trace"))
734 .with(async_layer(16, move |event| {
735 let f_events = cb_events.clone();
736 async move {
737 if let Ok(mut events) = f_events.write() {
738 events.push(event);
739 }
740 }
741 }))
742 .set_default();
743
744 run_trace_events();
745 sleep(Duration::from_millis(100)).await;
746
747 assert_json_snapshot!("async-layer", extract_events(&events), {
748 "[].line" => "<line>",
749 "[].name" => dynamic_redaction(redact_name),
750 });
751 }
752
753 #[cfg(all(feature = "async", feature = "span"))]
754 #[tokio::test]
755 async fn test_async_layer_with_spans() {
756 use std::time::Duration;
758 use tokio::time::sleep;
759
760 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
763 let cb_events = events.clone();
764
765 let _guard = tracing_subscriber::registry()
766 .with(EnvFilter::new("tracing_async2=trace"))
767 .with(async_layer_with_spans(16, move |event_with_span| {
768 let f_events = cb_events.clone();
769 async move {
770 if let Ok(mut events) = f_events.write() {
771 events.push(event_with_span);
772 }
773 }
774 }))
775 .set_default();
776
777 run_trace_events();
778 sleep(Duration::from_millis(100)).await;
779
780 assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
781 "[].event.line" => "<line>",
782 "[].event.name" => dynamic_redaction(redact_name),
783 "[].spans[].line" => "<line>",
784 "[].spans[].name" => dynamic_redaction(redact_name),
785 });
786 }
787
788 #[cfg(feature = "accumulator")]
789 #[test]
790 fn test_accumulating_layer() {
791 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
794
795 let _guard = tracing_subscriber::registry()
796 .with(EnvFilter::new("tracing_async2=trace"))
797 .with(accumulating_layer(events.clone()))
798 .set_default();
799
800 run_trace_events();
801
802 assert_json_snapshot!("accumulating-layer", extract_events(&events), {
803 "[].line" => "<line>",
804 "[].name" => dynamic_redaction(redact_name),
805 });
806 }
807
808 #[cfg(all(feature = "span", feature = "accumulator"))]
809 #[test]
810 fn test_accumulating_layer_with_spans() {
811 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
814
815 let _guard = tracing_subscriber::registry()
816 .with(EnvFilter::new("tracing_async2=trace"))
817 .with(accumulating_layer_with_spans(events.clone()))
818 .set_default();
819
820 run_trace_events();
821
822 assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
823 "[].event.line" => "<line>",
824 "[].event.name" => dynamic_redaction(redact_name),
825 "[].spans[].line" => "<line>",
826 "[].spans[].name" => dynamic_redaction(redact_name),
827 });
828 }
829}