use crate::{initialize_transport, TelemetryWorker};
use futures::channel::mpsc;
use libp2p::wasm_ext::ExtTransport;
use parking_lot::Mutex;
use std::convert::TryInto;
use std::io;
use tracing::{Event, Id, Subscriber};
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
pub const TELEMETRY_LOG_SPAN: &str = "telemetry-logger";
#[derive(Debug)]
pub struct TelemetryLayer(Mutex<mpsc::Sender<(Id, u8, String)>>);
impl TelemetryLayer {
pub fn new(
buffer_size: Option<usize>,
telemetry_external_transport: Option<ExtTransport>,
) -> io::Result<(Self, TelemetryWorker)> {
let transport = initialize_transport(telemetry_external_transport)?;
let worker = TelemetryWorker::new(buffer_size.unwrap_or(16), transport);
let sender = worker.message_sender();
Ok((Self(Mutex::new(sender)), worker))
}
}
impl<S> Layer<S> for TelemetryLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
if event.metadata().target() != TELEMETRY_LOG_SPAN {
return;
}
if let Some(span) = ctx.lookup_current() {
let parents = span.parents();
if let Some(span) = std::iter::once(span)
.chain(parents)
.find(|x| x.name() == TELEMETRY_LOG_SPAN)
{
let id = span.id();
let mut attrs = TelemetryAttrs::new(id.clone());
let mut vis = TelemetryAttrsVisitor(&mut attrs);
event.record(&mut vis);
if let TelemetryAttrs {
verbosity: Some(verbosity),
json: Some(json),
..
} = attrs
{
match self.0.lock().try_send((
id,
verbosity
.try_into()
.expect("telemetry log message verbosity are u8; qed"),
json,
)) {
Err(err) if err.is_full() => eprintln!("Telemetry buffer overflowed!"),
_ => {}
}
} else {
eprintln!(
"missing fields in telemetry log: {:?}. This can happen if \
`tracing::info_span!` is (mis-)used with the telemetry target \
directly; you should use the `telemetry!` macro.",
event,
);
}
}
}
}
}
#[derive(Debug)]
struct TelemetryAttrs {
verbosity: Option<u64>,
json: Option<String>,
id: Id,
}
impl TelemetryAttrs {
fn new(id: Id) -> Self {
Self {
verbosity: None,
json: None,
id,
}
}
}
#[derive(Debug)]
struct TelemetryAttrsVisitor<'a>(&'a mut TelemetryAttrs);
impl<'a> tracing::field::Visit for TelemetryAttrsVisitor<'a> {
fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
}
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if field.name() == "verbosity" {
(*self.0).verbosity = Some(value);
}
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == "json" {
(*self.0).json = Some(format!(
r#"{{"id":{},"ts":{:?},"payload":{}}}"#,
self.0.id.into_u64(),
chrono::Local::now().to_rfc3339().to_string(),
value,
));
}
}
}