use smol::{future, net::UdpSocket};
use smoldot::libp2p::PeerId;
use std::{
convert::TryFrom as _, future::Future, io, net::SocketAddr, num::NonZero, pin::Pin, sync::Arc,
};
/// Configuration passed to [`JaegerService::new`].
pub struct Config<'a> {
/// Closure that receives the background task to run. `JaegerService::new` calls it at most
/// once, and only when `jaeger_agent` is `Some`.
pub tasks_executor: &'a mut dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>),
/// Forwarded as the `service_name` field of the `mick_jaeger::Config` given to
/// `mick_jaeger::init`.
pub service_name: String,
/// Address that the encoded traces are sent to over UDP. When `None`, no socket is opened and
/// no background task is spawned.
pub jaeger_agent: Option<SocketAddr>,
}
/// Handle through which tracing spans are created. Built by [`JaegerService::new`].
pub struct JaegerService {
/// Object on which every span created by this service is opened (`traces_in.span(..)`).
traces_in: Arc<mick_jaeger::TracesIn>,
/// Event notified when the service is dropped; the background sending task listens on it in
/// order to know when to stop.
shutdown_notify: event_listener::Event,
}
impl JaegerService {
    /// Initializes the tracing service.
    ///
    /// When `config.jaeger_agent` is `Some`, a UDP socket is bound and a background task is
    /// handed to `config.tasks_executor`. That task forwards every buffer produced by
    /// `mick_jaeger` to the agent until the returned service is dropped.
    ///
    /// When `config.jaeger_agent` is `None`, nothing is spawned and the outgoing side of the
    /// `mick_jaeger` channel is dropped when this function returns.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced while binding the local UDP socket.
    ///
    /// # Panics
    ///
    /// The spawned background task panics if sending a datagram to the agent fails.
    pub async fn new(config: Config<'_>) -> Result<Arc<Self>, io::Error> {
        let (traces_in, mut traces_out) = mick_jaeger::init(mick_jaeger::Config {
            service_name: config.service_name,
        });

        let shutdown_notify = event_listener::Event::new();

        if let Some(jaeger_agent) = config.jaeger_agent {
            // The local socket must be of the same address family as the destination: a socket
            // bound to the IPv4 wildcard cannot send to an IPv6 agent.
            let bind_addr = if jaeger_agent.is_ipv6() {
                "[::]:0"
            } else {
                "0.0.0.0:0"
            };
            let udp_socket = UdpSocket::bind(bind_addr).await?;

            // The listener is registered before the task is spawned, so a notification sent
            // between now and the first poll of the task is not missed.
            let mut on_shutdown = shutdown_notify.listen();

            (config.tasks_executor)(Box::pin(async move {
                loop {
                    // Race the shutdown signal against the next buffer of encoded traces.
                    // `None` means that the service has been dropped.
                    let Some(buf) = future::or(
                        async {
                            (&mut on_shutdown).await;
                            None
                        },
                        async { Some(traces_out.next().await) },
                    )
                    .await
                    else {
                        break;
                    };

                    // NOTE(review): a send failure panics this task. This looks like a
                    // deliberate "fail loudly" choice; confirm before turning it into a
                    // logged-and-ignored error.
                    udp_socket.send_to(&buf, jaeger_agent).await.unwrap();
                }
            }));
        }

        Ok(Arc::new(JaegerService {
            traces_in,
            shutdown_notify,
        }))
    }

    /// Creates a `block-announce-received` span keyed on the connection between the two peers.
    ///
    /// The span is tagged with the block `hash` (hex-encoded) and, if it fits in an `i64`, with
    /// the block `number`. A number that doesn't fit is left out rather than truncated.
    pub fn block_announce_receive_span(
        &self,
        local_peer_id: &PeerId,
        remote_peer_id: &PeerId,
        block_number: u64,
        block_hash: &[u8; 32],
    ) -> mick_jaeger::Span {
        let mut span =
            self.net_connection_span(local_peer_id, remote_peer_id, "block-announce-received");
        if let Ok(block_number) = i64::try_from(block_number) {
            span.add_int_tag("number", block_number);
        }
        span.add_string_tag("hash", &hex::encode(block_hash));
        span
    }

    /// Creates a `block-announce-process` span keyed on the given block hash.
    pub fn block_announce_process_span(&self, block_hash: &[u8; 32]) -> mick_jaeger::Span {
        self.block_span(block_hash, "block-announce-process")
    }

    /// Creates an `author` span keyed on the given block hash, whose start time is overridden
    /// with `start_time`.
    pub fn block_authorship_span(
        &self,
        block_hash: &[u8; 32],
        start_time: mick_jaeger::StartTime,
    ) -> mick_jaeger::Span {
        self.block_span(block_hash, "author")
            .with_start_time_override(start_time)
    }

    /// Creates a `block-verify` span keyed on the given block hash.
    pub fn block_verify_span(&self, block_hash: &[u8; 32]) -> mick_jaeger::Span {
        self.block_span(block_hash, "block-verify")
    }

    /// Creates a `block-import-queue` span keyed on the given block hash.
    pub fn block_import_queue_span(&self, block_hash: &[u8; 32]) -> mick_jaeger::Span {
        self.block_span(block_hash, "block-import-queue")
    }

    /// Creates the `incoming-blocks-request` spans.
    ///
    /// The first element is always `Some` and is keyed on the connection between the two peers.
    /// The second element is `Some` only if `block_hash` is `Some`, in which case it is keyed on
    /// that block hash. Both spans carry the hex-encoded `hash` tag when a hash is provided, and
    /// the first one always carries `num-blocks`.
    pub fn incoming_block_request_span(
        &self,
        local_peer_id: &PeerId,
        remote_peer_id: &PeerId,
        num_requested_blocks: u32,
        block_hash: Option<&[u8; 32]>,
    ) -> [Option<mick_jaeger::Span>; 2] {
        self.block_request_spans(
            local_peer_id,
            remote_peer_id,
            num_requested_blocks,
            block_hash,
            "incoming-blocks-request",
        )
    }

    /// Creates the `outgoing-blocks-request` spans.
    ///
    /// Same layout as [`JaegerService::incoming_block_request_span`]: a connection-keyed span
    /// that is always present, followed by a block-keyed span that is present only if
    /// `block_hash` is `Some`.
    pub fn _outgoing_block_request_span(
        &self,
        local_peer_id: &PeerId,
        remote_peer_id: &PeerId,
        num_requested_blocks: u32,
        block_hash: Option<&[u8; 32]>,
    ) -> [Option<mick_jaeger::Span>; 2] {
        self.block_request_spans(
            local_peer_id,
            remote_peer_id,
            num_requested_blocks,
            block_hash,
            "outgoing-blocks-request",
        )
    }

    /// Shared implementation of the incoming and outgoing block request spans, which differ
    /// only in their operation name.
    fn block_request_spans(
        &self,
        local_peer_id: &PeerId,
        remote_peer_id: &PeerId,
        num_requested_blocks: u32,
        block_hash: Option<&[u8; 32]>,
        operation_name: &'static str,
    ) -> [Option<mick_jaeger::Span>; 2] {
        let mut connection_span =
            self.net_connection_span(local_peer_id, remote_peer_id, operation_name);
        connection_span.add_int_tag("num-blocks", num_requested_blocks.into());

        let block_span = block_hash.map(|block_hash| {
            let mut span = self.block_span(block_hash, operation_name);
            let hex = hex::encode(block_hash);
            span.add_string_tag("hash", &hex);
            connection_span.add_string_tag("hash", &hex);
            span
        });

        [Some(connection_span), block_span]
    }

    /// Opens a span whose trace id is the last 16 bytes of `block_hash`, read as a big-endian
    /// integer. If these bytes are all zero, the trace id `1` is used instead, as a trace id
    /// must be non-zero.
    fn block_span(
        &self,
        block_hash: &[u8; 32],
        operation_name: impl Into<String>,
    ) -> mick_jaeger::Span {
        let trace_id = block_hash
            .last_chunk::<16>()
            .map(|tail| u128::from_be_bytes(*tail))
            .and_then(NonZero::<u128>::new)
            .unwrap_or(NonZero::<u128>::MIN);
        self.traces_in.span(trace_id, operation_name)
    }

    /// Opens a span whose trace id is derived from the two peer ids.
    ///
    /// The trace id is made of the last 8 bytes of each peer id, the lexicographically smaller
    /// peer id coming first. Swapping `local_peer_id` and `remote_peer_id` therefore yields the
    /// same trace id. If the resulting id is zero, the trace id `1` is used instead, mirroring
    /// what `block_span` does.
    fn net_connection_span(
        &self,
        local_peer_id: &PeerId,
        remote_peer_id: &PeerId,
        operation_name: impl Into<String>,
    ) -> mick_jaeger::Span {
        let local = local_peer_id.as_bytes();
        let remote = remote_peer_id.as_bytes();

        // Order the two ids by comparing their bytes, so that the result doesn't depend on
        // which side is considered local.
        let (first, second) = if local < remote {
            (local, remote)
        } else {
            (remote, local)
        };

        let mut buf = [0u8; 16];
        buf[..8].copy_from_slice(&Self::last_8_bytes(first));
        buf[8..].copy_from_slice(&Self::last_8_bytes(second));

        let trace_id =
            NonZero::<u128>::new(u128::from_be_bytes(buf)).unwrap_or(NonZero::<u128>::MIN);
        self.traces_in.span(trace_id, operation_name)
    }

    /// Returns the last 8 bytes of `bytes`. Input shorter than 8 bytes is left-padded with
    /// zeroes instead of causing a panic.
    fn last_8_bytes(bytes: &[u8]) -> [u8; 8] {
        bytes.last_chunk::<8>().copied().unwrap_or_else(|| {
            let mut padded = [0u8; 8];
            padded[8 - bytes.len()..].copy_from_slice(bytes);
            padded
        })
    }
}
impl Drop for JaegerService {
/// Signals the listeners registered on `shutdown_notify`, which makes the background task
/// spawned by `JaegerService::new` (if there is one) leave its sending loop.
fn drop(&mut self) {
// `usize::MAX` is passed as the number of listeners to notify, i.e. all of them.
self.shutdown_notify.notify(usize::MAX);
}
}