1#[cfg(feature = "async")]
95use tokio::sync::mpsc::{self, error::TrySendError};
96
97#[cfg(feature = "accumulator")]
98use std::sync::{Arc, RwLock};
99
100use {
101 std::fmt,
102 tracing::{
103 Event, Subscriber,
104 field::{Field, Visit},
105 span,
106 },
107 tracing_subscriber::{Layer, layer::Context, registry::LookupSpan},
108};
109
110#[cfg(feature = "span")]
111type JsonMap = serde_json::Map<String, serde_json::Value>;
112
113pub fn callback_layer<F>(callback: F) -> CallbackLayer
118where
119 F: Fn(&Event<'_>) + Send + Sync + 'static,
120{
121 CallbackLayer::new(callback)
122}
123
124#[cfg(feature = "span")]
129pub fn callback_layer_with_spans<F>(callback: F) -> CallbackLayerWithSpan
130where
131 F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static,
132{
133 CallbackLayerWithSpan::new(callback)
134}
135
136#[cfg(feature = "async")]
141pub fn channel_layer(tx: mpsc::Sender<OwnedEvent>) -> CallbackLayer {
142 CallbackLayer::new(move |event: &Event<'_>| {
143 tx.try_send(event.into()).ok();
144 })
145}
146
147#[cfg(all(feature = "async", feature = "span"))]
152pub fn channel_layer_with_spans(tx: mpsc::Sender<OwnedEventWithSpans>) -> CallbackLayerWithSpan {
153 CallbackLayerWithSpan::new(move |event, spans| {
154 if let Err(e) = tx.try_send(OwnedEventWithSpans::new(event, spans)) {
155 match e {
156 TrySendError::Full(o) => {
157 eprintln!("dropping tracing event: {:?}", o);
158 }
159 TrySendError::Closed(o) => {
160 eprintln!("channel closed for tracing event: {:?}", o);
161 }
162 }
163 }
164 })
165}
166
167#[cfg(feature = "async")]
177pub fn async_layer<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayer
178where
179 F: Fn(OwnedEvent) -> Fut + Send + Sync + 'static,
180 Fut: Future<Output = ()> + Send + Sync + 'static,
181{
182 let (tx, mut rx) = mpsc::channel(buffer_size);
183 tokio::spawn(async move {
184 while let Some(event) = rx.recv().await {
185 callback(OwnedEvent::from(event)).await;
186 }
187 });
188 channel_layer(tx)
189}
190
191#[cfg(all(feature = "async", feature = "span"))]
201pub fn async_layer_with_spans<F, Fut>(buffer_size: usize, callback: F) -> CallbackLayerWithSpan
202where
203 F: Fn(OwnedEventWithSpans) -> Fut + Send + Sync + 'static,
204 Fut: Future<Output = ()> + Send + 'static,
205{
206 let (tx, mut rx) = mpsc::channel(buffer_size);
207 tokio::spawn(async move {
208 while let Some(owned_with_span) = rx.recv().await {
209 callback(owned_with_span).await;
210 }
211 });
212 channel_layer_with_spans(tx)
213}
214
215#[cfg(feature = "accumulator")]
216pub type AccumulatingLog = Arc<RwLock<Vec<OwnedEvent>>>;
217
218#[cfg(feature = "accumulator")]
224pub fn accumulating_layer(log: AccumulatingLog) -> CallbackLayer {
225 CallbackLayer::new(move |event: &Event<'_>| {
226 if let Ok(mut log) = log.write() {
227 log.push(event.into());
228 }
229 })
230}
231
232#[cfg(all(feature = "accumulator", feature = "span"))]
238pub fn accumulating_layer_with_spans(
239 log: Arc<RwLock<Vec<OwnedEventWithSpans>>>,
240) -> CallbackLayerWithSpan {
241 CallbackLayerWithSpan::new(move |event: &Event<'_>, spans| {
242 if let Ok(mut log) = log.write() {
243 log.push(OwnedEventWithSpans::new(event, spans));
244 }
245 })
246}
247
248pub struct CallbackLayer {
255 callback: Box<dyn Fn(&Event<'_>) + Send + Sync + 'static>,
256}
257
258impl CallbackLayer {
259 pub fn new<F: Fn(&Event<'_>) + Send + Sync + 'static>(callback: F) -> Self {
260 let callback = Box::new(callback);
261 Self { callback }
262 }
263}
264
265impl<S> Layer<S> for CallbackLayer
266where
267 S: Subscriber,
268 S: for<'lookup> LookupSpan<'lookup>,
269{
270 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
271 (self.callback)(event);
272 }
273}
274
275#[cfg(feature = "span")]
282pub struct CallbackLayerWithSpan {
283 callback: Box<dyn Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>,
284}
285
286#[cfg(feature = "span")]
287impl CallbackLayerWithSpan {
288 pub fn new<F: Fn(&Event<'_>, Option<Vec<JsonMap>>) + Send + Sync + 'static>(
289 callback: F,
290 ) -> Self {
291 let callback = Box::new(callback);
292 Self { callback }
293 }
294}
295
296#[cfg(feature = "span")]
297impl<S> Layer<S> for CallbackLayerWithSpan
298where
299 S: Subscriber,
300 S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
301{
302 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &tracing::Id, ctx: Context<'_, S>) {
303 let mut visitor = FieldVisitor::default();
304 attrs.record(&mut visitor);
305 if let Some(span) = ctx.span(id) {
306 span.extensions_mut().insert(visitor);
307 }
308 }
309 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
310 let spans: Option<Vec<_>> = ctx.event_scope(event).map(|scope| {
311 scope
312 .from_root()
313 .map(|span| {
314 let fields: Option<JsonMap> = span
315 .extensions()
316 .get::<FieldVisitor>()
317 .cloned()
318 .map(|FieldVisitor(json_map)| json_map);
319
320 let meta = span.metadata();
321 let mut map = JsonMap::default();
322 map.insert("level".into(), format!("{}", meta.level()).into());
323 map.insert("file".into(), meta.file().into());
324 map.insert("target".into(), meta.target().into());
325 map.insert("line".into(), meta.line().into());
326 map.insert("name".into(), span.name().into());
327 map.insert("fields".into(), fields.into());
328 map
329 })
330 .collect()
331 });
332 (self.callback)(event, spans);
333 }
334}
335
336#[derive(Debug, Clone, serde::Serialize)]
343pub struct OwnedEvent {
344 pub level: TracingLevel,
345 pub file: Option<String>,
346 pub target: String,
347 pub line: Option<u32>,
348 pub name: &'static str,
349 pub message: Option<String>,
350 pub fields: serde_json::Map<String, serde_json::Value>,
351}
352
353#[derive(Debug, Clone, serde::Serialize)]
360pub struct OwnedEventWithSpans {
361 pub event: OwnedEvent,
362 pub spans: Option<Vec<JsonMap>>,
363}
364
365impl OwnedEventWithSpans {
366 pub fn new(event: &Event<'_>, spans: Option<Vec<JsonMap>>) -> Self {
367 OwnedEventWithSpans {
368 event: event.into(),
369 spans,
370 }
371 }
372}
373
374#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, serde::Serialize)]
378pub enum TracingLevel {
379 Trace = 0,
380 Debug = 1,
381 Info = 2,
382 Warn = 3,
383 Error = 4,
384}
385
386impl From<&tracing::Level> for TracingLevel {
387 fn from(value: &tracing::Level) -> Self {
388 match *value {
389 tracing::Level::TRACE => TracingLevel::Trace,
390 tracing::Level::DEBUG => TracingLevel::Debug,
391 tracing::Level::INFO => TracingLevel::Info,
392 tracing::Level::WARN => TracingLevel::Warn,
393 tracing::Level::ERROR => TracingLevel::Error,
394 }
395 }
396}
397
398impl From<&Event<'_>> for OwnedEvent {
405 fn from(event: &Event<'_>) -> Self {
406 let mut visitor = FieldVisitor::default();
407 event.record(&mut visitor);
408
409 let message = visitor.0.remove("message").and_then(|v| {
412 if let serde_json::Value::String(s) = v {
413 Some(s)
414 } else {
415 None
416 }
417 });
418
419 let meta = event.metadata();
420 Self {
421 name: meta.name(),
422 target: meta.target().into(),
423 level: meta.level().into(),
424 file: meta.file().map(String::from),
425 line: meta.line().clone(),
426 message,
427 fields: visitor.0,
428 }
429 }
430}
431
432#[cfg(feature = "span")]
438#[derive(Default, Clone)]
439struct FieldVisitor(serde_json::Map<String, serde_json::Value>);
440
441impl Visit for FieldVisitor {
442 fn record_bool(&mut self, field: &Field, value: bool) {
443 self.0.insert(field.name().into(), value.into());
444 }
445
446 fn record_f64(&mut self, field: &Field, value: f64) {
447 self.0.insert(field.name().into(), value.into());
448 }
449 fn record_i64(&mut self, field: &Field, value: i64) {
450 self.0.insert(field.name().into(), value.into());
451 }
452 fn record_u64(&mut self, field: &Field, value: u64) {
453 self.0.insert(field.name().into(), value.into());
454 }
455 fn record_str(&mut self, field: &Field, value: &str) {
456 self.0.insert(field.name().into(), value.into());
457 }
458 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
459 let text = format!("{:?}", value);
460 self.0.insert(field.name().into(), text.into());
461 }
462}
463
464#[cfg(test)]
471mod tests {
472
473 use {
474 super::*,
475 insta::{
476 internals::{Content, ContentPath},
477 *,
478 },
479 std::sync::{Arc, RwLock},
480 tokio::sync::mpsc,
481 tracing_subscriber::{EnvFilter, prelude::*},
482 };
483
484 fn run_trace_events() {
489 let span = tracing::info_span!("root-info", recurse = 0);
490 span.in_scope(|| {
491 tracing::trace!(foo = 1, bar = 2, "this is a trace message");
492 tracing::debug!(pi = 3.14159265, "this is a debug message");
493 tracing::info!(job = "foo", "this is an info message");
494 tracing::warn!(job = "foo", "this is a warning message");
495 tracing::error!(job = "foo", "this is an error message");
496 });
497 }
498
499 fn extract_events<T: Clone>(logs: &Arc<RwLock<Vec<T>>>) -> Vec<T> {
500 let events = logs.read().expect("could not read events");
501 events.clone()
502 }
503
504 fn redact_name(value: Content, _path: ContentPath) -> String {
505 let s = value.as_str().unwrap_or_default();
506 if s.contains(":") {
507 s.split_once(":")
508 .map(|p| format!("{}:<line>", p.0))
509 .unwrap_or_default()
510 } else {
511 s.to_string()
512 }
513 }
514
515 #[tokio::test]
520 async fn test_callback_layer() {
521 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
524 let cb_events = events.clone();
525
526 let _guard = tracing_subscriber::registry()
527 .with(EnvFilter::new("tracing_async2=trace"))
528 .with(callback_layer(move |event| {
529 if let Ok(mut events) = cb_events.write() {
530 events.push(event.into());
531 }
532 }))
533 .set_default();
534
535 run_trace_events();
536
537 assert_json_snapshot!("callback-layer", extract_events(&events), {
538 "[].line" => "<line>",
539 "[].name" => dynamic_redaction(redact_name),
540 });
541 }
542
543 #[cfg(feature = "span")]
544 #[tokio::test]
545 async fn test_callback_layer_with_spans() {
546 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
549 let cb_events = events.clone();
550
551 let _guard = tracing_subscriber::registry()
552 .with(EnvFilter::new("tracing_async2=trace"))
553 .with(callback_layer_with_spans(move |event, spans| {
554 if let Ok(mut events) = cb_events.write() {
555 events.push(OwnedEventWithSpans::new(event, spans));
556 }
557 }))
558 .set_default();
559
560 run_trace_events();
561
562 assert_json_snapshot!("callback-layer-with-spans", extract_events(&events), {
563 "[].event.line" => "<line>",
564 "[].event.name" => dynamic_redaction(redact_name),
565 "[].spans[].line" => "<line>",
566 "[].spans[].name" => dynamic_redaction(redact_name),
567 });
568 }
569
570 #[tokio::test]
571 async fn test_channel_layer() {
572 use std::time::Duration;
574 use tokio::time::sleep;
575
576 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
579 let cb_events = events.clone();
580
581 let (tx, mut rx) = mpsc::channel(100);
582
583 let _guard = tracing_subscriber::registry()
584 .with(EnvFilter::new("tracing_async2=trace"))
585 .with(channel_layer(tx))
586 .set_default();
587
588 let handle = tokio::spawn(async move {
589 while let Some(event) = rx.recv().await {
590 if let Ok(mut events) = cb_events.write() {
591 events.push(event);
592 }
593 }
594 });
595
596 run_trace_events();
597 sleep(Duration::from_millis(100)).await;
598 handle.abort();
599
600 assert_json_snapshot!("channel-layer", extract_events(&events), {
601 "[].line" => "<line>",
602 "[].name" => dynamic_redaction(redact_name),
603 });
604 }
605
606 #[cfg(feature = "span")]
607 #[tokio::test]
608 async fn test_channel_layer_with_spans() {
609 use std::time::Duration;
611 use tokio::time::sleep;
612
613 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
616 let cb_events = events.clone();
617
618 let (tx, mut rx) = mpsc::channel(100);
619
620 let _guard = tracing_subscriber::registry()
621 .with(EnvFilter::new("tracing_async2=trace"))
622 .with(channel_layer_with_spans(tx))
623 .set_default();
624
625 let handle = tokio::spawn(async move {
626 while let Some(event) = rx.recv().await {
627 if let Ok(mut events) = cb_events.write() {
628 events.push(event);
629 }
630 }
631 });
632
633 run_trace_events();
634 sleep(Duration::from_millis(100)).await;
635 handle.abort();
636
637 assert_json_snapshot!("channel-layer-with-spans", extract_events(&events), {
638 "[].event.line" => "<line>",
639 "[].event.name" => dynamic_redaction(redact_name),
640 "[].spans[].line" => "<line>",
641 "[].spans[].name" => dynamic_redaction(redact_name),
642 });
643 }
644
645 #[tokio::test]
646 async fn test_async_layer() {
647 use std::time::Duration;
649 use tokio::time::sleep;
650
651 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
654 let cb_events = events.clone();
655
656 let _guard = tracing_subscriber::registry()
657 .with(EnvFilter::new("tracing_async2=trace"))
658 .with(async_layer(16, move |event| {
659 let f_events = cb_events.clone();
660 async move {
661 if let Ok(mut events) = f_events.write() {
662 events.push(event);
663 }
664 }
665 }))
666 .set_default();
667
668 run_trace_events();
669 sleep(Duration::from_millis(100)).await;
670
671 assert_json_snapshot!("async-layer", extract_events(&events), {
672 "[].line" => "<line>",
673 "[].name" => dynamic_redaction(redact_name),
674 });
675 }
676
677 #[cfg(feature = "span")]
678 #[tokio::test]
679 async fn test_async_layer_with_spans() {
680 use std::time::Duration;
682 use tokio::time::sleep;
683
684 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
687 let cb_events = events.clone();
688
689 let _guard = tracing_subscriber::registry()
690 .with(EnvFilter::new("tracing_async2=trace"))
691 .with(async_layer_with_spans(16, move |event_with_span| {
692 let f_events = cb_events.clone();
693 async move {
694 if let Ok(mut events) = f_events.write() {
695 events.push(event_with_span);
696 }
697 }
698 }))
699 .set_default();
700
701 run_trace_events();
702 sleep(Duration::from_millis(100)).await;
703
704 assert_json_snapshot!("async-layer-with-spans", extract_events(&events), {
705 "[].event.line" => "<line>",
706 "[].event.name" => dynamic_redaction(redact_name),
707 "[].spans[].line" => "<line>",
708 "[].spans[].name" => dynamic_redaction(redact_name),
709 });
710 }
711
712 #[tokio::test]
713 async fn test_accumulating_layer() {
714 use std::time::Duration;
716 use tokio::time::sleep;
717
718 let events = Arc::new(RwLock::new(Vec::<OwnedEvent>::new()));
721
722 let _guard = tracing_subscriber::registry()
723 .with(EnvFilter::new("tracing_async2=trace"))
724 .with(accumulating_layer(events.clone()))
725 .set_default();
726
727 run_trace_events();
728 sleep(Duration::from_millis(100)).await;
729
730 assert_json_snapshot!("accumulating-layer", extract_events(&events), {
731 "[].line" => "<line>",
732 "[].name" => dynamic_redaction(redact_name),
733 });
734 }
735
736 #[cfg(feature = "span")]
737 #[tokio::test]
738 async fn test_accumulating_layer_with_spans() {
739 use std::time::Duration;
741 use tokio::time::sleep;
742
743 let events = Arc::new(RwLock::new(Vec::<OwnedEventWithSpans>::new()));
746
747 let _guard = tracing_subscriber::registry()
748 .with(EnvFilter::new("tracing_async2=trace"))
749 .with(accumulating_layer_with_spans(events.clone()))
750 .set_default();
751
752 run_trace_events();
753 sleep(Duration::from_millis(100)).await;
754
755 assert_json_snapshot!("accumulating-layer-with-spans", extract_events(&events), {
756 "[].event.line" => "<line>",
757 "[].event.name" => dynamic_redaction(redact_name),
758 "[].spans[].line" => "<line>",
759 "[].spans[].name" => dynamic_redaction(redact_name),
760 });
761 }
762}