use std::io::Stdout;
use std::io::Write;
use serde::ser::{SerializeMap, Serializer};
use serde::Serialize;
use tracing::{Event, Subscriber};
use tracing_log::NormalizeEvent;
use tracing_subscriber::{
fmt::MakeWriter,
layer::Context,
registry::{LookupSpan, SpanRef},
Layer,
};
use crate::json::storage::PrimaJsonVisitor;
use crate::subscriber::{ContextInfo, EventFormatter};
pub struct PrimaFormattingLayer<'writer, W: MakeWriter<'writer>, F: EventFormatter> {
make_writer: &'writer W,
app_name: String,
country: String,
environment: String,
formatter: F,
}
pub fn layer<'writer>(
app_name: String,
country: String,
environment: String,
) -> PrimaFormattingLayer<'writer, impl Fn() -> Stdout, DefaultEventFormatter> {
PrimaFormattingLayer::new(
app_name,
country,
environment,
&std::io::stdout,
DefaultEventFormatter,
)
}
impl<'writer, W: MakeWriter<'writer>, F: EventFormatter> PrimaFormattingLayer<'writer, W, F> {
pub(crate) fn new(
app_name: String,
country: String,
environment: String,
make_writer: &'writer W,
formatter: F,
) -> Self {
Self {
make_writer,
app_name,
country,
environment,
formatter,
}
}
pub fn with_formatter<A: EventFormatter>(
self,
formatter: A,
) -> PrimaFormattingLayer<'writer, W, A> {
PrimaFormattingLayer::new(
self.app_name,
self.country,
self.environment,
self.make_writer,
formatter,
)
}
fn emit(&self, mut buffer: Vec<u8>) -> Result<(), std::io::Error> {
buffer.write_all(b"\n")?;
self.make_writer.make_writer().write_all(&buffer)
}
fn format_event<S>(
&self,
event: &Event<'_>,
ctx: Context<'_, S>,
) -> Result<Vec<u8>, std::io::Error>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
self.formatter.format_event(
event,
ctx,
ContextInfo {
app_name: self.app_name.as_str(),
country: self.country.as_str(),
environment: self.environment.as_str(),
},
)
}
}
impl<S, W, F: 'static> Layer<S> for PrimaFormattingLayer<'static, W, F>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
W: MakeWriter<'static>,
F: EventFormatter,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
if let Ok(serialized) = self.format_event(event, ctx) {
let _ = self.emit(serialized);
}
}
}
pub struct DefaultEventFormatter;
impl EventFormatter for DefaultEventFormatter {
fn format_event<S>(
&self,
event: &Event<'_>,
ctx: Context<'_, S>,
info: ContextInfo<'_>,
) -> Result<Vec<u8>, std::io::Error>
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
let normalized_metadata = event.normalized_metadata();
let metadata = normalized_metadata
.as_ref()
.unwrap_or_else(|| event.metadata());
let mut buffer = Vec::new();
let mut serializer = serde_json::Serializer::new(&mut buffer);
let mut map_serializer = serializer.serialize_map(None)?;
let mut visitor = PrimaJsonVisitor::default();
event.record(&mut visitor);
for (key, value) in visitor
.fields()
.iter()
.filter(|(&key, _)| key != "message" && !key.starts_with("log."))
{
map_serializer.serialize_entry(key, value)?;
}
map_serializer.serialize_entry("timestamp", &chrono::Utc::now())?;
map_serializer.serialize_entry(
"level",
metadata.level().to_string().to_lowercase().as_str(),
)?;
map_serializer.serialize_entry("country", info.country())?;
map_serializer.serialize_entry("environment", info.environment())?;
map_serializer.serialize_entry("type", info.app_name())?;
map_serializer.serialize_entry("message", &visitor.fields().get("message"))?;
if let Some(current_span) = ctx.current_span().id().and_then(|id| ctx.span(id)) {
map_serializer.serialize_entry("current_span", &SpanSerializer(¤t_span))?;
}
map_serializer.serialize_entry("spans", &SpanListSerializer(&ctx))?;
#[cfg(feature = "datadog")]
{
use opentelemetry::trace::TraceContextExt;
use std::collections::HashMap;
use tracing_opentelemetry::{OpenTelemetrySpanExt, OtelData};
if let Some(current_span) = ctx.current_span().id().and_then(|id| ctx.span(id)) {
let ext = current_span.extensions();
if let Some(otel_data) = ext.get::<OtelData>() {
let mut trace_id_opt = otel_data.trace_id();
let mut span_id_opt = otel_data.span_id();
if trace_id_opt.is_none() || span_id_opt.is_none() {
let ctx = tracing::Span::current().context();
let span = ctx.span();
let sctx = span.span_context();
if sctx.is_valid() {
trace_id_opt = Some(sctx.trace_id());
span_id_opt = Some(sctx.span_id());
}
}
if let (Some(trace_id), Some(span_id)) = (trace_id_opt, span_id_opt) {
let trace_id_u64 = u128::from_be_bytes(trace_id.to_bytes()) as u64;
let span_id_u64 = u64::from_be_bytes(span_id.to_bytes());
let mut dd = HashMap::new();
dd.insert("trace_id", trace_id_u64);
dd.insert("span_id", span_id_u64);
map_serializer.serialize_entry("dd", &dd)?;
}
}
}
}
map_serializer.end()?;
Ok(buffer)
}
}
struct SpanSerializer<'a, 'b, Span>(&'b SpanRef<'a, Span>)
where
Span: for<'lookup> LookupSpan<'lookup>;
impl<Span> Serialize for SpanSerializer<'_, '_, Span>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
serializer.serialize_entry("name", self.0.metadata().name())?;
serializer.serialize_entry("line", &self.0.metadata().line())?;
serializer.serialize_entry("target", &self.0.metadata().target())?;
serializer.serialize_entry("file", &self.0.metadata().file())?;
if let Some(visitor) = self.0.extensions().get::<PrimaJsonVisitor>() {
for (key, value) in visitor.fields().iter() {
serializer.serialize_entry(key, value)?;
}
}
serializer.end()
}
}
struct SpanListSerializer<'a, 'b, S>(&'b Context<'a, S>)
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>;
impl<Sub> Serialize for SpanListSerializer<'_, '_, Sub>
where
Sub: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut serializer = serializer.serialize_seq(None)?;
if let Some(span_root) = self
.0
.current_span()
.id()
.and_then(|id| self.0.span_scope(id).map(|iter| iter.from_root()))
{
for span in span_root {
serde::ser::SerializeSeq::serialize_element(
&mut serializer,
&SpanSerializer(&span),
)?;
}
}
serde::ser::SerializeSeq::end(serializer)
}
}