1#[cfg(feature = "async")]
95use tokio::sync::mpsc::{self, error::TrySendError};
96
97#[cfg(feature = "accumulator")]
98use std::sync::{Arc, RwLock};
99
100use {
101 std::fmt,
102 tracing::{
103 Event, Subscriber,
104 field::{Field, Visit},
105 span,
106 },
107 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
108};
109
110#[cfg(feature = "span")]
111type JsonMap = serde_json::Map<String, serde_json::Value>;
112
113pub fn callback_layer<F>(callback: F) -> CallbackLayer
118where
119 F: Fn(&Event<'_>) + Send + Sync + 'static,
120{
121 CallbackLayer::new(callback)
122}
123
124#[cfg(feature = "span")]
129pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
130where
131 F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
132{
133 CallbackLayerWithSpan::new(callback)
134}
135
136#[cfg(feature = "async")]
141pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
142 CallbackLayer::new(move |event: &Event<'_>| {
143 tx.try_send(event.into()).ok();
144 })
145}
146
147#[cfg(all(feature = "async", feature = "span"))]
152pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
153 CallbackLayerWithSpan::new(move |event, spans| {
154 if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
155 match e {
156 TrySendError::Full(o) => {
157 eprintln!("dropping tracing event: {:?}", o);
158 }
159 TrySendError::Closed(o) => {
160 eprintln!("channel closed for tracing event: {:?}", o);
161 }
162 }
163 }
164 })
165}
166
167#[cfg(feature = "async")]
177pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
178where
179 F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
180 Fut: Future<Output = ()> + Send + Sync + 'static,
181{
182 let (tx, mut rx) = mpsc::channel(buffer_size);
183 tokio::spawn(async move {
184 while let Some(event) = rx.recv().await {
185 callback(event).await;
186 }
187 });
188 channel_layer(tx)
189}
190
191#[cfg(all(feature = "async", feature = "span"))]
201pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
202where
203 F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
204 Fut: Future<Output = ()> + Send + 'static,
205{
206 let (tx, mut rx) = mpsc::channel(buffer_size);
207 tokio::spawn(async move {
208 while let Some(owned_with_span) = rx.recv().await {
209 callback(owned_with_span).await;
210 }
211 });
212 channel_layer_with_spans(tx)
213}
214
215#[cfg(feature = "accumulator")]
216pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
217
218#[cfg(feature = "accumulator")]
224pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
225 CallbackLayer::new(move |event: &Event<'_>| {
226 if let Ok(mut log) = log.write() {
227 log.push(event.into());
228 }
229 })
230}
231
232#[cfg(all(feature = "accumulator", feature = "span"))]
238pub fn accumulating_layer_with_spans(
239 log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
240) -> CallbackLayerWithSpan {
241 CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
242 if let Ok(mut log) = log.write() {
243 log.push(OwnedEventWithSpans::new(event, spans));
244 }
245 })
246}
247
248pub struct CallbackLayer {
255 callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
256}
257
258impl CallbackLayer {
259 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
260 let callback = Box::new(callback);
261 Self { callback }
262 }
263}
264
265impl<S> Layer<S> for CallbackLayer
266where
267 S: Subscriber,
268 S: for<'lookup> LookupSpan<'lookup>,
269{
270 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
271 (self.callback)(event);
272 }
273}
274
275#[cfg(feature = "span")]
282pub struct CallbackLayerWithSpan {
283 #[allow(clippy::type_complexity)]
284 callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
285}
286
287#[cfg(feature = "span")]
288impl CallbackLayerWithSpan {
289 pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
290 callback: F,
291 ) -> Self {
292 let callback = Box::new(callback);
293 Self { callback }
294 }
295}
296
297#[cfg(feature = "span")]
298impl<S> Layer<S> for CallbackLayerWithSpan
299where
300 S: Subscriber,
301 S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
302{
303 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
304 let mut visitor = FieldVisitor::default();
305 attrs.record(&mut visitor);
306 if let Some(span) = ctx.span(id) {
307 span.extensions_mut().insert(visitor);
308 }
309 }
310 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
311 let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
312 scope
313 .from_root()
314 .map(|span| {
315 let fields: Option<JsonMap> = span
316 .extensions()
317 .get::<FieldVisitor>()
318 .cloned()
319 .map(|FieldVisitor(json_map)| json_map);
320
321 let meta = span.metadata();
322 let mut map = JsonMap::default();
323 map.insert("level".into(), format!("{}", meta.level()).into());
324 map.insert("file".into(), meta.file().into());
325 map.insert("target".into(), meta.target().into());
326 map.insert("line".into(), meta.line().into());
327 map.insert("name".into(), span.name().into());
328 map.insert("fields".into(), fields.into());
329 map
330 })
331 .collect()
332 });
333 (self.callback)(event, spans);
334 }
335}
336
337#[derive(Debug, Clone, serde::Serialize)]
344pub struct OwnedEvent {
345 pub level: TracingLevel,
346 pub file: Option<String>,
347 pub target: String,
348 pub line: Option<u32>,
349 pub name: &'static str,
350 pub message: Option<String>,
351 pub fields: serde_json::Map<String, serde_json::Value>,
352}
353
354#[derive(Debug, Clone, serde::Serialize)]
361pub struct OwnedEventWithSpans {
362 pub event: OwnedEvent,
363 pub spans: Option<Vec<JsonMap>>,
364}
365
366impl OwnedEventWithSpans {
367 pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
368 OwnedEventWithSpans {
369 event: event.into(),
370 spans,
371 }
372 }
373}
374
375#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
379pub enum TracingLevel {
380 Trace = 0,
381 Debug = 1,
382 Info = 2,
383 Warn = 3,
384 Error = 4,
385}
386
387impl From<&tracing::Level> for TracingLevel {
388 fn from(value: &tracing::Level) -> Self {
389 match *value {
390 tracing::Level::TRACE => TracingLevel::Trace,
391 tracing::Level::DEBUG => TracingLevel::Debug,
392 tracing::Level::INFO => TracingLevel::Info,
393 tracing::Level::WARN => TracingLevel::Warn,
394 tracing::Level::ERROR => TracingLevel::Error,
395 }
396 }
397}
398
399impl From<&Event<'_>> for OwnedEvent {
406 fn from(event: &Event<'_>) -> Self {
407 let mut visitor = FieldVisitor::default();
408 event.record(&mut visitor);
409
410 let message = visitor.0.remove("message").and_then(|v| {
413 if let serde_json::Value::String(s) = v {
414 Some(s)
415 } else {
416 None
417 }
418 });
419
420 let meta = event.metadata();
421 Self {
422 name: meta.name(),
423 target: meta.target().into(),
424 level: meta.level().into(),
425 file: meta.file().map(String::from),
426 line: meta.line(),
427 message,
428 fields: visitor.0,
429 }
430 }
431}
432
433#[cfg(feature = "span")]
439#[derive(Default, Clone)]
440struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
441
442impl Visit for FieldVisitor {
443 fn record_bool(&mut self, field: &Field, value: bool) {
444 self.0.insert(field.name().into(), value.into());
445 }
446
447 fn record_f64(&mut self, field: &Field, value: f64) {
448 self.0.insert(field.name().into(), value.into());
449 }
450 fn record_i64(&mut self, field: &Field, value: i64) {
451 self.0.insert(field.name().into(), value.into());
452 }
453 fn record_u64(&mut self, field: &Field, value: u64) {
454 self.0.insert(field.name().into(), value.into());
455 }
456 fn record_str(&mut self, field: &Field, value: &str) {
457 self.0.insert(field.name().into(), value.into());
458 }
459 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
460 let text = format!("{:?}", value);
461 self.0.insert(field.name().into(), text.into());
462 }
463}
464
465#[cfg(test)]
472mod tests {
473
474 #[cfg(feature = "async")]
475 use {
476 std::sync::{Arc, RwLock},
477 tokio::sync::mpsc,
478 };
479
480 use {
481 super::*,
482 insta::{
483 internals::{Content, ContentPath},
484 *,
485 },
486 tracing_subscriber::{EnvFilter, prelude::*},
487 };
488
489 fn run_trace_events() {
494 let span = tracing::info_span!("root-info", recurse = 0);
495 span.in_scope(|| {
496 tracing::trace!(foo = 1, bar = 2, "this is a trace message");
497 tracing::debug!(pi = 3.14159265, "this is a debug message");
498 tracing::info!(job = "foo", "this is an info message");
499 tracing::warn!(job = "foo", "this is a warning message");
500 tracing::error!(job = "foo", "this is an error message");
501 });
502 }
503
504 fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
505 let events = logs.read().expect("could not read events");
506 events.clone()
507 }
508
509 fn redact_name(value: Content, _path: ContentPath) -> String {
510 let s = value.as_str().unwrap_or_default();
511 if s.contains(":") {
512 s.split_once(":")
513 .map(|p| format!("{}:<line>", p.0))
514 .unwrap_or_default()
515 } else {
516 s.to_string()
517 }
518 }
519
520 #[test]
525 fn test_callback_layer() {
526 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
529 let cb_events = events.clone();
530
531 let _guard = tracing_subscriber::registry()
532 .with(EnvFilter::new("tracing_async2=trace"))
533 .with(callback_layer(move |event| {
534 if let Ok(mut events) = cb_events.write() {
535 events.push(event.into());
536 }
537 }))
538 .set_default();
539
540 run_trace_events();
541
542 assert_json_snapshot!("callback-layer", extract_events(&events), {
543 "[].line" => "<line>",
544 "[].name" => dynamic_redaction(redact_name),
545 });
546 }
547
548 #[cfg(feature = "span")]
549 #[test]
550 fn test_callback_layer_with_spans() {
551 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
554 let cb_events = events.clone();
555
556 let _guard = tracing_subscriber::registry()
557 .with(EnvFilter::new("tracing_async2=trace"))
558 .with(callback_layer_with_spans(move |event, spans| {
559 if let Ok(mut events) = cb_events.write() {
560 events.push(OwnedEventWithSpans::new(event, spans));
561 }
562 }))
563 .set_default();
564
565 run_trace_events();
566
567 assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
568 "[].event.line" => "<line>",
569 "[].event.name" => dynamic_redaction(redact_name),
570 "[].spans[].line" => "<line>",
571 "[].spans[].name" => dynamic_redaction(redact_name),
572 });
573 }
574
575 #[cfg(feature = "async")]
576 #[tokio::test]
577 async fn test_channel_layer() {
578 use std::time::Duration;
580 use tokio::time::sleep;
581
582 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
585 let cb_events = events.clone();
586
587 let (tx, mut rx) = mpsc::channel(100);
588
589 let _guard = tracing_subscriber::registry()
590 .with(EnvFilter::new("tracing_async2=trace"))
591 .with(channel_layer(tx))
592 .set_default();
593
594 let handle = tokio::spawn(async move {
595 while let Some(event) = rx.recv().await {
596 if let Ok(mut events) = cb_events.write() {
597 events.push(event);
598 }
599 }
600 });
601
602 run_trace_events();
603 sleep(Duration::from_millis(100)).await;
604 handle.abort();
605
606 assert_json_snapshot!("channel-layer", extract_events(&events), {
607 "[].line" => "<line>",
608 "[].name" => dynamic_redaction(redact_name),
609 });
610 }
611
612 #[cfg(all(feature = "async", feature = "span"))]
613 #[tokio::test]
614 async fn test_channel_layer_with_spans() {
615 use std::time::Duration;
617 use tokio::time::sleep;
618
619 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
622 let cb_events = events.clone();
623
624 let (tx, mut rx) = mpsc::channel(100);
625
626 let _guard = tracing_subscriber::registry()
627 .with(EnvFilter::new("tracing_async2=trace"))
628 .with(channel_layer_with_spans(tx))
629 .set_default();
630
631 let handle = tokio::spawn(async move {
632 while let Some(event) = rx.recv().await {
633 if let Ok(mut events) = cb_events.write() {
634 events.push(event);
635 }
636 }
637 });
638
639 run_trace_events();
640 sleep(Duration::from_millis(100)).await;
641 handle.abort();
642
643 assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
644 "[].event.line" => "<line>",
645 "[].event.name" => dynamic_redaction(redact_name),
646 "[].spans[].line" => "<line>",
647 "[].spans[].name" => dynamic_redaction(redact_name),
648 });
649 }
650
651 #[cfg(feature = "async")]
652 #[tokio::test]
653 async fn test_async_layer() {
654 use std::time::Duration;
656 use tokio::time::sleep;
657
658 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
661 let cb_events = events.clone();
662
663 let _guard = tracing_subscriber::registry()
664 .with(EnvFilter::new("tracing_async2=trace"))
665 .with(async_layer(16, move |event| {
666 let f_events = cb_events.clone();
667 async move {
668 if let Ok(mut events) = f_events.write() {
669 events.push(event);
670 }
671 }
672 }))
673 .set_default();
674
675 run_trace_events();
676 sleep(Duration::from_millis(100)).await;
677
678 assert_json_snapshot!("async-layer", extract_events(&events), {
679 "[].line" => "<line>",
680 "[].name" => dynamic_redaction(redact_name),
681 });
682 }
683
684 #[cfg(all(feature = "async", feature = "span"))]
685 #[tokio::test]
686 async fn test_async_layer_with_spans() {
687 use std::time::Duration;
689 use tokio::time::sleep;
690
691 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
694 let cb_events = events.clone();
695
696 let _guard = tracing_subscriber::registry()
697 .with(EnvFilter::new("tracing_async2=trace"))
698 .with(async_layer_with_spans(16, move |event_with_span| {
699 let f_events = cb_events.clone();
700 async move {
701 if let Ok(mut events) = f_events.write() {
702 events.push(event_with_span);
703 }
704 }
705 }))
706 .set_default();
707
708 run_trace_events();
709 sleep(Duration::from_millis(100)).await;
710
711 assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
712 "[].event.line" => "<line>",
713 "[].event.name" => dynamic_redaction(redact_name),
714 "[].spans[].line" => "<line>",
715 "[].spans[].name" => dynamic_redaction(redact_name),
716 });
717 }
718
719 #[cfg(feature = "accumulator")]
720 #[test]
721 fn test_accumulating_layer() {
722 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
725
726 let _guard = tracing_subscriber::registry()
727 .with(EnvFilter::new("tracing_async2=trace"))
728 .with(accumulating_layer(events.clone()))
729 .set_default();
730
731 run_trace_events();
732
733 assert_json_snapshot!("accumulating-layer", extract_events(&events), {
734 "[].line" => "<line>",
735 "[].name" => dynamic_redaction(redact_name),
736 });
737 }
738
739 #[cfg(all(feature = "span", feature = "accumulator"))]
740 #[test]
741 fn test_accumulating_layer_with_spans() {
742 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
745
746 let _guard = tracing_subscriber::registry()
747 .with(EnvFilter::new("tracing_async2=trace"))
748 .with(accumulating_layer_with_spans(events.clone()))
749 .set_default();
750
751 run_trace_events();
752
753 assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
754 "[].event.line" => "<line>",
755 "[].event.name" => dynamic_redaction(redact_name),
756 "[].spans[].line" => "<line>",
757 "[].spans[].name" => dynamic_redaction(redact_name),
758 });
759 }
760}