1#[cfg(feature = "async")]
139use tokio::sync::mpsc::{self, error::TrySendError};
140
141#[cfg(feature = "accumulator")]
142use std::sync::{Arc, RwLock};
143
144use {
145 std::fmt::{self, Display},
146 tracing::{
147 Event, Subscriber,
148 field::{Field, Visit},
149 span,
150 },
151 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
152};
153
154#[cfg(feature = "macros")]
155pub use tracing_async2_macros::*;
156
157#[cfg(feature = "span")]
158type JsonMap = serde_json::Map<String, serde_json::Value>;
159
160pub fn callback_layer<F>(callback: F) -> CallbackLayer
165where
166 F: Fn(&Event<'_>) + Send + Sync + 'static,
167{
168 CallbackLayer::new(callback)
169}
170
171#[cfg(feature = "span")]
176pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
177where
178 F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
179{
180 CallbackLayerWithSpan::new(callback)
181}
182
183#[cfg(feature = "async")]
188pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
189 CallbackLayer::new(move |event: &Event<'_>| {
190 tx.try_send(event.into()).ok();
191 })
192}
193
194#[cfg(all(feature = "async", feature = "span"))]
199pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
200 CallbackLayerWithSpan::new(move |event, spans| {
201 if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
202 match e {
203 TrySendError::Full(o) => {
204 eprintln!("dropping tracing event: {:?}", o);
205 }
206 TrySendError::Closed(o) => {
207 eprintln!("channel closed for tracing event: {:?}", o);
208 }
209 }
210 }
211 })
212}
213
214#[cfg(feature = "async")]
224pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
225where
226 F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
227 Fut: Future<Output = ()> + Send + Sync + 'static,
228{
229 let (tx, mut rx) = mpsc::channel(buffer_size);
230 tokio::spawn(async move {
231 while let Some(event) = rx.recv().await {
232 callback(event).await;
233 }
234 });
235 channel_layer(tx)
236}
237
238#[cfg(all(feature = "async", feature = "span"))]
248pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
249where
250 F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
251 Fut: Future<Output = ()> + Send + 'static,
252{
253 let (tx, mut rx) = mpsc::channel(buffer_size);
254 tokio::spawn(async move {
255 while let Some(owned_with_span) = rx.recv().await {
256 callback(owned_with_span).await;
257 }
258 });
259 channel_layer_with_spans(tx)
260}
261
262#[cfg(feature = "accumulator")]
263pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
264
265#[cfg(feature = "accumulator")]
271pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
272 CallbackLayer::new(move |event: &Event<'_>| {
273 if let Ok(mut log) = log.write() {
274 log.push(event.into());
275 }
276 })
277}
278
279#[cfg(all(feature = "accumulator", feature = "span"))]
285pub fn accumulating_layer_with_spans(
286 log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
287) -> CallbackLayerWithSpan {
288 CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
289 if let Ok(mut log) = log.write() {
290 log.push(OwnedEventWithSpans::new(event, spans));
291 }
292 })
293}
294
295pub struct CallbackLayer {
302 callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
303}
304
305impl CallbackLayer {
306 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
307 let callback = Box::new(callback);
308 Self { callback }
309 }
310}
311
312impl<S> Layer<S> for CallbackLayer
313where
314 S: Subscriber,
315 S: for<'lookup> LookupSpan<'lookup>,
316{
317 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
318 (self.callback)(event);
319 }
320}
321
322#[cfg(feature = "span")]
329pub struct CallbackLayerWithSpan {
330 #[allow(clippy::type_complexity)]
331 callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
332}
333
334#[cfg(feature = "span")]
335impl CallbackLayerWithSpan {
336 pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
337 callback: F,
338 ) -> Self {
339 let callback = Box::new(callback);
340 Self { callback }
341 }
342}
343
344#[cfg(feature = "span")]
345impl<S> Layer<S> for CallbackLayerWithSpan
346where
347 S: Subscriber,
348 S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
349{
350 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
351 let mut visitor = FieldVisitor::default();
352 attrs.record(&mut visitor);
353 if let Some(span) = ctx.span(id) {
354 span.extensions_mut().insert(visitor);
355 }
356 }
357 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
358 let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
359 scope
360 .from_root()
361 .map(|span| {
362 let fields: Option<JsonMap> = span
363 .extensions()
364 .get::<FieldVisitor>()
365 .cloned()
366 .map(|FieldVisitor(json_map)| json_map);
367
368 let meta = span.metadata();
369 let mut map = JsonMap::default();
370 map.insert("level".into(), format!("{}", meta.level()).into());
371 map.insert("file".into(), meta.file().into());
372 map.insert("target".into(), meta.target().into());
373 map.insert("line".into(), meta.line().into());
374 map.insert("name".into(), span.name().into());
375 map.insert("fields".into(), fields.into());
376 map
377 })
378 .collect()
379 });
380 (self.callback)(event, spans);
381 }
382}
383
384#[derive(Debug, Clone, serde::Serialize)]
391pub struct OwnedEvent {
392 pub level: TracingLevel,
393 pub file: Option<String>,
394 pub target: String,
395 pub line: Option<u32>,
396 pub name: &'static str,
397 pub message: Option<String>,
398 pub fields: serde_json::Map<String, serde_json::Value>,
399}
400
401#[derive(Debug, Clone, serde::Serialize)]
408pub struct OwnedEventWithSpans {
409 pub event: OwnedEvent,
410 pub spans: Option<Vec<JsonMap>>,
411}
412
413impl OwnedEventWithSpans {
414 pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
415 OwnedEventWithSpans {
416 event: event.into(),
417 spans,
418 }
419 }
420}
421
422#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
426pub enum TracingLevel {
427 Trace = 0,
428 Debug = 1,
429 Info = 2,
430 Warn = 3,
431 Error = 4,
432}
433
434impl From<&tracing::Level> for TracingLevel {
435 fn from(value: &tracing::Level) -> Self {
436 match *value {
437 tracing::Level::TRACE => TracingLevel::Trace,
438 tracing::Level::DEBUG => TracingLevel::Debug,
439 tracing::Level::INFO => TracingLevel::Info,
440 tracing::Level::WARN => TracingLevel::Warn,
441 tracing::Level::ERROR => TracingLevel::Error,
442 }
443 }
444}
445
446impl AsRef<str> for TracingLevel {
447 fn as_ref(&self) -> &str {
448 use TracingLevel::*;
449 match self {
450 Trace => "TRACE",
451 Debug => "DEBUG",
452 Info => "INFO",
453 Warn => "WARN",
454 Error => "ERROR",
455 }
456 }
457}
458
459impl Display for TracingLevel {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 write!(f, "{}", self.as_ref())
462 }
463}
464
465impl From<&Event<'_>> for OwnedEvent {
472 fn from(event: &Event<'_>) -> Self {
473 let mut visitor = FieldVisitor::default();
474 event.record(&mut visitor);
475
476 let message = visitor.0.remove("message").and_then(|v| {
479 if let serde_json::Value::String(s) = v {
480 Some(s)
481 } else {
482 None
483 }
484 });
485
486 let meta = event.metadata();
487 Self {
488 name: meta.name(),
489 target: meta.target().into(),
490 level: meta.level().into(),
491 file: meta.file().map(String::from),
492 line: meta.line(),
493 message,
494 fields: visitor.0,
495 }
496 }
497}
498
499#[cfg(feature = "span")]
505#[derive(Default, Clone)]
506struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
507
508impl Visit for FieldVisitor {
509 fn record_bool(&mut self, field: &Field, value: bool) {
510 self.0.insert(field.name().into(), value.into());
511 }
512
513 fn record_f64(&mut self, field: &Field, value: f64) {
514 self.0.insert(field.name().into(), value.into());
515 }
516 fn record_i64(&mut self, field: &Field, value: i64) {
517 self.0.insert(field.name().into(), value.into());
518 }
519 fn record_u64(&mut self, field: &Field, value: u64) {
520 self.0.insert(field.name().into(), value.into());
521 }
522 fn record_str(&mut self, field: &Field, value: &str) {
523 self.0.insert(field.name().into(), value.into());
524 }
525 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
526 let text = format!("{:?}", value);
527 self.0.insert(field.name().into(), text.into());
528 }
529}
530
531#[cfg(test)]
538mod tests {
539
540 #[cfg(feature = "async")]
541 use {
542 std::sync::{Arc, RwLock},
543 tokio::sync::mpsc,
544 };
545
546 use {
547 super::*,
548 insta::{
549 internals::{Content, ContentPath},
550 *,
551 },
552 tracing_subscriber::{EnvFilter, prelude::*},
553 };
554
555 fn run_trace_events() {
560 let span = tracing::info_span!("root-info", recurse = 0);
561 span.in_scope(|| {
562 tracing::trace!(foo = 1, bar = 2, "this is a trace message");
563 tracing::debug!(pi = 3.14159265, "this is a debug message");
564 tracing::info!(job = "foo", "this is an info message");
565 tracing::warn!(job = "foo", "this is a warning message");
566 tracing::error!(job = "foo", "this is an error message");
567 });
568 }
569
570 fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
571 let events = logs.read().expect("could not read events");
572 events.clone()
573 }
574
575 fn redact_name(value: Content, _path: ContentPath) -> String {
576 let s = value.as_str().unwrap_or_default();
577 if s.contains(":") {
578 s.split_once(":")
579 .map(|p| format!("{}:<line>", p.0))
580 .unwrap_or_default()
581 } else {
582 s.to_string()
583 }
584 }
585
586 #[test]
591 fn test_callback_layer() {
592 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
595 let cb_events = events.clone();
596
597 let _guard = tracing_subscriber::registry()
598 .with(EnvFilter::new("tracing_async2=trace"))
599 .with(callback_layer(move |event| {
600 if let Ok(mut events) = cb_events.write() {
601 events.push(event.into());
602 }
603 }))
604 .set_default();
605
606 run_trace_events();
607
608 assert_json_snapshot!("callback-layer", extract_events(&events), {
609 "[].line" => "<line>",
610 "[].name" => dynamic_redaction(redact_name),
611 });
612 }
613
614 #[cfg(feature = "span")]
615 #[test]
616 fn test_callback_layer_with_spans() {
617 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
620 let cb_events = events.clone();
621
622 let _guard = tracing_subscriber::registry()
623 .with(EnvFilter::new("tracing_async2=trace"))
624 .with(callback_layer_with_spans(move |event, spans| {
625 if let Ok(mut events) = cb_events.write() {
626 events.push(OwnedEventWithSpans::new(event, spans));
627 }
628 }))
629 .set_default();
630
631 run_trace_events();
632
633 assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
634 "[].event.line" => "<line>",
635 "[].event.name" => dynamic_redaction(redact_name),
636 "[].spans[].line" => "<line>",
637 "[].spans[].name" => dynamic_redaction(redact_name),
638 });
639 }
640
641 #[cfg(feature = "async")]
642 #[tokio::test]
643 async fn test_channel_layer() {
644 use std::time::Duration;
646 use tokio::time::sleep;
647
648 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
651 let cb_events = events.clone();
652
653 let (tx, mut rx) = mpsc::channel(100);
654
655 let _guard = tracing_subscriber::registry()
656 .with(EnvFilter::new("tracing_async2=trace"))
657 .with(channel_layer(tx))
658 .set_default();
659
660 let handle = tokio::spawn(async move {
661 while let Some(event) = rx.recv().await {
662 if let Ok(mut events) = cb_events.write() {
663 events.push(event);
664 }
665 }
666 });
667
668 run_trace_events();
669 sleep(Duration::from_millis(100)).await;
670 handle.abort();
671
672 assert_json_snapshot!("channel-layer", extract_events(&events), {
673 "[].line" => "<line>",
674 "[].name" => dynamic_redaction(redact_name),
675 });
676 }
677
678 #[cfg(all(feature = "async", feature = "span"))]
679 #[tokio::test]
680 async fn test_channel_layer_with_spans() {
681 use std::time::Duration;
683 use tokio::time::sleep;
684
685 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
688 let cb_events = events.clone();
689
690 let (tx, mut rx) = mpsc::channel(100);
691
692 let _guard = tracing_subscriber::registry()
693 .with(EnvFilter::new("tracing_async2=trace"))
694 .with(channel_layer_with_spans(tx))
695 .set_default();
696
697 let handle = tokio::spawn(async move {
698 while let Some(event) = rx.recv().await {
699 if let Ok(mut events) = cb_events.write() {
700 events.push(event);
701 }
702 }
703 });
704
705 run_trace_events();
706 sleep(Duration::from_millis(100)).await;
707 handle.abort();
708
709 assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
710 "[].event.line" => "<line>",
711 "[].event.name" => dynamic_redaction(redact_name),
712 "[].spans[].line" => "<line>",
713 "[].spans[].name" => dynamic_redaction(redact_name),
714 });
715 }
716
717 #[cfg(feature = "async")]
718 #[tokio::test]
719 async fn test_async_layer() {
720 use std::time::Duration;
722 use tokio::time::sleep;
723
724 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
727 let cb_events = events.clone();
728
729 let _guard = tracing_subscriber::registry()
730 .with(EnvFilter::new("tracing_async2=trace"))
731 .with(async_layer(16, move |event| {
732 let f_events = cb_events.clone();
733 async move {
734 if let Ok(mut events) = f_events.write() {
735 events.push(event);
736 }
737 }
738 }))
739 .set_default();
740
741 run_trace_events();
742 sleep(Duration::from_millis(100)).await;
743
744 assert_json_snapshot!("async-layer", extract_events(&events), {
745 "[].line" => "<line>",
746 "[].name" => dynamic_redaction(redact_name),
747 });
748 }
749
750 #[cfg(all(feature = "async", feature = "span"))]
751 #[tokio::test]
752 async fn test_async_layer_with_spans() {
753 use std::time::Duration;
755 use tokio::time::sleep;
756
757 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
760 let cb_events = events.clone();
761
762 let _guard = tracing_subscriber::registry()
763 .with(EnvFilter::new("tracing_async2=trace"))
764 .with(async_layer_with_spans(16, move |event_with_span| {
765 let f_events = cb_events.clone();
766 async move {
767 if let Ok(mut events) = f_events.write() {
768 events.push(event_with_span);
769 }
770 }
771 }))
772 .set_default();
773
774 run_trace_events();
775 sleep(Duration::from_millis(100)).await;
776
777 assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
778 "[].event.line" => "<line>",
779 "[].event.name" => dynamic_redaction(redact_name),
780 "[].spans[].line" => "<line>",
781 "[].spans[].name" => dynamic_redaction(redact_name),
782 });
783 }
784
785 #[cfg(feature = "accumulator")]
786 #[test]
787 fn test_accumulating_layer() {
788 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
791
792 let _guard = tracing_subscriber::registry()
793 .with(EnvFilter::new("tracing_async2=trace"))
794 .with(accumulating_layer(events.clone()))
795 .set_default();
796
797 run_trace_events();
798
799 assert_json_snapshot!("accumulating-layer", extract_events(&events), {
800 "[].line" => "<line>",
801 "[].name" => dynamic_redaction(redact_name),
802 });
803 }
804
805 #[cfg(all(feature = "span", feature = "accumulator"))]
806 #[test]
807 fn test_accumulating_layer_with_spans() {
808 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
811
812 let _guard = tracing_subscriber::registry()
813 .with(EnvFilter::new("tracing_async2=trace"))
814 .with(accumulating_layer_with_spans(events.clone()))
815 .set_default();
816
817 run_trace_events();
818
819 assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
820 "[].event.line" => "<line>",
821 "[].event.name" => dynamic_redaction(redact_name),
822 "[].spans[].line" => "<line>",
823 "[].spans[].name" => dynamic_redaction(redact_name),
824 });
825 }
826}