tc_telemetry/
lib.rs

1// This file is part of Tetcore.
2
3// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! Tetcore's client telemetry is the part of Tetcore that reports telemetry data to a remote
//! server, for example [Polkadot telemetry](https://github.com/tetcoin/tetcore-telemetry),
//! which ingests it.
21//!
22//! It works using Tokio's [tracing](https://github.com/tokio-rs/tracing/) library. The telemetry
23//! information uses tracing's logging to report the telemetry data which is then retrieved by a
24//! tracing `Layer`. This layer will then send the data through an asynchronous channel to a
25//! background task called [`TelemetryWorker`] which will send the information to the configured
26//! remote telemetry servers.
27//!
//! If multiple tetcore nodes are running in the same process, it uses a `tracing::Span` to
//! identify which tetcore node is reporting the telemetry. Every task spawned using tc-service's
//! `TaskManager` automatically inherits this span.
31//!
//! Tetcore's nodes initialize/register with the [`TelemetryWorker`] using a [`TelemetryHandle`].
//! This handle can be cloned and passed around. It communicates with the running
//! [`TelemetryWorker`] through an asynchronous channel dedicated to registration. Registering can
//! happen at any point in time during the process execution.
36
37#![warn(missing_docs)]
38
39use futures::{channel::mpsc, prelude::*};
40use tetsy_libp2p::Multiaddr;
41use log::{error, warn};
42use serde::Serialize;
43use tetcore_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
44use std::collections::HashMap;
45use tracing::Id;
46
47pub use tetsy_libp2p::wasm_ext::ExtTransport;
48pub use serde_json;
49pub use tracing;
50
51mod endpoints;
52mod layer;
53mod node;
54mod transport;
55
56pub use endpoints::*;
57pub use layer::*;
58use node::*;
59use transport::*;
60
// Verbosity levels attached to telemetry messages.
//
// A larger number means a more verbose message: the worker forwards a message to an endpoint
// only when the message's verbosity does not exceed the maximum verbosity configured for that
// endpoint.

/// Verbosity of Tetcore telemetry reported at the INFO level.
pub const TETCORE_INFO: u8 = 0;
/// Verbosity of Tetcore telemetry reported at the DEBUG level.
pub const TETCORE_DEBUG: u8 = 9;

/// Verbosity of consensus telemetry reported at the INFO level.
pub const CONSENSUS_INFO: u8 = 1;
/// Verbosity of consensus telemetry reported at the WARN level.
pub const CONSENSUS_WARN: u8 = 4;
/// Verbosity of consensus telemetry reported at the DEBUG level.
pub const CONSENSUS_DEBUG: u8 = 5;
/// Verbosity of consensus telemetry reported at the TRACE level.
pub const CONSENSUS_TRACE: u8 = 9;
74
/// A telemetry event as carried on the worker's message channel.
///
/// The tuple holds, in order: the id of the `tracing` span used to look up the endpoints the
/// event is routed to, the verbosity of the event, and the message text that is sent to those
/// endpoints.
pub(crate) type TelemetryMessage = (Id, u8, String);
76
77/// A handle representing a telemetry span, with the capability to enter the span if it exists.
78#[derive(Debug, Clone)]
79pub struct TelemetrySpan(tracing::Span);
80
81impl TelemetrySpan {
82	/// Enters this span, returning a guard that will exit the span when dropped.
83	pub fn enter(&self) -> tracing::span::Entered {
84		self.0.enter()
85	}
86
87	/// Constructs a new [`TelemetrySpan`].
88	pub fn new() -> Self {
89		Self(tracing::info_span!(TELEMETRY_LOG_SPAN))
90	}
91
92	/// Return a clone of the underlying `tracing::Span` instance.
93	pub fn span(&self) -> tracing::Span {
94		self.0.clone()
95	}
96}
97
/// Message sent when the connection (re-)establishes.
///
/// When a node registers, the worker serializes this struct to a JSON object, adds a
/// `"msg": "system.connected"` entry to it and stores the result as the `"payload"` of an outer
/// object that also carries the telemetry span `"id"`.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	/// Node's name.
	pub name: String,
	/// Node's implementation.
	pub implementation: String,
	/// Node's version.
	pub version: String,
	/// Node's configuration.
	pub config: String,
	/// Node's chain.
	pub chain: String,
	/// Node's genesis hash.
	pub genesis_hash: String,
	/// Whether the node is an authority.
	pub authority: bool,
	/// Node's startup time.
	pub startup_time: String,
	/// Node's network ID.
	pub network_id: String,
}
120
121/// Telemetry worker.
122///
123/// It should run as a background task using the [`TelemetryWorker::run`] method. This method
124/// will consume the object and any further attempts of initializing a new telemetry through its
125/// handle will fail (without being fatal).
126#[derive(Debug)]
127pub struct TelemetryWorker {
128	message_receiver: mpsc::Receiver<TelemetryMessage>,
129	message_sender: mpsc::Sender<TelemetryMessage>,
130	register_receiver: mpsc::UnboundedReceiver<Register>,
131	register_sender: mpsc::UnboundedSender<Register>,
132	transport: WsTrans,
133}
134
135impl TelemetryWorker {
136	pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
137		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
138		let (register_sender, register_receiver) = mpsc::unbounded();
139
140		Self {
141			message_receiver,
142			message_sender,
143			register_receiver,
144			register_sender,
145			transport,
146		}
147	}
148
149	/// Get a new [`TelemetryHandle`].
150	///
151	/// This is used when you want to register with the [`TelemetryWorker`].
152	pub fn handle(&self) -> TelemetryHandle {
153		TelemetryHandle {
154			message_sender: self.register_sender.clone(),
155		}
156	}
157
158	/// Get a clone of the channel's `Sender` used to send telemetry events.
159	pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
160		self.message_sender.clone()
161	}
162
163	/// Run the telemetry worker.
164	///
165	/// This should be run in a background task.
166	pub async fn run(self) {
167		let Self {
168			mut message_receiver,
169			message_sender: _,
170			mut register_receiver,
171			register_sender: _,
172			transport,
173		} = self;
174
175		let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
176		let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
177
178		loop {
179			futures::select! {
180				message = message_receiver.next() => Self::process_message(
181					message,
182					&mut node_pool,
183					&node_map,
184				).await,
185				init_payload = register_receiver.next() => Self::process_register(
186					init_payload,
187					&mut node_pool,
188					&mut node_map,
189					transport.clone(),
190				).await,
191			}
192		}
193	}
194
195	async fn process_register(
196		input: Option<Register>,
197		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
198		node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
199		transport: WsTrans,
200	) {
201		let input = input.expect("the stream is never closed; qed");
202
203		match input {
204			Register::Telemetry {
205				id,
206				endpoints,
207				connection_message,
208			} => {
209				let endpoints = endpoints.0;
210
211				let connection_message = match serde_json::to_value(&connection_message) {
212					Ok(serde_json::Value::Object(mut value)) => {
213						value.insert("msg".into(), "system.connected".into());
214						let mut obj = serde_json::Map::new();
215						obj.insert("id".to_string(), id.into_u64().into());
216						obj.insert("payload".to_string(), value.into());
217						Some(obj)
218					}
219					Ok(_) => {
220						unreachable!("ConnectionMessage always serialize to an object; qed")
221					}
222					Err(err) => {
223						log::error!(
224							target: "telemetry",
225							"Could not serialize connection message: {}",
226							err,
227						);
228						None
229					}
230				};
231
232				for (addr, verbosity) in endpoints {
233					node_map
234						.entry(id.clone())
235						.or_default()
236						.push((verbosity, addr.clone()));
237
238					let node = node_pool.entry(addr.clone()).or_insert_with(|| {
239						Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
240					});
241
242					node.connection_messages.extend(connection_message.clone());
243				}
244			}
245			Register::Notifier {
246				addresses,
247				connection_notifier,
248			} => {
249				for addr in addresses {
250					if let Some(node) = node_pool.get_mut(&addr) {
251						node.telemetry_connection_notifier
252							.push(connection_notifier.clone());
253					} else {
254						log::error!(
255							target: "telemetry",
256							"Received connection notifier for unknown node ({}). This is a bug.",
257							addr,
258						);
259					}
260				}
261			}
262		}
263	}
264
265	// dispatch messages to the telemetry nodes
266	async fn process_message(
267		input: Option<TelemetryMessage>,
268		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
269		node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
270	) {
271		let (id, verbosity, message) = input.expect("the stream is never closed; qed");
272
273		let nodes = if let Some(nodes) = node_map.get(&id) {
274			nodes
275		} else {
276			// This is a normal error because the telemetry span is entered before the telemetry
277			// is initialized so it is possible that some messages in the beginning don't get
278			// through.
279			log::trace!(
280				target: "telemetry",
281				"Received telemetry log for unknown id ({:?}): {}",
282				id,
283				message,
284			);
285			return;
286		};
287
288		for (node_max_verbosity, addr) in nodes {
289			if verbosity > *node_max_verbosity {
290				log::trace!(
291					target: "telemetry",
292					"Skipping {} for log entry with verbosity {:?}",
293					addr,
294					verbosity,
295				);
296				continue;
297			}
298
299			if let Some(node) = node_pool.get_mut(&addr) {
300				let _ = node.send(message.clone()).await;
301			} else {
302				log::error!(
303					target: "telemetry",
304					"Received message for unknown node ({}). This is a bug. \
305					Message sent: {}",
306					addr,
307					message,
308				);
309			}
310		}
311	}
312}
313
314/// Handle to the [`TelemetryWorker`] thats allows initializing the telemetry for a Tetcore node.
315#[derive(Debug, Clone)]
316pub struct TelemetryHandle {
317	message_sender: mpsc::UnboundedSender<Register>,
318}
319
320impl TelemetryHandle {
321	/// Initialize the telemetry with the endpoints provided in argument for the current tetcore
322	/// node.
323	///
324	/// This method must be called during the tetcore node initialization.
325	///
326	/// The `endpoints` argument is a collection of telemetry WebSocket servers with a corresponding
327	/// verbosity level.
328	///
329	/// The `connection_message` argument is a JSON object that is sent every time the connection
330	/// (re-)establishes.
331	pub fn start_telemetry(
332		&mut self,
333		span: TelemetrySpan,
334		endpoints: TelemetryEndpoints,
335		connection_message: ConnectionMessage,
336	) -> TelemetryConnectionNotifier {
337		let Self { message_sender } = self;
338
339		let connection_notifier = TelemetryConnectionNotifier {
340			message_sender: message_sender.clone(),
341			addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
342		};
343
344		match span.0.id() {
345			Some(id) => {
346				match message_sender.unbounded_send(Register::Telemetry {
347					id,
348					endpoints,
349					connection_message,
350				}) {
351					Ok(()) => {}
352					Err(err) => error!(
353						target: "telemetry",
354						"Could not initialize telemetry: \
355						the telemetry is probably already running: {}",
356						err,
357					),
358				}
359			}
360			None => error!(
361				target: "telemetry",
362				"Could not initialize telemetry: the span could not be entered",
363			),
364		}
365
366		connection_notifier
367	}
368}
369
370/// Used to create a stream of events with only one event: when a telemetry connection
371/// (re-)establishes.
372#[derive(Clone, Debug)]
373pub struct TelemetryConnectionNotifier {
374	message_sender: mpsc::UnboundedSender<Register>,
375	addresses: Vec<Multiaddr>,
376}
377
378impl TelemetryConnectionNotifier {
379	/// Get event stream for telemetry connection established events.
380	///
381	/// This function will return an error if the telemetry has already been started by
382	/// [`TelemetryHandle::start_telemetry`].
383	pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
384		let (message_sender, message_receiver) = tracing_unbounded("mptc_telemetry_on_connect");
385		if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
386			addresses: self.addresses.clone(),
387			connection_notifier: message_sender,
388		}) {
389			error!(
390				target: "telemetry",
391				"Could not create a telemetry connection notifier: \
392				the telemetry is probably already running: {}",
393				err,
394			);
395		}
396		message_receiver
397	}
398}
399
/// Requests sent to the [`TelemetryWorker`] over its registration channel.
#[derive(Debug)]
enum Register {
	/// Registers a node; sent by [`TelemetryHandle::start_telemetry`].
	Telemetry {
		/// Id of the node's telemetry span; the worker stores the endpoints under this id and
		/// uses it to route telemetry messages.
		id: Id,
		/// Telemetry endpoints of the node, each with its verbosity level.
		endpoints: TelemetryEndpoints,
		/// Message the worker serializes and hands to the node of every endpoint.
		connection_message: ConnectionMessage,
	},
	/// Attaches a connection notifier to existing nodes; sent by
	/// [`TelemetryConnectionNotifier::on_connect_stream`].
	Notifier {
		/// Addresses of the endpoints whose nodes receive the notifier.
		addresses: Vec<Multiaddr>,
		/// Sender the worker stores on each of those nodes.
		// NOTE(review): presumably signalled by the node when its connection (re-)establishes;
		// that code is not in this file, confirm in the `node` module.
		connection_notifier: ConnectionNotifierSender,
	},
}
412
/// Report a telemetry.
///
/// Translates to [`tracing::info`], but contains an additional verbosity parameter which the log
/// record is tagged with. Additionally the verbosity parameter is added to the record as a
/// key-value pair.
///
/// The syntax is `telemetry!(verbosity; message; fields...)` where each field is written either
/// `"key" => value` (the value is serialized with `serde_json`) or `"key" => ?value` (the value
/// is formatted with its `Debug` implementation). If a value cannot be serialized, an error is
/// logged and no telemetry event is emitted.
///
/// # Example
///
/// ```no_run
/// # use tc_telemetry::*;
/// # let authority_id = 42_u64;
/// # let set_id = (43_u64, 44_u64);
/// # let authorities = vec![45_u64];
/// telemetry!(CONSENSUS_INFO; "afg.authority_set";
/// 	"authority_id" => authority_id.to_string(),
/// 	"authority_set_id" => ?set_id,
/// 	"authorities" => authorities,
/// );
/// ```
// `local_inner_macros` lets the unqualified `format_fields_to_json!` call below resolve to this
// crate's macro when `telemetry!` is used from another crate.
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
	( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		// Evaluate the verbosity expression exactly once and require it to be a `u8`.
		let verbosity: u8 = $verbosity;
		// Build the JSON object holding the user-provided fields.
		match format_fields_to_json!($($t)*) {
			Err(err) => {
				$crate::tracing::error!(
					target: "telemetry",
					"Could not serialize value for telemetry: {}",
					err,
				);
			},
			Ok(mut json) => {
				// NOTE: the span id will be added later in the JSON for the greater good
				json.insert("msg".into(), $msg.into());
				let serialized_json = $crate::serde_json::to_string(&json)
					.expect("contains only string keys; qed");
				// Emit an INFO event carrying two fields: `verbosity` (the local bound above)
				// and `json` (the serialized object).
				$crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
					verbosity,
					json = serialized_json.as_str(),
				);
			},
		}
	}};
}
457
// Implementation detail of `telemetry!`: turns a non-empty, comma-separated list of
// `"key" => value` / `"key" => ?value` fields into a
// `serde_json::Result<serde_json::Map<..>>`.
//
// Each arm handles the first field of the list and, when more fields follow, recurses on the
// rest and merges the resulting map into its own one.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
	// `"key" => value`: the value is serialized with `serde_json::to_value`, which can fail.
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			// Expanded only when further fields follow the first one.
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							// Move every entry of the remaining fields into this map.
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	// `"key" => ?value`: the value is stored as the string produced by its `Debug` formatting,
	// so this step itself always yields `Ok`.
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		// Expanded only when further fields follow the first one.
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						// Move every entry of the remaining fields into this map.
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}