use crate::error::{Error, Result};
use opentelemetry::trace::{SpanId, TraceContextExt};
use serde::{ser::SerializeMap, Serialize, Serializer};
use serde_json::Value;
use std::borrow::Cow;
use std::{collections::BTreeMap, fmt, num::NonZeroU8};
use time::format_description::well_known::iso8601::{Config, Iso8601, TimePrecision};
use tracing::{field::Field, Event, Subscriber};
use tracing_opentelemetry::OtelData;
use tracing_subscriber::registry::{Extensions, SpanRef};
use tracing_subscriber::{
fmt::{format, FmtContext, FormatEvent, FormatFields},
registry::LookupSpan,
};
use crate::observability::floor_char_boundary::floor_char_boundary;
#[cfg(tracing_unstable)]
use crate::observability::TRACING_FIELDS_STRUCTURE_NAME;
#[cfg(tracing_unstable)]
use valuable_serde::Serializable;
pub(crate) const VERSION: &str = "version";
pub(crate) const SERVICE: &str = "service";
pub(crate) const COMPONENT: &str = "component";
pub(crate) const TARGET: &str = "target";
pub(crate) const MSG: &str = "msg";
pub(crate) const MESSAGE: &str = "message";
pub(crate) const LOG_LEVEL: &str = "LogLevel";
pub(crate) const TIME: &str = "time";
pub(crate) const TIMESTAMP: &str = "timestamp";
pub(crate) const TRACE_ID: &str = "traceId";
pub(crate) const SPAN_ID: &str = "spanId";
pub(crate) const INSTANCE_ID: &str = "instanceId";
const DEFAULT_FIELDS: [&str; 11] = [
VERSION, SERVICE, COMPONENT, TARGET, MSG, LOG_LEVEL, TIME, TIMESTAMP, MESSAGE, TRACE_ID,
SPAN_ID,
];
const MIN_LOG_MESSAGE_LEN: usize = 256;
#[derive(Debug, Clone, Default)]
pub struct EventFormatter {
additional_fields: BTreeMap<String, Value>,
msg_len: Option<usize>,
}
impl EventFormatter {
pub fn new() -> Self {
Self::default()
}
pub fn new_with_limit(msg_len: Option<usize>) -> Self {
Self {
additional_fields: Default::default(),
msg_len,
}
}
pub fn add_field_to_events<V: Serialize>(&mut self, key: &str, value: V) -> Result<()> {
if DEFAULT_FIELDS.contains(&key) {
Err(Error::CustomError(format!(
"Prohibited to add key: '{key}' to EventFormatter"
)))
} else {
self.add_default_field_to_events(key, value)
}
}
pub(crate) fn add_default_field_to_events<V: Serialize>(
&mut self,
key: &str,
value: V,
) -> Result<()> {
let val = serde_json::to_value(value)?;
self.additional_fields.insert(key.to_owned(), val);
Ok(())
}
}
impl<S, N> FormatEvent<S, N> for EventFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &FmtContext<'_, S, N>,
mut writer: format::Writer<'_>,
event: &Event<'_>,
) -> fmt::Result {
let serialize = || {
let mut buf = Vec::with_capacity(MIN_LOG_MESSAGE_LEN);
let mut serializer = serde_json::Serializer::new(&mut buf);
let mut map_fmt = serializer.serialize_map(None)?;
let mut visitor = JsonVisitor::new();
event.record(&mut visitor);
let mut event_storage = visitor.storage;
let message = event_storage
.remove(MESSAGE)
.map(|mut msg| {
if let Some(limit) = self.msg_len {
limit_str_value(&mut msg, limit);
}
msg
})
.unwrap_or_default();
let mut event_fields = event_storage.iter().filter(|(key, _)| {
!DEFAULT_FIELDS.contains(&key.as_ref())
&& !self.additional_fields.contains_key(key.as_ref())
});
let mut additional_fields = self.additional_fields.iter();
let target = event.metadata().target();
let level = event.metadata().level();
let time = time::OffsetDateTime::now_utc();
let time_ns = time.unix_timestamp_nanos();
let timestamp = time.format(
&Iso8601::<
{
Config::DEFAULT
.set_time_precision(TimePrecision::Second {
decimal_digits: NonZeroU8::new(3),
})
.encode()
},
>,
);
let tracing_fields = ctx
.lookup_current()
.as_ref()
.map(SpanRef::extensions)
.as_ref()
.and_then(Extensions::get::<OtelData>)
.and_then(|otel_data| {
if otel_data.parent_cx.has_active_span() {
Some(otel_data.parent_cx.span().span_context().trace_id())
} else {
otel_data.builder.trace_id
}
.map(|trace_id| {
let span_id = otel_data.builder.span_id.unwrap_or(SpanId::INVALID);
(span_id, trace_id)
})
});
map_fmt.serialize_entry(TIME, &time_ns)?;
if let Ok(timestamp) = timestamp {
map_fmt.serialize_entry(TIMESTAMP, timestamp.as_str())?;
}
map_fmt.serialize_entry(LOG_LEVEL, level.as_str())?;
map_fmt.serialize_entry(TARGET, target)?;
if let Some((span_id, trace_id)) = tracing_fields {
map_fmt.serialize_entry(TRACE_ID, &trace_id.to_string())?;
map_fmt.serialize_entry(SPAN_ID, &span_id.to_string())?;
}
additional_fields.try_for_each(|(k, v)| map_fmt.serialize_entry(k, v))?;
map_fmt.serialize_entry(MSG, &message)?;
event_fields.try_for_each(|(k, v)| map_fmt.serialize_entry(k, v))?;
map_fmt.end()?;
Ok(buf)
};
let buffer: std::result::Result<Vec<u8>, std::io::Error> = serialize();
match buffer {
Ok(formatted) => match std::str::from_utf8(&formatted) {
Ok(str) => {
write!(writer, "{str}")?;
}
Err(_) => {
write!(writer, "{}", String::from_utf8_lossy(&formatted))?;
}
},
Err(err) => {
write!(writer, "{err}")?;
}
}
writeln!(writer)
}
}
#[derive(Clone, Debug, Default)]
struct JsonVisitor<'a> {
storage: BTreeMap<Cow<'a, str>, Value>,
}
impl<'a> JsonVisitor<'a> {
fn new() -> Self {
Self {
storage: Default::default(),
}
}
#[cfg(tracing_unstable)]
fn insert_owned<T: Serialize>(&mut self, key: String, value: T) {
let value = serde_json::json!(value);
self.storage.insert(Cow::Owned(key), value);
}
fn insert_borrowed<T: Serialize>(&mut self, key: &'a str, value: T) {
let value = serde_json::json!(value);
self.storage.insert(Cow::Borrowed(key), value);
}
}
impl<'a> tracing::field::Visit for JsonVisitor<'a> {
#[cfg(tracing_unstable)]
fn record_value(&mut self, field: &Field, value: valuable::Value<'_>) {
let mut serde_value = serde_json::json!(Serializable::new(value));
let structure = value.as_structable();
if let Some(structure) = structure {
let definition = structure.definition();
if definition.name() == TRACING_FIELDS_STRUCTURE_NAME {
match serde_value.as_object_mut() {
Some(value) => {
let value = std::mem::take(value);
value.into_iter().for_each(|(k, v)| {
self.insert_owned(k, v);
});
return;
}
None => {
unreachable!("Named structure should always be mapped to serde_json::Value::Object()")
}
}
}
}
self.insert_borrowed(field.name(), serde_value)
}
fn record_f64(&mut self, field: &Field, value: f64) {
self.insert_borrowed(field.name(), value);
}
fn record_i64(&mut self, field: &Field, value: i64) {
self.insert_borrowed(field.name(), value);
}
fn record_u64(&mut self, field: &Field, value: u64) {
self.insert_borrowed(field.name(), value);
}
fn record_bool(&mut self, field: &Field, value: bool) {
self.insert_borrowed(field.name(), value);
}
fn record_str(&mut self, field: &Field, value: &str) {
self.insert_borrowed(field.name(), value);
}
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
self.insert_borrowed(field.name(), value.to_string());
}
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
self.insert_borrowed(field.name().trim_start_matches("r#"), format!("{value:?}"));
}
}
fn limit_str_value(value: &mut Value, limit: usize) {
if let Value::String(str) = value {
if str.len() > limit {
let new_limit = floor_char_boundary(str, limit);
str.replace_range(new_limit..str.len(), " ...");
}
}
}