#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
pub mod visitor;
use std::future::Future;
use std::net::SocketAddr;
use bytes::Bytes;
use futures_channel::mpsc;
use futures_util::stream::Stream;
use futures_util::{SinkExt, StreamExt};
use serde_json::{map::Map, Value};
use tokio::net::{lookup_host, TcpStream, ToSocketAddrs, UdpSocket};
use tokio::time;
use tokio_util::codec::{BytesCodec, FramedWrite};
use tokio_util::udp::UdpFramed;
use tracing_core::dispatcher::SetGlobalDefaultError;
use tracing_core::{
span::{Attributes, Id, Record},
Event, Subscriber,
};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::{registry::LookupSpan, Registry};
// Channel capacity handed to `mpsc::channel` when the builder has no explicit buffer length.
const DEFAULT_BUFFER: usize = 512;
// Milliseconds the background task sleeps between connection rounds when no timeout is configured.
const DEFAULT_TIMEOUT: u32 = 10_000;
// Value stored under the "version" key of every message when the builder has no explicit version.
const DEFAULT_VERSION: &str = "1.1";
/// A `tracing` layer that turns events into JSON messages and queues them on a
/// channel; the background task returned alongside it forwards them over the network.
#[derive(Debug)]
pub struct Logger {
// Fields cloned into every message as its starting point (includes "host" and "version").
base_object: Map<String, Value>,
// When true, the event's source line is emitted as "_line" (if the metadata has one).
line_numbers: bool,
// When true, the event's source file is emitted as "_file" (if the metadata has one).
file_names: bool,
// When true, the event's module path is emitted as "_module_path" (if the metadata has one).
module_paths: bool,
// When true, fields recorded on enclosing spans and the "_span" path are added to each message.
spans: bool,
// Sending half of the channel; serialized messages are pushed with `try_send`.
sender: mpsc::Sender<Bytes>,
}
impl Logger {
pub fn builder() -> Builder {
Builder::default()
}
}
/// Errors that can occur while building a `Logger` or installing it as the
/// global default subscriber.
#[derive(Debug)]
#[non_exhaustive]
pub enum BuilderError {
    /// The local hostname could not be retrieved.
    HostnameResolution(std::io::Error),
    /// The local hostname could not be converted into a `String`; the
    /// original value is carried along.
    OsString(std::ffi::OsString),
    /// Installing the global default dispatcher failed.
    Global(SetGlobalDefaultError),
}

impl std::fmt::Display for BuilderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BuilderError::HostnameResolution(err) => {
                write!(f, "could not retrieve the local hostname: {}", err)
            }
            // `OsString` has no `Display`; its `Debug` form escapes the offending bytes.
            BuilderError::OsString(raw) => {
                write!(f, "hostname is not a valid string: {:?}", raw)
            }
            BuilderError::Global(err) => {
                write!(f, "could not set the global default subscriber: {}", err)
            }
        }
    }
}

impl std::error::Error for BuilderError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            BuilderError::HostnameResolution(err) => Some(err),
            BuilderError::OsString(_) => None,
            BuilderError::Global(err) => Some(err),
        }
    }
}
/// Collects the configuration for a `Logger` before it is connected.
#[derive(Debug)]
pub struct Builder {
// Extra fields copied into every message; keys are stored with a leading underscore.
additional_fields: Map<String, Value>,
// Value for the "version" field; `None` falls back to `DEFAULT_VERSION` when connecting.
version: Option<String>,
// Forwarded to `Logger::file_names`.
file_names: bool,
// Forwarded to `Logger::line_numbers`.
line_numbers: bool,
// Forwarded to `Logger::module_paths`.
module_paths: bool,
// Forwarded to `Logger::spans`.
spans: bool,
// Pause between connection rounds in milliseconds; `None` falls back to `DEFAULT_TIMEOUT`.
timeout_ms: Option<u32>,
// Capacity of the message channel; `None` falls back to `DEFAULT_BUFFER`.
buffer: Option<usize>,
}
impl Default for Builder {
    /// Baseline configuration: no additional fields, every optional piece of
    /// metadata enabled, and version, timeout and buffer left unset so the
    /// crate defaults apply when connecting.
    fn default() -> Self {
        let additional_fields = Map::with_capacity(32);
        Self {
            additional_fields,
            version: None,
            timeout_ms: None,
            buffer: None,
            file_names: true,
            line_numbers: true,
            module_paths: true,
            spans: true,
        }
    }
}
// Boxed, pinned, `Send` future with no output. The connect/init methods return the
// connection loop in this form; as a future it does nothing until it is polled.
type BackgroundTask = std::pin::Pin<Box<dyn Future<Output = ()> + Send>>;
impl Builder {
    /// Adds a field that is attached to every message.
    ///
    /// The key is stored with a leading underscore. Numbers and strings are
    /// kept as they are; any other JSON value is replaced by the string
    /// produced by its `to_string()`.
    pub fn additional_field<K: ToString, V: Into<Value>>(mut self, key: K, value: V) -> Self {
        let coerced_value: Value = match value.into() {
            Value::Number(n) => Value::Number(n),
            Value::String(s) => Value::String(s),
            other => Value::String(other.to_string()),
        };
        self.additional_fields
            .insert(format!("_{}", key.to_string()), coerced_value);
        self
    }

    /// Sets the value of the `version` field, defaults to `"1.1"`.
    pub fn version<V: ToString>(mut self, version: V) -> Self {
        self.version = Some(version.to_string());
        self
    }

    /// Sets whether line numbers are attached to messages, defaults to `true`.
    pub fn line_numbers(mut self, value: bool) -> Self {
        self.line_numbers = value;
        self
    }

    /// Sets whether file names are attached to messages, defaults to `true`.
    pub fn file_names(mut self, value: bool) -> Self {
        self.file_names = value;
        self
    }

    /// Sets whether module paths are attached to messages, defaults to `true`.
    pub fn module_paths(mut self, value: bool) -> Self {
        self.module_paths = value;
        self
    }

    /// Sets whether span fields and the `_span` path are attached to
    /// messages, defaults to `true`.
    pub fn spans(mut self, value: bool) -> Self {
        self.spans = value;
        self
    }

    /// Sets the pause between connection rounds in milliseconds, defaults to
    /// 10 000.
    pub fn reconnection_timeout(mut self, millis: u32) -> Self {
        self.timeout_ms = Some(millis);
        self
    }

    /// Sets the capacity of the channel between the logger and the
    /// background task, defaults to 512.
    pub fn buffer(mut self, length: usize) -> Self {
        self.buffer = Some(length);
        self
    }

    /// Builds the parts shared by every transport: the `Logger`, the
    /// receiving half of its channel and the pause between connection rounds.
    fn build_logger(
        self,
    ) -> Result<(Logger, mpsc::Receiver<Bytes>, time::Duration), BuilderError> {
        let hostname = hostname::get()
            .map_err(BuilderError::HostnameResolution)?
            .into_string()
            .map_err(BuilderError::OsString)?;
        let version = self.version.unwrap_or_else(|| DEFAULT_VERSION.to_string());

        let mut base_object = self.additional_fields;
        base_object.insert("host".to_string(), hostname.into());
        base_object.insert("version".to_string(), version.into());

        // `u32` always fits in `u64`, so use the lossless conversion rather than `as`.
        let retry_delay =
            time::Duration::from_millis(u64::from(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT)));
        let (sender, receiver) = mpsc::channel::<Bytes>(self.buffer.unwrap_or(DEFAULT_BUFFER));

        let logger = Logger {
            base_object,
            file_names: self.file_names,
            line_numbers: self.line_numbers,
            module_paths: self.module_paths,
            spans: self.spans,
            sender,
        };
        Ok((logger, receiver, retry_delay))
    }

    /// Layers `logger` on top of `subscriber` and installs the result as the
    /// global default dispatcher.
    fn install_global<S>(logger: Logger, subscriber: S) -> Result<(), BuilderError>
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
        S: Send + Sync + 'static,
    {
        let subscriber = logger.with_subscriber(subscriber);
        tracing_core::dispatcher::set_global_default(tracing_core::dispatcher::Dispatch::new(
            subscriber,
        ))
        .map_err(BuilderError::Global)
    }

    /// Returns a `Logger` and the background task that ships its messages
    /// over TCP.
    ///
    /// The task resolves `addr`, tries each resolved address in turn, sleeps
    /// for the reconnection timeout and starts over. It never completes and
    /// sends nothing until it is polled (typically by spawning it on a Tokio
    /// runtime).
    ///
    /// # Errors
    ///
    /// Fails when the local hostname cannot be retrieved or cannot be
    /// converted into a `String`.
    pub fn connect_tcp<T>(self, addr: T) -> Result<(Logger, BackgroundTask), BuilderError>
    where
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        let (logger, receiver, retry_delay) = self.build_logger()?;
        // The TCP sink consumes a fallible stream, so wrap every message in `Ok`.
        let mut ok_receiver = receiver.map(Ok);
        let bg_task = Box::pin(async move {
            loop {
                // A failed lookup yields no addresses and falls through to the sleep.
                let addrs = lookup_host(&addr).await.into_iter().flatten();
                for addr in addrs {
                    handle_tcp_connection(addr, &mut ok_receiver).await;
                }
                time::sleep(retry_delay).await;
            }
        });
        Ok((logger, bg_task))
    }

    /// Connects over TCP, layers the logger on top of `subscriber` and
    /// installs the result as the global default.
    ///
    /// Returns the background task described in `connect_tcp`.
    ///
    /// # Errors
    ///
    /// Fails like `connect_tcp`, or when a global default dispatcher cannot
    /// be set.
    pub fn init_tcp_with_subscriber<S, T>(
        self,
        addr: T,
        subscriber: S,
    ) -> Result<BackgroundTask, BuilderError>
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
        S: Send + Sync + 'static,
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        let (logger, bg_task) = self.connect_tcp(addr)?;
        Self::install_global(logger, subscriber)?;
        Ok(bg_task)
    }

    /// Same as `init_tcp_with_subscriber`, using `Registry::default()` as the
    /// underlying subscriber.
    pub fn init_tcp<T>(self, addr: T) -> Result<BackgroundTask, BuilderError>
    where
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        self.init_tcp_with_subscriber(addr, Registry::default())
    }

    /// Returns a `Logger` and the background task that ships its messages
    /// over UDP.
    ///
    /// The task resolves `addr`, serves each resolved address in turn, sleeps
    /// for the reconnection timeout and starts over. It never completes and
    /// sends nothing until it is polled (typically by spawning it on a Tokio
    /// runtime).
    ///
    /// # Errors
    ///
    /// Fails when the local hostname cannot be retrieved or cannot be
    /// converted into a `String`.
    pub fn connect_udp<T>(self, addr: T) -> Result<(Logger, BackgroundTask), BuilderError>
    where
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        let (logger, mut receiver, retry_delay) = self.build_logger()?;
        let bg_task = Box::pin(async move {
            loop {
                // A failed lookup yields no addresses and falls through to the sleep.
                let addrs = lookup_host(&addr).await.into_iter().flatten();
                for addr in addrs {
                    handle_udp_connection(addr, &mut receiver).await;
                }
                time::sleep(retry_delay).await;
            }
        });
        Ok((logger, bg_task))
    }

    /// Connects over UDP, layers the logger on top of `subscriber` and
    /// installs the result as the global default.
    ///
    /// Returns the background task described in `connect_udp`.
    ///
    /// # Errors
    ///
    /// Fails like `connect_udp`, or when a global default dispatcher cannot
    /// be set.
    pub fn init_udp_with_subscriber<S, T>(
        self,
        addr: T,
        subscriber: S,
    ) -> Result<BackgroundTask, BuilderError>
    where
        S: Subscriber + for<'a> LookupSpan<'a>,
        S: Send + Sync + 'static,
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        let (logger, bg_task) = self.connect_udp(addr)?;
        Self::install_global(logger, subscriber)?;
        Ok(bg_task)
    }

    /// Same as `init_udp_with_subscriber`, using `Registry::default()` as the
    /// underlying subscriber.
    pub fn init_udp<T>(self, addr: T) -> Result<BackgroundTask, BuilderError>
    where
        T: ToSocketAddrs,
        T: Send + Sync + 'static,
    {
        self.init_udp_with_subscriber(addr, Registry::default())
    }
}
impl<S> Layer<S> for Logger
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Records the fields a span was created with into a JSON map stored in
    /// the span's extensions, unless such a map is already present.
    fn new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("Span not found, this is a bug");
        let mut extensions = span.extensions_mut();
        if extensions.get_mut::<Map<String, Value>>().is_none() {
            let mut object = Map::with_capacity(16);
            let mut field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
            attrs.record(&mut field_visitor);
            extensions.insert(object);
        }
    }

    /// Records values added to a span after its creation into the span's JSON
    /// map, creating the map first when the span does not have one yet.
    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
        let span = ctx.span(id).expect("Span not found, this is a bug");
        let mut extensions = span.extensions_mut();
        if let Some(object) = extensions.get_mut::<Map<String, Value>>() {
            let mut field_visitor = visitor::AdditionalFieldVisitor::new(object);
            values.record(&mut field_visitor);
        } else {
            let mut object = Map::with_capacity(16);
            let mut field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
            values.record(&mut field_visitor);
            extensions.insert(object);
        }
    }

    /// Builds the JSON message for `event` and queues it for the background
    /// task.
    ///
    /// The message starts from a clone of the base object. Depending on the
    /// logger's flags it gains the fields of the spans in scope, a `_span`
    /// path of colon-separated span names, and the event's file, line and
    /// module path. The event's own fields are recorded last. The serialized
    /// message is terminated with a zero byte. Messages that cannot be
    /// serialized or queued are dropped silently.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let mut object: Map<String, Value> = self.base_object.clone();

        if self.spans {
            let span_path = ctx.scope().fold(String::new(), |mut path, span| {
                if let Some(span_object) = span.extensions().get::<Map<String, Value>>() {
                    object.extend(span_object.clone());
                }
                // Grow the path in place instead of re-formatting it for every span.
                if !path.is_empty() {
                    path.push(':');
                }
                path.push_str(span.name());
                path
            });
            object.insert("_span".to_string(), span_path.into());
        }

        let metadata = event.metadata();
        let level_num = match *metadata.level() {
            tracing_core::Level::ERROR => 3,
            tracing_core::Level::WARN => 4,
            tracing_core::Level::INFO => 5,
            tracing_core::Level::DEBUG => 6,
            tracing_core::Level::TRACE => 7,
        };
        object.insert("level".to_string(), level_num.into());

        if self.file_names {
            if let Some(file) = metadata.file() {
                object.insert("_file".to_string(), file.into());
            }
        }
        if self.line_numbers {
            if let Some(line) = metadata.line() {
                object.insert("_line".to_string(), line.into());
            }
        }
        if self.module_paths {
            if let Some(module_path) = metadata.module_path() {
                object.insert("_module_path".to_string(), module_path.into());
            }
        }

        let mut field_visitor = visitor::AdditionalFieldVisitor::new(&mut object);
        event.record(&mut field_visitor);

        // Make sure the key exists even when the event's fields did not provide it.
        if !object.contains_key("short_message") {
            object.insert("short_message".to_string(), "".into());
        }

        // A logging layer must not panic inside the caller's code: if the
        // message cannot be serialized, drop it instead of unwrapping.
        let mut raw = match serde_json::to_vec(&Value::Object(object)) {
            Ok(raw) => raw,
            Err(_) => return,
        };
        raw.push(0);

        // Best effort: `try_send` never blocks, and a message it rejects is
        // discarded on purpose.
        let _ = self.sender.clone().try_send(Bytes::from(raw));
    }
}
async fn handle_tcp_connection<S>(addr: SocketAddr, receiver: &mut S)
where
S: Stream<Item = Result<Bytes, std::io::Error>>,
S: Unpin,
{
let mut tcp_stream = match TcpStream::connect(addr).await {
Ok(ok) => ok,
Err(_) => {
return;
}
};
let (_, writer) = tcp_stream.split();
let mut sink = FramedWrite::new(writer, BytesCodec::new());
if let Err(_err) = sink.send_all(receiver).await {
};
}
async fn handle_udp_connection<S>(addr: SocketAddr, receiver: &mut S)
where
S: Stream<Item = Bytes>,
S: Unpin,
{
let bind_addr = if addr.is_ipv4() {
"0.0.0.0:0"
} else {
"[::]:0"
};
let udp_socket = match UdpSocket::bind(bind_addr).await {
Ok(ok) => ok,
Err(_) => {
return;
}
};
let udp_stream = UdpFramed::new(udp_socket, BytesCodec::new());
let (mut sink, _) = udp_stream.split();
while let Some(bytes) = receiver.next().await {
if let Err(_err) = sink.send((bytes, addr)).await {
break;
};
}
}