1use crate::{initialize_transport, TelemetryWorker};
20use futures::channel::mpsc;
21use tetsy_libp2p::wasm_ext::ExtTransport;
22use parking_lot::Mutex;
23use std::convert::TryInto;
24use std::io;
25use tracing::{Event, Id, Subscriber};
26use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
27
/// Identifier shared by the telemetry tracing machinery in this file.
///
/// It is compared against the *target* of incoming events to decide whether an
/// event is a telemetry message, and against the *name* of the current span and
/// its parents to locate the span a telemetry message belongs to.
pub const TELEMETRY_LOG_SPAN: &str = "telemetry-logger";
30
/// `tracing` layer that forwards telemetry events into a channel.
///
/// Wraps the sending half of an `mpsc` channel behind a `Mutex`. Each message
/// pushed into the channel is a `(span id, verbosity, JSON string)` tuple.
#[derive(Debug)]
pub struct TelemetryLayer(Mutex<mpsc::Sender<(Id, u8, String)>>);
34
35impl TelemetryLayer {
36 pub fn new(
47 buffer_size: Option<usize>,
48 telemetry_external_transport: Option<ExtTransport>,
49 ) -> io::Result<(Self, TelemetryWorker)> {
50 let transport = initialize_transport(telemetry_external_transport)?;
51 let worker = TelemetryWorker::new(buffer_size.unwrap_or(16), transport);
52 let sender = worker.message_sender();
53 Ok((Self(Mutex::new(sender)), worker))
54 }
55}
56
57impl<S> Layer<S> for TelemetryLayer
58where
59 S: Subscriber + for<'a> LookupSpan<'a>,
60{
61 fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
62 if event.metadata().target() != TELEMETRY_LOG_SPAN {
63 return;
64 }
65
66 if let Some(span) = ctx.lookup_current() {
67 let parents = span.parents();
68
69 if let Some(span) = std::iter::once(span)
70 .chain(parents)
71 .find(|x| x.name() == TELEMETRY_LOG_SPAN)
72 {
73 let id = span.id();
74 let mut attrs = TelemetryAttrs::new(id.clone());
75 let mut vis = TelemetryAttrsVisitor(&mut attrs);
76 event.record(&mut vis);
77
78 if let TelemetryAttrs {
79 verbosity: Some(verbosity),
80 json: Some(json),
81 ..
82 } = attrs
83 {
84 match self.0.lock().try_send((
85 id,
86 verbosity
87 .try_into()
88 .expect("telemetry log message verbosity are u8; qed"),
89 json,
90 )) {
91 Err(err) if err.is_full() => eprintln!("Telemetry buffer overflowed!"),
92 _ => {}
93 }
94 } else {
95 eprintln!(
97 "missing fields in telemetry log: {:?}. This can happen if \
98 `tracing::info_span!` is (mis-)used with the telemetry target \
99 directly; you should use the `telemetry!` macro.",
100 event,
101 );
102 }
103 }
104 }
105 }
106}
107
/// Fields collected from a single telemetry event.
#[derive(Debug)]
struct TelemetryAttrs {
    /// Value of the event's `verbosity` field, if one was recorded.
    verbosity: Option<u64>,
    /// JSON message built from the event's `json` field, if one was recorded.
    json: Option<String>,
    /// Id of the span the event is attributed to; embedded in the JSON message.
    id: Id,
}
114
115impl TelemetryAttrs {
116 fn new(id: Id) -> Self {
117 Self {
118 verbosity: None,
119 json: None,
120 id,
121 }
122 }
123}
124
/// Field visitor that writes the values it records into the borrowed
/// `TelemetryAttrs`.
#[derive(Debug)]
struct TelemetryAttrsVisitor<'a>(&'a mut TelemetryAttrs);
127
128impl<'a> tracing::field::Visit for TelemetryAttrsVisitor<'a> {
129 fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
130 }
132
133 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
134 if field.name() == "verbosity" {
135 (*self.0).verbosity = Some(value);
136 }
137 }
138
139 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
140 if field.name() == "json" {
141 (*self.0).json = Some(format!(
142 r#"{{"id":{},"ts":{:?},"payload":{}}}"#,
143 self.0.id.into_u64(),
144 chrono::Local::now().to_rfc3339().to_string(),
145 value,
146 ));
147 }
148 }
149}