1#![warn(missing_docs)]
26#![cfg_attr(not(feature = "std"), no_std)]
27
28#[cfg(feature = "std")]
29use futures::{channel::mpsc, prelude::*};
30#[cfg(feature = "std")]
31use libp2p::Multiaddr;
32#[cfg(feature = "std")]
33use log::{error, warn};
34#[cfg(feature = "std")]
35use parking_lot::Mutex;
36#[cfg(feature = "std")]
37use serde::Serialize;
38#[cfg(feature = "std")]
39use soil_client::utils::mpsc::{
40 tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
41};
42use std::{
43 collections::{
44 hash_map::Entry::{Occupied, Vacant},
45 HashMap,
46 },
47 sync::{atomic, Arc},
48};
49
50#[cfg(feature = "std")]
51pub use log;
52#[cfg(feature = "std")]
53pub use serde_json;
54
55#[cfg(feature = "std")]
56mod endpoints;
57#[cfg(feature = "std")]
58mod error;
59#[cfg(feature = "std")]
60mod node;
61#[cfg(feature = "std")]
62mod transport;
63
64#[cfg(feature = "std")]
65pub use endpoints::*;
66#[cfg(feature = "std")]
67pub use error::*;
68#[cfg(feature = "std")]
69use node::*;
70#[cfg(feature = "std")]
71use transport::*;
72
/// Verbosity level for Substrate debug-level telemetry messages.
#[cfg(feature = "std")]
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Verbosity level for Substrate info-level telemetry messages.
#[cfg(feature = "std")]
pub const SUBSTRATE_INFO: VerbosityLevel = 0;

/// Verbosity level for consensus trace-level telemetry messages.
#[cfg(feature = "std")]
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Verbosity level for consensus debug-level telemetry messages.
#[cfg(feature = "std")]
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Verbosity level for consensus warn-level telemetry messages.
#[cfg(feature = "std")]
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Verbosity level for consensus info-level telemetry messages.
#[cfg(feature = "std")]
pub const CONSENSUS_INFO: VerbosityLevel = 1;

/// Verbosity of a telemetry message, and the maximum verbosity an endpoint accepts.
///
/// The worker skips an endpoint for every message whose level is greater than the
/// endpoint's maximum, so messages with lower values reach more endpoints.
#[cfg(feature = "std")]
pub type VerbosityLevel = u8;

// The aliases below name `VerbosityLevel`, `String` and `serde_json`, none of which exist
// when the `std` feature is off, so they carry the same gate as every item that uses them.

/// Identifier handed out to each `Telemetry` instance from the shared id counter.
#[cfg(feature = "std")]
pub(crate) type Id = u64;
/// JSON object holding the fields of one telemetry message.
#[cfg(feature = "std")]
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
/// Item carried by the message channel: sender id, message verbosity and payload.
#[cfg(feature = "std")]
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
100
/// Description of the node that is sent to telemetry endpoints on connection.
///
/// The worker serializes this structure to a JSON object, tags it with
/// `"msg": "system.connected"` and stores it as a connection message of every node the
/// telemetry instance registers with.
#[cfg(feature = "std")]
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
    /// Name of the node.
    pub name: String,
    /// Implementation of the node.
    pub implementation: String,
    /// Version of the node.
    pub version: String,
    /// Configuration of the node, as a string.
    pub config: String,
    /// Chain the node is running.
    pub chain: String,
    /// Genesis hash of the chain, as a string.
    pub genesis_hash: String,
    /// Whether the node is an authority.
    pub authority: bool,
    /// Startup time of the node, as a string (the format is not fixed by this type).
    pub startup_time: String,
    /// Network identifier of the node.
    pub network_id: String,

    /// Operating system the node targets.
    pub target_os: String,

    /// CPU architecture the node targets.
    pub target_arch: String,

    /// Target environment of the node.
    pub target_env: String,

    /// Optional information about the system the node runs on.
    pub sysinfo: Option<SysInfo>,
}
136
/// Optional details about the machine a node runs on.
///
/// Every field is optional; the structure is serialized as part of
/// [`ConnectionMessage`].
#[cfg(feature = "std")]
#[derive(Debug, Serialize)]
pub struct SysInfo {
    /// CPU description.
    pub cpu: Option<String>,
    /// Amount of memory (presumably in bytes; confirm with the code that fills it in).
    pub memory: Option<u64>,
    /// Number of CPU cores.
    pub core_count: Option<u32>,
    /// Linux kernel description.
    pub linux_kernel: Option<String>,
    /// Linux distribution description.
    pub linux_distro: Option<String>,
    /// Whether the node runs inside a virtual machine.
    pub is_virtual_machine: Option<bool>,
}
157
/// Telemetry worker.
///
/// Owns both ends of the message channel and of the registration channel, plus the counter
/// from which telemetry ids are drawn. It does nothing until [`TelemetryWorker::run`] is
/// polled.
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct TelemetryWorker {
    // Receiving end of the bounded channel carrying telemetry messages.
    message_receiver: mpsc::Receiver<TelemetryMessage>,
    // Sending end of the same channel; cloned into every handle.
    message_sender: mpsc::Sender<TelemetryMessage>,
    // Receiving end of the registration channel.
    register_receiver: TracingUnboundedReceiver<Register>,
    // Sending end of the registration channel; cloned into every handle.
    register_sender: TracingUnboundedSender<Register>,
    // Shared source of telemetry ids.
    id_counter: Arc<atomic::AtomicU64>,
}
172
#[cfg(feature = "std")]
impl TelemetryWorker {
    /// Creates a worker together with its message and registration channels.
    ///
    /// `buffer_size` is the buffer size given to the bounded channel that carries telemetry
    /// messages.
    ///
    /// # Errors
    ///
    /// Returns the error of `initialize_transport` when no transport can be created.
    pub fn new(buffer_size: usize) -> Result<Self> {
        // Build (and discard) one transport right away so that a broken setup is reported
        // to the caller here; `process_register` can only log such a failure later on.
        let _transport = initialize_transport()?;
        let (message_sender, message_receiver) = mpsc::channel(buffer_size);
        let (register_sender, register_receiver) =
            tracing_unbounded("mpsc_telemetry_register", 10_000);

        Ok(Self {
            message_receiver,
            message_sender,
            register_receiver,
            register_sender,
            id_counter: Arc::new(atomic::AtomicU64::new(1)),
        })
    }

    /// Returns a handle that shares this worker's senders and id counter.
    pub fn handle(&self) -> TelemetryWorkerHandle {
        TelemetryWorkerHandle {
            message_sender: self.message_sender.clone(),
            register_sender: self.register_sender.clone(),
            id_counter: self.id_counter.clone(),
        }
    }

    /// Runs the worker: dispatches incoming telemetry messages and registrations.
    ///
    /// The loop has no exit, so the returned future never completes on its own.
    pub async fn run(mut self) {
        // Telemetry id -> endpoints (with their maximum verbosity) that id reports to.
        let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
        // Endpoint address -> node; one node is shared by all ids using that address.
        let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
        // Notifiers registered for addresses that have no node yet.
        let mut pending_connection_notifications: Vec<_> = Vec::new();

        loop {
            futures::select! {
                message = self.message_receiver.next() => Self::process_message(
                    message,
                    &mut node_pool,
                    &node_map,
                ).await,
                init_payload = self.register_receiver.next() => Self::process_register(
                    init_payload,
                    &mut node_pool,
                    &mut node_map,
                    &mut pending_connection_notifications,
                ).await,
            }
        }
    }

    /// Handles one item of the registration channel.
    ///
    /// * `Register::Telemetry` creates (or reuses) a node per endpoint, queues the
    ///   connection message on it, attaches the notifiers that were waiting for that
    ///   address and records the endpoint as a destination of `id`.
    /// * `Register::Notifier` attaches the notifier to every address that already has a
    ///   node and parks it in `pending_connection_notifications` for the others.
    ///
    /// # Panics
    ///
    /// Panics if `input` is `None`.
    async fn process_register(
        input: Option<Register>,
        node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
        node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
        pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
    ) {
        let input = input.expect("the stream is never closed; qed");

        match input {
            Register::Telemetry { id, endpoints, connection_message } => {
                let endpoints = endpoints.0;

                // Wrap the serialized connection message as `{ "id": .., "payload": .. }`.
                let connection_message = match serde_json::to_value(&connection_message) {
                    Ok(serde_json::Value::Object(mut value)) => {
                        value.insert("msg".into(), "system.connected".into());
                        let mut obj = serde_json::Map::new();
                        obj.insert("id".to_string(), id.into());
                        obj.insert("payload".to_string(), value.into());
                        Some(obj)
                    },
                    Ok(_) => {
                        unreachable!("ConnectionMessage always serialize to an object; qed")
                    },
                    Err(err) => {
                        log::error!(
                            target: "telemetry",
                            "Could not serialize connection message: {}",
                            err,
                        );
                        None
                    },
                };

                for (addr, verbosity) in endpoints {
                    log::trace!(
                        target: "telemetry",
                        "Initializing telemetry for: {:?}",
                        addr,
                    );

                    let node = match node_pool.entry(addr.clone()) {
                        Occupied(entry) => entry.into_mut(),
                        Vacant(entry) => match initialize_transport() {
                            Ok(transport) => entry.insert(Node::new(
                                transport,
                                addr.clone(),
                                Vec::new(),
                                Vec::new(),
                            )),
                            Err(err) => {
                                log::error!(
                                    target: "telemetry",
                                    "Could not initialise transport: {}",
                                    err,
                                );
                                // Nothing was recorded for this endpoint, so no message
                                // will later be routed to an address without a node.
                                continue;
                            },
                        },
                    };

                    node.connection_messages.extend(connection_message.clone());

                    // Hand over the notifiers that were waiting for this address.
                    pending_connection_notifications.retain(|(pending_addr, notifier)| {
                        if *pending_addr == addr {
                            node.telemetry_connection_notifier.push(notifier.clone());
                            false
                        } else {
                            true
                        }
                    });

                    // Record the route only now that a node is known to exist for it.
                    node_map.entry(id).or_default().push((verbosity, addr));
                }
            },
            Register::Notifier { addresses, connection_notifier } => {
                for addr in addresses {
                    if let Some(node) = node_pool.get_mut(&addr) {
                        node.telemetry_connection_notifier.push(connection_notifier.clone());
                    } else {
                        pending_connection_notifications.push((addr, connection_notifier.clone()));
                    }
                }
            },
        }
    }

    /// Handles one item of the message channel.
    ///
    /// Wraps the payload as `{ "id": .., "ts": .., "payload": .. }` and sends a copy to
    /// every endpoint registered for `id` whose maximum verbosity is at least the message's
    /// verbosity. Errors returned by a node's `send` are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `input` is `None`.
    async fn process_message(
        input: Option<TelemetryMessage>,
        node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
        node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
    ) {
        let (id, verbosity, payload) = input.expect("the stream is never closed; qed");

        let ts = chrono::Local::now().to_rfc3339();
        let mut message = serde_json::Map::new();
        message.insert("id".into(), id.into());
        message.insert("ts".into(), ts.into());
        message.insert("payload".into(), payload.into());

        let nodes = match node_map.get(&id) {
            Some(nodes) => nodes,
            None => {
                // Expected: an id is handed out by `new_telemetry` but only becomes known
                // here once `start_telemetry` has registered it.
                log::trace!(
                    target: "telemetry",
                    "Received telemetry log for unknown id ({:?}): {}",
                    id,
                    Self::render_message(&message),
                );
                return;
            },
        };

        for (node_max_verbosity, addr) in nodes {
            if verbosity > *node_max_verbosity {
                continue;
            }

            if let Some(node) = node_pool.get_mut(addr) {
                let _ = node.send(message.clone()).await;
            } else {
                log::debug!(
                    target: "telemetry",
                    "Received message for unknown node ({}). This is a bug. \
                    Message sent: {}",
                    addr,
                    Self::render_message(&message),
                );
            }
        }
    }

    /// Renders `message` as JSON for log output, falling back to its `Debug` form.
    fn render_message(message: &serde_json::Map<String, serde_json::Value>) -> String {
        serde_json::to_string(message).unwrap_or_else(|err| {
            format!("could not be serialized ({}): {:?}", err, message)
        })
    }
}
378
/// Cloneable handle to a [`TelemetryWorker`], used to create [`Telemetry`] instances.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct TelemetryWorkerHandle {
    // Sender of the worker's message channel.
    message_sender: mpsc::Sender<TelemetryMessage>,
    // Sender of the worker's registration channel.
    register_sender: TracingUnboundedSender<Register>,
    // Counter shared with the worker; each new telemetry takes the next value.
    id_counter: Arc<atomic::AtomicU64>,
}
387
#[cfg(feature = "std")]
impl TelemetryWorkerHandle {
    /// Creates a [`Telemetry`] for `endpoints` with a freshly allocated id.
    ///
    /// Nothing is registered with the worker yet; the endpoints are stored in the returned
    /// value until `Telemetry::start_telemetry` is called.
    pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
        // The notifier only needs the addresses, not the per-endpoint verbosity.
        let connection_notifier = TelemetryConnectionNotifier {
            register_sender: self.register_sender.clone(),
            addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
        };
        let id = self.id_counter.fetch_add(1, atomic::Ordering::Relaxed);

        Telemetry {
            message_sender: self.message_sender.clone(),
            register_sender: self.register_sender.clone(),
            id,
            connection_notifier,
            endpoints: Some(endpoints),
        }
    }
}
406
/// A telemetry instance: an id plus the endpoints it will report to.
///
/// Created by `TelemetryWorkerHandle::new_telemetry` and activated with
/// [`Telemetry::start_telemetry`].
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct Telemetry {
    // Sender of the worker's message channel; cloned into every `TelemetryHandle`.
    message_sender: mpsc::Sender<TelemetryMessage>,
    // Sender of the worker's registration channel.
    register_sender: TracingUnboundedSender<Register>,
    // Id attached to every message sent through this instance.
    id: Id,
    // Notifier handed to every `TelemetryHandle`.
    connection_notifier: TelemetryConnectionNotifier,
    // `Some` until `start_telemetry` takes the endpoints to register them.
    endpoints: Option<TelemetryEndpoints>,
}
417
#[cfg(feature = "std")]
impl Telemetry {
    /// Registers this telemetry's endpoints and `connection_message` with the worker.
    ///
    /// # Errors
    ///
    /// * `Error::TelemetryAlreadyInitialized` if the endpoints were already taken by an
    ///   earlier call.
    /// * `Error::TelemetryWorkerDropped` if the registration cannot be sent.
    pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
        // The endpoints are consumed by the first call, whether or not sending succeeds.
        let endpoints = match self.endpoints.take() {
            Some(endpoints) => endpoints,
            None => return Err(Error::TelemetryAlreadyInitialized),
        };

        let registration = Register::Telemetry { id: self.id, endpoints, connection_message };
        self.register_sender
            .unbounded_send(registration)
            .map_err(|_| Error::TelemetryWorkerDropped)
    }

    /// Returns a new [`TelemetryHandle`] for sending messages under this telemetry's id.
    pub fn handle(&self) -> TelemetryHandle {
        let message_sender = Arc::new(Mutex::new(self.message_sender.clone()));
        let connection_notifier = self.connection_notifier.clone();

        TelemetryHandle { message_sender, id: self.id, connection_notifier }
    }
}
447
/// Cloneable handle for sending telemetry messages under one telemetry id.
///
/// Clones share a single message sender behind a mutex.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
    // Shared sender of the worker's message channel.
    message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
    // Id attached to every message sent through this handle.
    id: Id,
    // Used to create connection notification streams.
    connection_notifier: TelemetryConnectionNotifier,
}
458
#[cfg(feature = "std")]
impl TelemetryHandle {
    /// Queues `payload` at the given `verbosity` for the worker, without blocking.
    ///
    /// A message that cannot be queued is dropped; the reason is only logged at trace
    /// level.
    pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
        if let Err(err) = self.message_sender.lock().try_send((self.id, verbosity, payload)) {
            if err.is_full() {
                log::trace!(
                    target: "telemetry",
                    "Telemetry channel full.",
                );
            } else {
                log::trace!(
                    target: "telemetry",
                    "Telemetry channel closed.",
                );
            }
        }
    }

    /// Returns a new receiver created through this handle's connection notifier.
    pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
        self.connection_notifier.on_connect_stream()
    }
}
484
/// Registers connection notifiers with the worker for a fixed set of addresses.
#[cfg(feature = "std")]
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
    // Sender of the worker's registration channel.
    register_sender: TracingUnboundedSender<Register>,
    // Endpoint addresses for which notifiers are registered.
    addresses: Vec<Multiaddr>,
}
493
#[cfg(feature = "std")]
impl TelemetryConnectionNotifier {
    /// Creates a notifier channel and asks the worker to attach its sender to every address
    /// of this notifier.
    ///
    /// The receiver is returned even when the request could not be sent; that failure is
    /// only logged.
    fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
        let (connection_notifier, receiver) = connection_notifier_channel();
        let request = Register::Notifier { addresses: self.addresses.clone(), connection_notifier };

        if let Err(err) = self.register_sender.unbounded_send(request) {
            error!(
                target: "telemetry",
                "Could not create a telemetry connection notifier: \
                the telemetry is probably already running: {}",
                err,
            );
        }

        receiver
    }
}
512
/// Requests sent to the worker over the registration channel.
#[cfg(feature = "std")]
#[derive(Debug)]
enum Register {
    /// Start reporting for telemetry `id` to `endpoints`, announcing `connection_message`.
    Telemetry {
        id: Id,
        endpoints: TelemetryEndpoints,
        connection_message: ConnectionMessage,
    },
    /// Attach `connection_notifier` to the nodes of `addresses`.
    Notifier {
        addresses: Vec<Multiaddr>,
        connection_notifier: ConnectionNotifierSender,
    },
}
519
/// Sends a telemetry message if a telemetry handle is available.
///
/// Invocation shape: `telemetry!(telemetry; verbosity; msg; fields...)`, where
///
/// * `telemetry` is an expression on which `.as_ref()` yields an `Option` of something
///   providing `send_telemetry` (for instance an `Option<TelemetryHandle>`); nothing is
///   evaluated beyond it when that option is `None`;
/// * `verbosity` is a `VerbosityLevel`;
/// * `msg` is stored under the `"msg"` key of the payload;
/// * `fields` is a comma separated list of `"key" => value` (serialized with `serde_json`)
///   or `"key" => ?value` (formatted with `Debug`) entries.
///
/// If a field fails to serialize, nothing is sent and the error is logged at debug level.
#[cfg(feature = "std")]
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
    ( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
        if let Some(handle) = $telemetry.as_ref() {
            let level: $crate::VerbosityLevel = $verbosity;
            match format_fields_to_json!($($t)*) {
                Ok(mut fields) => {
                    fields.insert("msg".into(), $msg.into());
                    handle.send_telemetry(level, fields);
                },
                Err(err) => {
                    $crate::log::debug!(
                        target: "telemetry",
                        "Could not serialize value for telemetry: {}",
                        err,
                    );
                },
            }
        }
    }};
}
565
// Implementation detail of `telemetry!`: turns a list of `"key" => value` and
// `"key" => ?value` entries into a `serde_json::Result` holding a JSON object.
//
// Each arm handles the first entry and recurses on the remaining tokens, appending the map
// built from them to its own.
#[cfg(feature = "std")]
#[doc(hidden)]
#[macro_export(local_inner_macros)]
macro_rules! format_fields_to_json {
    // `"key" => value`: the value is converted with `serde_json::to_value`.
    ( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
        $crate::serde_json::to_value(&$v)
            .map(|json_value| {
                let mut fields = $crate::serde_json::Map::new();
                fields.insert($k.into(), json_value);
                fields
            })
            $(
                .and_then(|mut fields| {
                    format_fields_to_json!($($t)*)
                        .map(move |mut rest| {
                            fields.append(&mut rest);
                            fields
                        })
                })
            )*
    }};
    // `"key" => ?value`: the value is stored as its `Debug` representation.
    ( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
        let mut fields = $crate::serde_json::Map::new();
        fields.insert($k.into(), std::format!("{:?}", &$v).into());
        $crate::serde_json::Result::Ok(fields)
            $(
                .and_then(|mut fields| {
                    format_fields_to_json!($($t)*)
                        .map(move |mut rest| {
                            fields.append(&mut rest);
                            fields
                        })
                })
            )*
    }};
}