#![warn(missing_docs)]
use futures::{channel::mpsc, prelude::*};
use tetsy_libp2p::Multiaddr;
use log::{error, warn};
use serde::Serialize;
use tetcore_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use std::collections::HashMap;
use tracing::Id;
pub use tetsy_libp2p::wasm_ext::ExtTransport;
pub use serde_json;
pub use tracing;
mod endpoints;
mod layer;
mod node;
mod transport;
pub use endpoints::*;
pub use layer::*;
use node::*;
use transport::*;
// Telemetry verbosity levels (`u8`).
//
// `TelemetryWorker::process_message` skips an endpoint when the message verbosity is greater
// than that endpoint's configured maximum, so a larger value means "more verbose".
// NOTE(review): presumably these are meant to be passed as the `$verbosity` argument of
// `telemetry!` — confirm against callers.

/// Tetcore DEBUG verbosity level.
pub const TETCORE_DEBUG: u8 = 9;
/// Tetcore INFO verbosity level.
pub const TETCORE_INFO: u8 = 0;
/// Consensus TRACE verbosity level.
pub const CONSENSUS_TRACE: u8 = 9;
/// Consensus DEBUG verbosity level.
pub const CONSENSUS_DEBUG: u8 = 5;
/// Consensus WARN verbosity level.
pub const CONSENSUS_WARN: u8 = 4;
/// Consensus INFO verbosity level.
pub const CONSENSUS_INFO: u8 = 1;
/// A telemetry message as queued for the worker: the `tracing` span [`Id`] it belongs to,
/// its verbosity, and the message body as a string.
// NOTE(review): the body is presumably the serialized JSON produced by `telemetry!` — the
// code that builds this tuple lives outside this file; confirm.
pub(crate) type TelemetryMessage = (Id, u8, String);
/// Wrapper around a [`tracing::Span`] that identifies one telemetry instance.
///
/// [`TelemetryHandle::start_telemetry`] registers endpoints under the id of the wrapped span,
/// and the worker routes each message to the endpoints registered for the message's span id.
#[derive(Debug, Clone)]
pub struct TelemetrySpan(tracing::Span);
impl TelemetrySpan {
    /// Enters the wrapped span.
    ///
    /// The returned guard borrows `self`; the span is exited when the guard is dropped.
    pub fn enter(&self) -> tracing::span::Entered<'_> {
        self.0.enter()
    }

    /// Creates a new telemetry span: an `info`-level `tracing` span named after
    /// `TELEMETRY_LOG_SPAN`.
    pub fn new() -> Self {
        Self(tracing::info_span!(TELEMETRY_LOG_SPAN))
    }

    /// Returns a clone of the wrapped [`tracing::Span`].
    pub fn span(&self) -> tracing::Span {
        self.0.clone()
    }
}

impl Default for TelemetrySpan {
    /// Equivalent to [`TelemetrySpan::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Description of the node that is registered together with its telemetry endpoints.
///
/// The worker serializes this struct to a JSON object, adds `"msg": "system.connected"` to it
/// and queues the result on every endpoint node registered for the telemetry instance.
///
/// All fields are serialized as-is; nothing in this file validates their format. The field
/// descriptions below are derived from the field names.
// NOTE(review): presumably the queued message is sent each time a connection to the endpoint
// is (re-)established — that logic lives in the `node` module; confirm.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
/// Node's name.
pub name: String,
/// Node's implementation.
pub implementation: String,
/// Node's version.
pub version: String,
/// Node's configuration.
pub config: String,
/// Node's chain.
pub chain: String,
/// Node's genesis hash.
pub genesis_hash: String,
/// Whether the node is an authority.
pub authority: bool,
/// Node's startup time.
pub startup_time: String,
/// Node's network ID.
pub network_id: String,
}
/// Telemetry worker.
///
/// Owns both ends of two channels — one carrying telemetry messages, one carrying
/// registration requests — plus the transport used to create endpoint nodes. The channels are
/// drained by [`TelemetryWorker::run`].
#[derive(Debug)]
pub struct TelemetryWorker {
/// Receiving half of the bounded telemetry message channel.
message_receiver: mpsc::Receiver<TelemetryMessage>,
/// Sending half of the telemetry message channel; cloned out by `message_sender()`.
message_sender: mpsc::Sender<TelemetryMessage>,
/// Receiving half of the unbounded registration channel.
register_receiver: mpsc::UnboundedReceiver<Register>,
/// Sending half of the registration channel; cloned into every `TelemetryHandle`.
register_sender: mpsc::UnboundedSender<Register>,
/// Transport that is cloned for each newly created endpoint node.
transport: WsTrans,
}
impl TelemetryWorker {
pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self {
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();
Self {
message_receiver,
message_sender,
register_receiver,
register_sender,
transport,
}
}
pub fn handle(&self) -> TelemetryHandle {
TelemetryHandle {
message_sender: self.register_sender.clone(),
}
}
pub(crate) fn message_sender(&self) -> mpsc::Sender<TelemetryMessage> {
self.message_sender.clone()
}
pub async fn run(self) {
let Self {
mut message_receiver,
message_sender: _,
mut register_receiver,
register_sender: _,
transport,
} = self;
let mut node_map: HashMap<Id, Vec<(u8, Multiaddr)>> = HashMap::new();
let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
loop {
futures::select! {
message = message_receiver.next() => Self::process_message(
message,
&mut node_pool,
&node_map,
).await,
init_payload = register_receiver.next() => Self::process_register(
init_payload,
&mut node_pool,
&mut node_map,
transport.clone(),
).await,
}
}
}
async fn process_register(
input: Option<Register>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &mut HashMap<Id, Vec<(u8, Multiaddr)>>,
transport: WsTrans,
) {
let input = input.expect("the stream is never closed; qed");
match input {
Register::Telemetry {
id,
endpoints,
connection_message,
} => {
let endpoints = endpoints.0;
let connection_message = match serde_json::to_value(&connection_message) {
Ok(serde_json::Value::Object(mut value)) => {
value.insert("msg".into(), "system.connected".into());
let mut obj = serde_json::Map::new();
obj.insert("id".to_string(), id.into_u64().into());
obj.insert("payload".to_string(), value.into());
Some(obj)
}
Ok(_) => {
unreachable!("ConnectionMessage always serialize to an object; qed")
}
Err(err) => {
log::error!(
target: "telemetry",
"Could not serialize connection message: {}",
err,
);
None
}
};
for (addr, verbosity) in endpoints {
node_map
.entry(id.clone())
.or_default()
.push((verbosity, addr.clone()));
let node = node_pool.entry(addr.clone()).or_insert_with(|| {
Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
});
node.connection_messages.extend(connection_message.clone());
}
}
Register::Notifier {
addresses,
connection_notifier,
} => {
for addr in addresses {
if let Some(node) = node_pool.get_mut(&addr) {
node.telemetry_connection_notifier
.push(connection_notifier.clone());
} else {
log::error!(
target: "telemetry",
"Received connection notifier for unknown node ({}). This is a bug.",
addr,
);
}
}
}
}
}
async fn process_message(
input: Option<TelemetryMessage>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &HashMap<Id, Vec<(u8, Multiaddr)>>,
) {
let (id, verbosity, message) = input.expect("the stream is never closed; qed");
let nodes = if let Some(nodes) = node_map.get(&id) {
nodes
} else {
log::trace!(
target: "telemetry",
"Received telemetry log for unknown id ({:?}): {}",
id,
message,
);
return;
};
for (node_max_verbosity, addr) in nodes {
if verbosity > *node_max_verbosity {
log::trace!(
target: "telemetry",
"Skipping {} for log entry with verbosity {:?}",
addr,
verbosity,
);
continue;
}
if let Some(node) = node_pool.get_mut(&addr) {
let _ = node.send(message.clone()).await;
} else {
log::error!(
target: "telemetry",
"Received message for unknown node ({}). This is a bug. \
Message sent: {}",
addr,
message,
);
}
}
}
}
/// Cloneable handle to a [`TelemetryWorker`], obtained from [`TelemetryWorker::handle`].
///
/// Used to register a telemetry instance with the worker via
/// [`TelemetryHandle::start_telemetry`].
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
/// Sender for registration requests (`Register`) to the worker, despite the field name.
message_sender: mpsc::UnboundedSender<Register>,
}
impl TelemetryHandle {
pub fn start_telemetry(
&mut self,
span: TelemetrySpan,
endpoints: TelemetryEndpoints,
connection_message: ConnectionMessage,
) -> TelemetryConnectionNotifier {
let Self { message_sender } = self;
let connection_notifier = TelemetryConnectionNotifier {
message_sender: message_sender.clone(),
addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(),
};
match span.0.id() {
Some(id) => {
match message_sender.unbounded_send(Register::Telemetry {
id,
endpoints,
connection_message,
}) {
Ok(()) => {}
Err(err) => error!(
target: "telemetry",
"Could not initialize telemetry: \
the telemetry is probably already running: {}",
err,
),
}
}
None => error!(
target: "telemetry",
"Could not initialize telemetry: the span could not be entered",
),
}
connection_notifier
}
}
/// Used to subscribe to connection notifications for a fixed set of telemetry endpoints.
///
/// Returned by [`TelemetryHandle::start_telemetry`]; see
/// [`TelemetryConnectionNotifier::on_connect_stream`].
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
/// Sender for registration requests (`Register`) to the worker.
message_sender: mpsc::UnboundedSender<Register>,
/// Endpoint addresses a notifier registered through this value is attached to.
addresses: Vec<Multiaddr>,
}
impl TelemetryConnectionNotifier {
pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> {
let (message_sender, message_receiver) = tracing_unbounded("mptc_telemetry_on_connect");
if let Err(err) = self.message_sender.unbounded_send(Register::Notifier {
addresses: self.addresses.clone(),
connection_notifier: message_sender,
}) {
error!(
target: "telemetry",
"Could not create a telemetry connection notifier: \
the telemetry is probably already running: {}",
err,
);
}
message_receiver
}
}
/// Registration requests sent to the [`TelemetryWorker`] over its unbounded channel.
#[derive(Debug)]
enum Register {
/// Register a telemetry instance: associate `endpoints` with the span `id` and queue
/// `connection_message` on each endpoint's node.
Telemetry {
/// Id of the `tracing` span the telemetry instance is keyed by.
id: Id,
/// Endpoints to register for that span.
endpoints: TelemetryEndpoints,
/// Node description queued (in serialized form) on every endpoint's node.
connection_message: ConnectionMessage,
},
/// Attach a connection notifier to the nodes of already-registered addresses.
Notifier {
/// Addresses whose nodes receive the notifier.
addresses: Vec<Multiaddr>,
/// Sending half of the notification channel created by `on_connect_stream`.
connection_notifier: ConnectionNotifierSender,
},
}
/// Emits a telemetry message as a `tracing` `info!` event whose target is
/// `TELEMETRY_LOG_SPAN`.
///
/// Invocation shape: `telemetry!(verbosity; "message"; "key" => value, "key" => ?value)`.
///
/// * `verbosity` must be a `u8` expression.
/// * `message` is stored under the `"msg"` key of the resulting JSON object.
/// * The fields are turned into a JSON object by `format_fields_to_json!`; at least one field
///   is required because that macro has no rule for an empty field list.
///
/// The event carries two fields: `verbosity` and `json` (the serialized object). If a field
/// value cannot be serialized, a `tracing` error is emitted instead and no telemetry event is
/// produced.
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
// Type-check the verbosity up front.
let verbosity: u8 = $verbosity;
match format_fields_to_json!($($t)*) {
Err(err) => {
$crate::tracing::error!(
target: "telemetry",
"Could not serialize value for telemetry: {}",
err,
);
},
Ok(mut json) => {
// The message name travels inside the JSON object itself.
json.insert("msg".into(), $msg.into());
let serialized_json = $crate::serde_json::to_string(&json)
.expect("contains only string keys; qed");
$crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN,
verbosity,
json = serialized_json.as_str(),
);
},
}
}};
}
/// Helper for `telemetry!`: converts a non-empty, comma-separated list of fields into a
/// `serde_json::Result` holding a `serde_json::Map`.
///
/// Two field forms are accepted, each keyed by a literal:
///
/// * `"key" => value` — the value is converted with `serde_json::to_value`, which may fail.
/// * `"key" => ?value` — the value is formatted with `{:?}` and stored as a JSON string.
///
/// The first field is handled by the matching rule; the remaining fields are handled by a
/// recursive invocation whose map is appended to the first one.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
// First field is serialized through serde.
( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
$crate::serde_json::to_value(&$v)
.map(|value| {
let mut map = $crate::serde_json::Map::new();
map.insert($k.into(), value);
map
})
// Only expanded when more fields follow: recurse on them and merge the result.
$(
.and_then(|mut prev_map| {
format_fields_to_json!($($t)*)
.map(move |mut other_map| {
prev_map.append(&mut other_map);
prev_map
})
})
)*
}};
// First field is rendered with its `Debug` implementation; this step cannot fail.
( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
let mut map = $crate::serde_json::Map::new();
map.insert($k.into(), std::format!("{:?}", &$v).into());
$crate::serde_json::Result::Ok(map)
// Only expanded when more fields follow: recurse on them and merge the result.
$(
.and_then(|mut prev_map| {
format_fields_to_json!($($t)*)
.map(move |mut other_map| {
prev_map.append(&mut other_map);
prev_map
})
})
)*
}};
}