Skip to main content

soil_telemetry/
lib.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
//! Substrate's client telemetry is the part of Substrate that sends telemetry data to ingestion
//! services such as [Polkadot telemetry](https://github.com/paritytech/substrate-telemetry).
9//!
//! Telemetry data is reported through a [`TelemetryHandle`], usually with the [`telemetry!`]
//! macro. The handle sends the data through an asynchronous channel to a background task called
//! [`TelemetryWorker`], which forwards it to the configured remote telemetry servers.
15//!
//! If multiple Substrate nodes are running in the same process, each node reports through its own
//! [`Telemetry`] instance, which has a unique id. That id is attached to every message so that the
//! [`TelemetryWorker`] can route it to the endpoints registered for that node.
19//!
//! Substrate nodes initialize and register with the [`TelemetryWorker`] using a
//! [`TelemetryWorkerHandle`]. This handle can be cloned and passed around. It communicates with the
//! running [`TelemetryWorker`] through an asynchronous channel dedicated to registration.
//! Registration can happen at any point during the process execution.
24
25#![warn(missing_docs)]
26#![cfg_attr(not(feature = "std"), no_std)]
27
28#[cfg(feature = "std")]
29use futures::{channel::mpsc, prelude::*};
30#[cfg(feature = "std")]
31use libp2p::Multiaddr;
32#[cfg(feature = "std")]
33use log::{error, warn};
34#[cfg(feature = "std")]
35use parking_lot::Mutex;
36#[cfg(feature = "std")]
37use serde::Serialize;
38#[cfg(feature = "std")]
39use soil_client::utils::mpsc::{
40	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
41};
42use std::{
43	collections::{
44		hash_map::Entry::{Occupied, Vacant},
45		HashMap,
46	},
47	sync::{atomic, Arc},
48};
49
50#[cfg(feature = "std")]
51pub use log;
52#[cfg(feature = "std")]
53pub use serde_json;
54
55#[cfg(feature = "std")]
56mod endpoints;
57#[cfg(feature = "std")]
58mod error;
59#[cfg(feature = "std")]
60mod node;
61#[cfg(feature = "std")]
62mod transport;
63
64#[cfg(feature = "std")]
65pub use endpoints::*;
66#[cfg(feature = "std")]
67pub use error::*;
68#[cfg(feature = "std")]
69use node::*;
70#[cfg(feature = "std")]
71use transport::*;
72
/// Telemetry message verbosity.
///
/// A message whose verbosity is greater than the maximum configured for an endpoint is not sent
/// to that endpoint, so higher values denote more verbose messages.
#[cfg(feature = "std")]
pub type VerbosityLevel = u8;

/// Substrate DEBUG log level.
#[cfg(feature = "std")]
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Substrate INFO log level.
#[cfg(feature = "std")]
pub const SUBSTRATE_INFO: VerbosityLevel = 0;

/// Consensus TRACE log level.
#[cfg(feature = "std")]
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Consensus DEBUG log level.
#[cfg(feature = "std")]
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Consensus WARN log level.
#[cfg(feature = "std")]
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Consensus INFO log level.
#[cfg(feature = "std")]
pub const CONSENSUS_INFO: VerbosityLevel = 1;

// The aliases below are gated like everything they are used by: `TelemetryMessage` refers to
// `VerbosityLevel`, which only exists with the `std` feature, so leaving them ungated breaks
// non-std builds.

/// Identifier of a [`Telemetry`] instance, handed out by the worker's id counter.
#[cfg(feature = "std")]
pub(crate) type Id = u64;
/// JSON object carried by a telemetry message.
#[cfg(feature = "std")]
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
/// A message on its way to the worker: reporting telemetry id, verbosity, and payload.
#[cfg(feature = "std")]
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
100
/// Message describing the node, sent when a telemetry connection (re-)establishes.
///
/// Field order is kept stable on purpose: it is the order in which the fields are serialized.
#[cfg(feature = "std")]
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	/// Name of the node.
	pub name: String,
	/// Implementation of the node.
	pub implementation: String,
	/// Version of the node.
	pub version: String,
	/// Configuration of the node.
	pub config: String,
	/// Chain of the node.
	pub chain: String,
	/// Genesis hash of the node's chain.
	pub genesis_hash: String,
	/// Whether the node is an authority.
	pub authority: bool,
	/// Startup time of the node.
	pub startup_time: String,
	/// Network ID of the node.
	pub network_id: String,

	/// Operating system of the node.
	pub target_os: String,

	/// Instruction set architecture of the node.
	pub target_arch: String,

	/// Target platform ABI or libc of the node.
	pub target_env: String,

	/// Software and hardware information about the node, when available.
	pub sysinfo: Option<SysInfo>,
}
136
/// Software and hardware information about the node.
///
/// Collecting this information is mostly OS-specific, which is why every field is optional.
#[cfg(feature = "std")]
#[derive(Debug, Serialize)]
pub struct SysInfo {
	/// Exact CPU model.
	pub cpu: Option<String>,
	/// Total memory, in bytes.
	pub memory: Option<u64>,
	/// Number of physical CPU cores.
	pub core_count: Option<u32>,
	/// Version of the Linux kernel.
	pub linux_kernel: Option<String>,
	/// Exact Linux distribution in use.
	pub linux_distro: Option<String>,
	/// Whether the node runs inside a virtual machine.
	pub is_virtual_machine: Option<bool>,
}
157
/// Telemetry worker.
///
/// Background task that forwards the messages reported by registered nodes to their telemetry
/// endpoints. Obtain the [`TelemetryWorkerHandle`]s you need with [`TelemetryWorker::handle`]
/// first, then drive the worker with [`TelemetryWorker::run`], which consumes it.
///
/// Registrations sent through a handle are processed while the worker is running, so nodes can
/// register at any time. Once the worker (or the future returned by `run`) has been dropped,
/// registering fails without being fatal: [`Telemetry::start_telemetry`] then returns
/// [`Error::TelemetryWorkerDropped`].
#[derive(Debug)]
#[cfg(feature = "std")]
pub struct TelemetryWorker {
	/// Receiving end of the messages reported through [`TelemetryHandle`]s.
	message_receiver: mpsc::Receiver<TelemetryMessage>,
	/// Cloned into new handles; keeping it here also means the message channel never closes.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Receiving end of the registration requests.
	register_receiver: TracingUnboundedReceiver<Register>,
	/// Cloned into new handles; keeping it here also means the registration channel never closes.
	register_sender: TracingUnboundedSender<Register>,
	/// Source of the unique ids given to new [`Telemetry`] instances.
	id_counter: Arc<atomic::AtomicU64>,
}
172
#[cfg(feature = "std")]
impl TelemetryWorker {
	/// Create a [`TelemetryWorker`] ready to be run in the background.
	///
	/// `buffer_size` is the buffer size of the bounded channel carrying telemetry messages to the
	/// worker. A single worker per process is enough.
	///
	/// # Errors
	///
	/// Fails if a transport cannot be initialized.
	pub fn new(buffer_size: usize) -> Result<Self> {
		// A transport is built again for every new endpoint in `process_register`; building a
		// throw-away one here reports a broken setup right away rather than at registration.
		let _probe_transport = initialize_transport()?;

		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
		let (register_sender, register_receiver) =
			tracing_unbounded("mpsc_telemetry_register", 10_000);
		// Telemetry ids are handed out starting from 1.
		let id_counter = Arc::new(atomic::AtomicU64::new(1));

		Ok(Self { message_receiver, message_sender, register_receiver, register_sender, id_counter })
	}

	/// Create a new [`TelemetryWorkerHandle`], used to register with this [`TelemetryWorker`].
	pub fn handle(&self) -> TelemetryWorkerHandle {
		let message_sender = self.message_sender.clone();
		let register_sender = self.register_sender.clone();
		let id_counter = Arc::clone(&self.id_counter);
		TelemetryWorkerHandle { message_sender, register_sender, id_counter }
	}

	/// Run the worker: handle registrations and dispatch telemetry messages.
	///
	/// The returned future loops forever, so it should be spawned as a background task.
	pub async fn run(mut self) {
		// Telemetry id -> (maximum verbosity, endpoint address) for each registered endpoint.
		let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
		// One node per endpoint address, shared by every telemetry id using that address.
		let mut node_pool: HashMap<Multiaddr, Node<WsTrans>> = HashMap::new();
		// Connection notifiers waiting for their endpoint's node to be created.
		let mut pending_connection_notifications: Vec<(Multiaddr, ConnectionNotifierSender)> =
			Vec::new();

		loop {
			futures::select! {
				message = self.message_receiver.next() => {
					Self::process_message(message, &mut node_pool, &node_map).await
				},
				init_payload = self.register_receiver.next() => {
					Self::process_register(
						init_payload,
						&mut node_pool,
						&mut node_map,
						&mut pending_connection_notifications,
					)
					.await
				},
			}
		}
	}

	/// Handle one registration request coming from a handle.
	async fn process_register(
		input: Option<Register>,
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
		node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
		pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
	) {
		// The worker holds a sender of this channel itself, so the stream cannot end.
		let request = input.expect("the stream is never closed; qed");

		match request {
			Register::Telemetry { id, endpoints, connection_message } => {
				let connection_message = Self::system_connected_message(id, &connection_message);

				for (addr, verbosity) in endpoints.0 {
					log::trace!(
						target: "telemetry",
						"Initializing telemetry for: {:?}",
						addr,
					);
					node_map.entry(id).or_default().push((verbosity, addr.clone()));

					let node = match node_pool.entry(addr.clone()) {
						Occupied(existing) => existing.into_mut(),
						Vacant(slot) => match initialize_transport() {
							Ok(transport) => {
								slot.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
							},
							Err(err) => {
								log::error!(
									target: "telemetry",
									"Could not initialise transport: {}",
									err,
								);
								continue;
							},
						},
					};

					node.connection_messages.extend(connection_message.clone());

					// Hand the notifiers that were waiting for this address over to its node.
					pending_connection_notifications.retain(|(pending_addr, notifier)| {
						let is_for_this_node = *pending_addr == addr;
						if is_for_this_node {
							node.telemetry_connection_notifier.push(notifier.clone());
						}
						!is_for_this_node
					});
				}
			},
			Register::Notifier { addresses, connection_notifier } => {
				for addr in addresses {
					// Attach the notifier directly when the node exists; otherwise park it until
					// the node is created by a `Register::Telemetry` for the same address.
					match node_pool.get_mut(&addr) {
						Some(node) => node.telemetry_connection_notifier.push(connection_notifier.clone()),
						None => pending_connection_notifications.push((addr, connection_notifier.clone())),
					}
				}
			},
		}
	}

	/// Build the `system.connected` message for telemetry `id` from `connection_message`.
	///
	/// Returns `None` (after logging the error) if the message cannot be serialized.
	fn system_connected_message(
		id: Id,
		connection_message: &ConnectionMessage,
	) -> Option<TelemetryPayload> {
		let mut payload = match serde_json::to_value(connection_message) {
			Ok(serde_json::Value::Object(fields)) => fields,
			Ok(_) => unreachable!("ConnectionMessage always serialize to an object; qed"),
			Err(err) => {
				log::error!(
					target: "telemetry",
					"Could not serialize connection message: {}",
					err,
				);
				return None;
			},
		};
		payload.insert("msg".into(), "system.connected".into());

		let mut envelope = serde_json::Map::new();
		envelope.insert("id".to_string(), id.into());
		envelope.insert("payload".to_string(), payload.into());
		Some(envelope)
	}

	/// Dispatch one telemetry message to every endpoint of its telemetry id that accepts its
	/// verbosity.
	async fn process_message(
		input: Option<TelemetryMessage>,
		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
		node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
	) {
		// The worker holds a sender of this channel itself, so the stream cannot end.
		let (id, verbosity, payload) = input.expect("the stream is never closed; qed");

		let timestamp = chrono::Local::now().to_rfc3339();
		let mut message = serde_json::Map::new();
		message.insert("id".into(), id.into());
		message.insert("ts".into(), timestamp.into());
		message.insert("payload".into(), payload.into());

		let nodes = match node_map.get(&id) {
			Some(nodes) => nodes,
			None => {
				// Expected: a telemetry id exists before its endpoints are registered.
				log::trace!(
					target: "telemetry",
					"Received telemetry log for unknown id ({:?}): {}",
					id,
					Self::describe(&message),
				);
				return;
			},
		};

		for (_, addr) in nodes.iter().filter(|endpoint| verbosity <= endpoint.0) {
			match node_pool.get_mut(addr) {
				Some(node) => {
					let _ = node.send(message.clone()).await;
				},
				None => log::debug!(
					target: "telemetry",
					"Received message for unknown node ({}). This is a bug. \
					Message sent: {}",
					addr,
					Self::describe(&message),
				),
			}
		}
	}

	/// Render `message` as JSON for logging, or as its `Debug` form if serialization fails.
	fn describe(message: &TelemetryPayload) -> String {
		serde_json::to_string(message)
			.unwrap_or_else(|err| format!("could not be serialized ({}): {:?}", err, message))
	}
}
378
/// Handle to the [`TelemetryWorker`] that allows initializing the telemetry for a Substrate node.
///
/// Obtained with [`TelemetryWorker::handle`]; it can be cloned and passed around.
#[derive(Debug, Clone)]
#[cfg(feature = "std")]
pub struct TelemetryWorkerHandle {
	/// Sender of the worker's message channel, given to every new [`Telemetry`].
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Counter shared with the worker, used to give every [`Telemetry`] a unique id.
	id_counter: Arc<atomic::AtomicU64>,
}
387
#[cfg(feature = "std")]
impl TelemetryWorkerHandle {
	/// Create a new [`Telemetry`] object with a fresh unique id.
	///
	/// `endpoints` are only registered with the worker when [`Telemetry::start_telemetry`] is
	/// called, but their addresses are captured now so that connection notifiers can already be
	/// created.
	pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
		let addresses: Vec<Multiaddr> =
			endpoints.0.iter().map(|(addr, _)| addr).cloned().collect();
		let id = self.id_counter.fetch_add(1, atomic::Ordering::Relaxed);
		let connection_notifier =
			TelemetryConnectionNotifier { register_sender: self.register_sender.clone(), addresses };

		Telemetry {
			message_sender: self.message_sender.clone(),
			register_sender: self.register_sender.clone(),
			id,
			connection_notifier,
			endpoints: Some(endpoints),
		}
	}
}
406
/// A telemetry instance, used to report telemetry messages for one node.
///
/// Created by [`TelemetryWorkerHandle::new_telemetry`]. Its endpoints are registered by
/// [`Telemetry::start_telemetry`], and messages are reported through [`Telemetry::handle`].
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct Telemetry {
	/// Sender of the worker's message channel.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Unique id attached to every message of this instance.
	id: Id,
	/// Creates connection event streams for this instance's endpoints.
	connection_notifier: TelemetryConnectionNotifier,
	/// Endpoints to register; `None` once the telemetry has been started.
	endpoints: Option<TelemetryEndpoints>,
}
417
#[cfg(feature = "std")]
impl Telemetry {
	/// Start the telemetry of the current Substrate node.
	///
	/// Registers, with the [`TelemetryWorker`], the endpoints that were given to
	/// [`TelemetryWorkerHandle::new_telemetry`] (telemetry WebSocket servers, each with a maximum
	/// verbosity level). This method must be called during the Substrate node initialization.
	///
	/// The `connection_message` argument describes the node. It is sent every time a
	/// connection (re-)establishes.
	///
	/// # Errors
	///
	/// - [`Error::TelemetryAlreadyInitialized`] if the telemetry has already been started.
	/// - [`Error::TelemetryWorkerDropped`] if the [`TelemetryWorker`] has been dropped.
	pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
		// `endpoints` is only `Some` until the first call; taking it prevents a second start.
		let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;

		self.register_sender
			.unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
			.map_err(|_| Error::TelemetryWorkerDropped)
	}

	/// Make a new cloneable handle to this [`Telemetry`], used for reporting telemetry messages.
	pub fn handle(&self) -> TelemetryHandle {
		TelemetryHandle {
			message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
			id: self.id,
			connection_notifier: self.connection_notifier.clone(),
		}
	}
}
447
/// Cloneable handle to a [`Telemetry`], used to report telemetry messages.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
	/// Sender of the worker's message channel, behind a mutex shared by all clones of the handle.
	message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
	/// Id of the [`Telemetry`] this handle reports for.
	id: Id,
	/// Creates connection event streams.
	connection_notifier: TelemetryConnectionNotifier,
}
458
#[cfg(feature = "std")]
impl TelemetryHandle {
	/// Send a telemetry message with the given verbosity.
	///
	/// This does not wait for room in the channel to the [`TelemetryWorker`]. If the channel is
	/// full or closed, the message is dropped and the event is logged at trace level.
	pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
		match self.message_sender.lock().try_send((self.id, verbosity, payload)) {
			Ok(()) => {},
			Err(err) if err.is_full() => log::trace!(
				target: "telemetry",
				"Telemetry channel full.",
			),
			Err(_) => log::trace!(
				target: "telemetry",
				"Telemetry channel closed.",
			),
		}
	}

	/// Get an event stream for the "telemetry connection established" events of this node's
	/// endpoints.
	///
	/// This can be called before or after [`Telemetry::start_telemetry`], and it never fails. If
	/// the [`TelemetryWorker`] has been dropped, an error is logged and the returned stream will
	/// never receive any event.
	pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
		self.connection_notifier.on_connect_stream()
	}
}
484
/// Creates streams of events that fire when a telemetry connection (re-)establishes.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct TelemetryConnectionNotifier {
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Addresses of the telemetry endpoints whose connections are watched.
	addresses: Vec<Multiaddr>,
}
493
#[cfg(feature = "std")]
impl TelemetryConnectionNotifier {
	/// Create a stream of "connection (re-)established" events for `self.addresses`.
	///
	/// The sending half is registered with the [`TelemetryWorker`], which attaches it to the
	/// node of each address, either immediately or once that node is created. If the
	/// registration cannot be delivered, an error is logged. In that case the sending half is
	/// dropped along with the undelivered request, so the returned receiver never gets an event.
	fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
		let (message_sender, message_receiver) = connection_notifier_channel();
		if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
			addresses: self.addresses.clone(),
			connection_notifier: message_sender,
		}) {
			// The worker keeps its own sender of the registration channel, so a failed send
			// presumably means the worker (the receiving end) is gone, not that it is running.
			error!(
				target: "telemetry",
				"Could not create a telemetry connection notifier: \
				the telemetry worker is probably not running anymore: {}",
				err,
			);
		}
		message_receiver
	}
}
512
/// Registration request sent by handles to the [`TelemetryWorker`].
#[cfg(feature = "std")]
#[derive(Debug)]
enum Register {
	/// Route the messages of telemetry `id` to `endpoints`, announcing the node with
	/// `connection_message`.
	Telemetry {
		id: Id,
		endpoints: TelemetryEndpoints,
		connection_message: ConnectionMessage,
	},
	/// Attach `connection_notifier` to the nodes of `addresses`.
	Notifier {
		addresses: Vec<Multiaddr>,
		connection_notifier: ConnectionNotifierSender,
	},
}
519
/// Report a telemetry message.
///
/// `$telemetry` is expected to be an `Option<TelemetryHandle>` (it is accessed through
/// `.as_ref()`). When it is `None`, nothing else is evaluated. Otherwise:
///
/// 1. The `"key" => value` fields are serialized into a JSON object with `serde_json`. Prefix a
///    value with `?` to send its `Debug` representation as a string instead.
/// 2. `$msg` is inserted under the `"msg"` key.
/// 3. The object is sent with [`TelemetryHandle::send_telemetry`] at the given verbosity.
///
/// If a value cannot be serialized, nothing is sent and the error is logged at debug level.
///
/// # Example
///
/// ```no_run
/// # use soil_telemetry::*;
/// # let authority_id = 42_u64;
/// # let set_id = (43_u64, 44_u64);
/// # let authorities = vec![45_u64];
/// # let telemetry: Option<TelemetryHandle> = None;
/// telemetry!(
///     telemetry;      // an `Option<TelemetryHandle>`
///     CONSENSUS_INFO;
///     "afg.authority_set";
///     "authority_id" => authority_id.to_string(),
///     "authority_set_id" => ?set_id,
///     "authorities" => authorities,
/// );
/// ```
#[macro_export(local_inner_macros)]
#[cfg(feature = "std")]
macro_rules! telemetry {
	( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		if let Some(telemetry) = $telemetry.as_ref() {
			let verbosity: $crate::VerbosityLevel = $verbosity;
			match format_fields_to_json!($($t)*) {
				Err(err) => {
					$crate::log::debug!(
						target: "telemetry",
						"Could not serialize value for telemetry: {}",
						err,
					);
				},
				Ok(mut json) => {
					json.insert("msg".into(), $msg.into());
					telemetry.send_telemetry(verbosity, json);
				},
			}
		}
	}};
}
565
/// Serialize `"key" => value` fields into a `serde_json` map. Implementation detail of
/// [`telemetry!`].
///
/// - `"key" => value` stores `serde_json::to_value(&value)`.
/// - `"key" => ?value` stores the `Debug` representation of `value` as a string.
///
/// The expansion is a `serde_json::Result` holding the map. Fields are processed left to right,
/// and the first one that fails to serialize turns the whole result into an error. An empty
/// field list yields an empty map.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
#[cfg(feature = "std")]
macro_rules! format_fields_to_json {
	// No fields at all: lets `telemetry!` be used with a message name only.
	() => {{
		$crate::serde_json::Result::Ok($crate::serde_json::Map::new())
	}};
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		// `std::format!` is path-qualified so `local_inner_macros` does not rewrite it to
		// `$crate::format!`.
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}