tc_telemetry/
layer.rs

1// This file is part of Tetcore.
2
3// Copyright (C) 2021 Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{initialize_transport, TelemetryWorker};
20use futures::channel::mpsc;
21use tetsy_libp2p::wasm_ext::ExtTransport;
22use parking_lot::Mutex;
23use std::convert::TryInto;
24use std::io;
25use tracing::{Event, Id, Subscriber};
26use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};
27
/// Span name used to report the telemetry.
///
/// The same string doubles as the event target: [`TelemetryLayer`] only looks at events whose
/// target equals this value and that are emitted while a span with this name is the current
/// span or one of its parents.
pub const TELEMETRY_LOG_SPAN: &str = "telemetry-logger";
30
/// `Layer` that handles the logs for telemetries.
///
/// Wraps the message sender obtained from the [`TelemetryWorker`]. Every item pushed on the
/// channel is a tuple made of the [`Id`] of the telemetry span, the verbosity as a `u8` and
/// the JSON text of the message.
///
/// The sender is kept behind a [`Mutex`] and locked for each send, because the layer callback
/// only receives `&self`.
#[derive(Debug)]
pub struct TelemetryLayer(Mutex<mpsc::Sender<(Id, u8, String)>>);
34
35impl TelemetryLayer {
36	/// Create a new [`TelemetryLayer`] and [`TelemetryWorker`].
37	///
38	/// The `buffer_size` defaults to 16.
39	///
40	/// The [`ExtTransport`] is used in WASM contexts where we need some binding between the
41	/// networking provided by the operating system or environment and libp2p.
42	///
43	/// > **Important**: Each individual call to `write` corresponds to one message. There is no
44	/// >                internal buffering going on. In the context of WebSockets, each `write`
45	/// >                must be one individual WebSockets frame.
46	pub fn new(
47		buffer_size: Option<usize>,
48		telemetry_external_transport: Option<ExtTransport>,
49	) -> io::Result<(Self, TelemetryWorker)> {
50		let transport = initialize_transport(telemetry_external_transport)?;
51		let worker = TelemetryWorker::new(buffer_size.unwrap_or(16), transport);
52		let sender = worker.message_sender();
53		Ok((Self(Mutex::new(sender)), worker))
54	}
55}
56
57impl<S> Layer<S> for TelemetryLayer
58where
59	S: Subscriber + for<'a> LookupSpan<'a>,
60{
61	fn on_event(&self, event: &Event<'_>, ctx: Context<S>) {
62		if event.metadata().target() != TELEMETRY_LOG_SPAN {
63			return;
64		}
65
66		if let Some(span) = ctx.lookup_current() {
67			let parents = span.parents();
68
69			if let Some(span) = std::iter::once(span)
70				.chain(parents)
71				.find(|x| x.name() == TELEMETRY_LOG_SPAN)
72			{
73				let id = span.id();
74				let mut attrs = TelemetryAttrs::new(id.clone());
75				let mut vis = TelemetryAttrsVisitor(&mut attrs);
76				event.record(&mut vis);
77
78				if let TelemetryAttrs {
79					verbosity: Some(verbosity),
80					json: Some(json),
81					..
82				} = attrs
83				{
84					match self.0.lock().try_send((
85						id,
86						verbosity
87							.try_into()
88							.expect("telemetry log message verbosity are u8; qed"),
89						json,
90					)) {
91						Err(err) if err.is_full() => eprintln!("Telemetry buffer overflowed!"),
92						_ => {}
93					}
94				} else {
95					// NOTE: logging in this function doesn't work
96					eprintln!(
97						"missing fields in telemetry log: {:?}. This can happen if \
98						`tracing::info_span!` is (mis-)used with the telemetry target \
99						directly; you should use the `telemetry!` macro.",
100						event,
101					);
102				}
103			}
104		}
105	}
106}
107
/// Fields collected from a single telemetry event while it is being visited.
#[derive(Debug)]
struct TelemetryAttrs {
	/// Value of the `verbosity` field, when the event recorded one as an unsigned integer.
	verbosity: Option<u64>,
	/// Full JSON message built around the `json` field, when the event recorded one as a string.
	json: Option<String>,
	/// Id of the telemetry span the event belongs to; it is written into the JSON message.
	id: Id,
}
114
115impl TelemetryAttrs {
116	fn new(id: Id) -> Self {
117		Self {
118			verbosity: None,
119			json: None,
120			id,
121		}
122	}
123}
124
/// Field visitor that stores the `verbosity` and `json` fields of an event into the borrowed
/// [`TelemetryAttrs`].
#[derive(Debug)]
struct TelemetryAttrsVisitor<'a>(&'a mut TelemetryAttrs);
127
128impl<'a> tracing::field::Visit for TelemetryAttrsVisitor<'a> {
129	fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
130		// noop
131	}
132
133	fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
134		if field.name() == "verbosity" {
135			(*self.0).verbosity = Some(value);
136		}
137	}
138
139	fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
140		if field.name() == "json" {
141			(*self.0).json = Some(format!(
142				r#"{{"id":{},"ts":{:?},"payload":{}}}"#,
143				self.0.id.into_u64(),
144				chrono::Local::now().to_rfc3339().to_string(),
145				value,
146			));
147		}
148	}
149}