1#![warn(missing_docs)]
38
39use futures::{channel::mpsc, prelude::*};
40use tetsy_libp2p::Multiaddr;
41use log::{error, warn};
42use serde::Serialize;
43use tetcore_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
44use std::collections::HashMap;
45use tracing::Id;
46
47pub use tetsy_libp2p::wasm_ext::ExtTransport;
48pub use serde_json;
49pub use tracing;
50
51mod endpoints;
52mod layer;
53mod node;
54mod transport;
55
56pub use endpoints::*;
57pub use layer::*;
58use node::*;
59use transport::*;
60
61pub const TETCORE_DEBUG: u8 = 9;
63pub const TETCORE_INFO: u8 = 0;
65
66pub const CONSENSUS_TRACE: u8 = 9;
68pub const CONSENSUS_DEBUG: u8 = 5;
70pub const CONSENSUS_WARN: u8 = 4;
72pub const CONSENSUS_INFO: u8 = 1;
74
75pub(crate) type TelemetryMessage = (Id, u8, String);
76
77#[derive(Debug, Clone)]
79pub struct TelemetrySpan(tracing::Span);
80
81impl TelemetrySpan {
82 pub fn enter(&self) -> tracing::span::Entered {
84 self.0.enter()
85 }
86
87 pub fn new() -> Self {
89 Self(tracing::info_span!(TELEMETRY_LOG_SPAN))
90 }
91
92 pub fn span(&self) -> tracing::Span {
94 self.0.clone()
95 }
96}
97
98#[derive(Debug, Serialize)]
100pub struct ConnectionMessage {
101 pub name: String,
103 pub implementation: String,
105 pub version: String,
107 pub config: String,
109 pub chain: String,
111 pub genesis_hash: String,
113 pub authority: bool,
115 pub startup_time: String,
117 pub network_id: String,
119}
120
121#[derive(Debug)]
127pub struct TelemetryWorker {
128 message_receiver: mpsc::Receiver<TelemetryMessage>,
129 message_sender: mpsc::Sender<TelemetryMessage>,
130 register_receiver: mpsc::UnboundedReceiver<Register>,
131 register_sender: mpsc::UnboundedSender<Register>,
132 transport: WsTrans,
133}
134
135impl TelemetryWorker {
136 pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
137 let (message_sender, message_receiver) = mpsc::channel(buffer_size);
138 let (register_sender, register_receiver) = mpsc::unbounded();
139
140 Self {
141 message_receiver,
142 message_sender,
143 register_receiver,
144 register_sender,
145 transport,
146 }
147 }
148
149 pub fn handle(&self) -> TelemetryHandle {
153 TelemetryHandle {
154 message_sender: self.register_sender.clone(),
155 }
156 }
157
158 pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
160 self.message_sender.clone()
161 }
162
163 pub async fn run(self) {
167 let Self {
168 mut message_receiver,
169 message_sender: _,
170 mut register_receiver,
171 register_sender: _,
172 transport,
173 } = self;
174
175 let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
176 let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
177
178 loop {
179 futures::select! {
180 message = message_receiver.next() => Self::process_message(
181 message,
182 &mut node_pool,
183 &node_map,
184 ).await,
185 init_payload = register_receiver.next() => Self::process_register(
186 init_payload,
187 &mut node_pool,
188 &mut node_map,
189 transport.clone(),
190 ).await,
191 }
192 }
193 }
194
195 async fn process_register(
196 input: Option<Register>,
197 node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
198 node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
199 transport: WsTrans,
200 ) {
201 let input = input.expect("the stream is never closed; qed");
202
203 match input {
204 Register::Telemetry {
205 id,
206 endpoints,
207 connection_message,
208 } => {
209 let endpoints = endpoints.0;
210
211 let connection_message = match serde_json::to_value(&connection_message) {
212 Ok(serde_json::Value::Object(mut value)) => {
213 value.insert("msg".into(), "system.connected".into());
214 let mut obj = serde_json::Map::new();
215 obj.insert("id".to_string(), id.into_u64().into());
216 obj.insert("payload".to_string(), value.into());
217 Some(obj)
218 }
219 Ok(_) => {
220 unreachable!("ConnectionMessage always serialize to an object; qed")
221 }
222 Err(err) => {
223 log::error!(
224 target: "telemetry",
225 "Could not serialize connection message: {}",
226 err,
227 );
228 None
229 }
230 };
231
232 for (addr, verbosity) in endpoints {
233 node_map
234 .entry(id.clone())
235 .or_default()
236 .push((verbosity, addr.clone()));
237
238 let node = node_pool.entry(addr.clone()).or_insert_with(|| {
239 Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
240 });
241
242 node.connection_messages.extend(connection_message.clone());
243 }
244 }
245 Register::Notifier {
246 addresses,
247 connection_notifier,
248 } => {
249 for addr in addresses {
250 if let Some(node) = node_pool.get_mut(&addr) {
251 node.telemetry_connection_notifier
252 .push(connection_notifier.clone());
253 } else {
254 log::error!(
255 target: "telemetry",
256 "Received connection notifier for unknown node ({}). This is a bug.",
257 addr,
258 );
259 }
260 }
261 }
262 }
263 }
264
265 async fn process_message(
267 input: Option<TelemetryMessage>,
268 node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
269 node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
270 ) {
271 let (id, verbosity, message) = input.expect("the stream is never closed; qed");
272
273 let nodes = if let Some(nodes) = node_map.get(&id) {
274 nodes
275 } else {
276 log::trace!(
280 target: "telemetry",
281 "Received telemetry log for unknown id ({:?}): {}",
282 id,
283 message,
284 );
285 return;
286 };
287
288 for (node_max_verbosity, addr) in nodes {
289 if verbosity > *node_max_verbosity {
290 log::trace!(
291 target: "telemetry",
292 "Skipping {} for log entry with verbosity {:?}",
293 addr,
294 verbosity,
295 );
296 continue;
297 }
298
299 if let Some(node) = node_pool.get_mut(&addr) {
300 let _ = node.send(message.clone()).await;
301 } else {
302 log::error!(
303 target: "telemetry",
304 "Received message for unknown node ({}). This is a bug. \
305 Message sent: {}",
306 addr,
307 message,
308 );
309 }
310 }
311 }
312}
313
314#[derive(Debug, Clone)]
316pub struct TelemetryHandle {
317 message_sender: mpsc::UnboundedSender<Register>,
318}
319
320impl TelemetryHandle {
321 pub fn start_telemetry(
332 &mut self,
333 span: TelemetrySpan,
334 endpoints: TelemetryEndpoints,
335 connection_message: ConnectionMessage,
336 ) -> TelemetryConnectionNotifier {
337 let Self { message_sender } = self;
338
339 let connection_notifier = TelemetryConnectionNotifier {
340 message_sender: message_sender.clone(),
341 addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
342 };
343
344 match span.0.id() {
345 Some(id) => {
346 match message_sender.unbounded_send(Register::Telemetry {
347 id,
348 endpoints,
349 connection_message,
350 }) {
351 Ok(()) => {}
352 Err(err) => error!(
353 target: "telemetry",
354 "Could not initialize telemetry: \
355 the telemetry is probably already running: {}",
356 err,
357 ),
358 }
359 }
360 None => error!(
361 target: "telemetry",
362 "Could not initialize telemetry: the span could not be entered",
363 ),
364 }
365
366 connection_notifier
367 }
368}
369
370#[derive(Clone, Debug)]
373pub struct TelemetryConnectionNotifier {
374 message_sender: mpsc::UnboundedSender<Register>,
375 addresses: Vec<Multiaddr>,
376}
377
378impl TelemetryConnectionNotifier {
379 pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
384 let (message_sender, message_receiver) = tracing_unbounded("mptc_telemetry_on_connect");
385 if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
386 addresses: self.addresses.clone(),
387 connection_notifier: message_sender,
388 }) {
389 error!(
390 target: "telemetry",
391 "Could not create a telemetry connection notifier: \
392 the telemetry is probably already running: {}",
393 err,
394 );
395 }
396 message_receiver
397 }
398}
399
400#[derive(Debug)]
401enum Register {
402 Telemetry {
403 id: Id,
404 endpoints: TelemetryEndpoints,
405 connection_message: ConnectionMessage,
406 },
407 Notifier {
408 addresses: Vec<Multiaddr>,
409 connection_notifier: ConnectionNotifierSender,
410 },
411}
412
413#[macro_export(local_inner_macros)]
433macro_rules! telemetry {
434 ( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
435 let verbosity: u8 = $verbosity;
436 match format_fields_to_json!($($t)*) {
437 Err(err) => {
438 $crate::tracing::error!(
439 target: "telemetry",
440 "Could not serialize value for telemetry: {}",
441 err,
442 );
443 },
444 Ok(mut json) => {
445 json.insert("msg".into(), $msg.into());
447 let serialized_json = $crate::serde_json::to_string(&json)
448 .expect("contains only string keys; qed");
449 $crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
450 verbosity,
451 json = serialized_json.as_str(),
452 );
453 },
454 }
455 }};
456}
457
458#[macro_export(local_inner_macros)]
459#[doc(hidden)]
460macro_rules! format_fields_to_json {
461 ( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
462 $crate::serde_json::to_value(&$v)
463 .map(|value| {
464 let mut map = $crate::serde_json::Map::new();
465 map.insert($k.into(), value);
466 map
467 })
468 $(
469 .and_then(|mut prev_map| {
470 format_fields_to_json!($($t)*)
471 .map(move |mut other_map| {
472 prev_map.append(&mut other_map);
473 prev_map
474 })
475 })
476 )*
477 }};
478 ( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
479 let mut map = $crate::serde_json::Map::new();
480 map.insert($k.into(), std::format!("{:?}", &$v).into());
481 $crate::serde_json::Result::Ok(map)
482 $(
483 .and_then(|mut prev_map| {
484 format_fields_to_json!($($t)*)
485 .map(move |mut other_map| {
486 prev_map.append(&mut other_map);
487 prev_map
488 })
489 })
490 )*
491 }};
492}