use std::sync::Arc;
use std::time::Instant;
use tracing::field::{Field, Visit};
use tracing::span::{Attributes, Id, Record};
use tracing::{event, warn, Level, Span, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use super::events::{
storage_metric_from_attrs, CrcReadCompleted, DomainMetadataLoaded, JsonReadCompleted,
LogSegmentLoaded, MetricEvent, ParquetReadCompleted, ProtocolMetadataLoaded,
ScanMetadataCompleted, SetTransactionLoaded, SnapshotCompleted, STORAGE_SPAN,
};
/// Sink for [`MetricEvent`]s.
///
/// Implementors must be `Send + Sync` and `Debug`, as required by the trait bounds.
pub trait MetricsReporter: Send + Sync + std::fmt::Debug {
/// Handles one metric event; the event is passed by value.
fn report(&self, event: MetricEvent);
}
/// A [`MetricsReporter`] that writes every metric event to the `tracing` log at one fixed
/// level.
#[derive(Debug)]
pub struct LoggingMetricsReporter {
/// Level at which every metric event is logged.
level: Level,
}
impl LoggingMetricsReporter {
pub fn new(level: Level) -> Self {
Self { level }
}
}
impl MetricsReporter for LoggingMetricsReporter {
fn report(&self, event: MetricEvent) {
match self.level {
Level::ERROR => event!(parent: Span::none(), Level::ERROR, "{}", event),
Level::WARN => event!(parent: Span::none(), Level::WARN, "{}", event),
Level::INFO => event!(parent: Span::none(), Level::INFO, "{}", event),
Level::DEBUG => event!(parent: Span::none(), Level::DEBUG, "{}", event),
Level::TRACE => event!(parent: Span::none(), Level::TRACE, "{}", event),
}
}
}
/// Holds the [`MetricsReporter`] that finished metric events are handed to.
///
/// The `Layer` implementation for this type builds a `MetricEvent` per span and reports it
/// when the span closes.
#[derive(Debug)]
pub struct ReportGeneratorLayer {
/// Destination for completed metric events.
reporter: Arc<dyn MetricsReporter>,
}
impl ReportGeneratorLayer {
pub fn new(reporter: Arc<dyn MetricsReporter>) -> Self {
Self { reporter }
}
fn drain_into_visitor<S>(
span: Option<tracing_subscriber::registry::SpanRef<'_, S>>,
record: impl FnOnce(&mut EventVisitor),
) where
S: Subscriber + for<'l> tracing_subscriber::registry::LookupSpan<'l>,
{
let warnings = span.and_then(|span| {
let mut extensions = span.extensions_mut();
let visitor = extensions.get_mut::<EventVisitor>()?;
record(visitor);
Some(std::mem::take(&mut visitor.pending_warnings))
});
for w in warnings.unwrap_or_default() {
warn!("{w}");
}
}
}
impl<S> Layer<S> for ReportGeneratorLayer
where
S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>,
{
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let Some(metadata) = ctx.metadata(id) else {
return;
};
let event = match metadata.name() {
LogSegmentLoaded::SPAN_NAME => Some(MetricEvent::LogSegmentLoaded(
LogSegmentLoaded::from_attrs(attrs),
)),
ProtocolMetadataLoaded::SPAN_NAME => Some(MetricEvent::ProtocolMetadataLoaded(
ProtocolMetadataLoaded::from_attrs(attrs),
)),
SnapshotCompleted::SPAN_NAME => Some(MetricEvent::SnapshotCompleted(
SnapshotCompleted::from_attrs(attrs),
)),
DomainMetadataLoaded::SPAN_NAME => Some(MetricEvent::DomainMetadataLoaded(
DomainMetadataLoaded::from_attrs(attrs),
)),
SetTransactionLoaded::SPAN_NAME => Some(MetricEvent::SetTransactionLoaded(
SetTransactionLoaded::from_attrs(attrs),
)),
CrcReadCompleted::SPAN_NAME => Some(MetricEvent::CrcReadCompleted(
CrcReadCompleted::from_attrs(attrs),
)),
JsonReadCompleted::SPAN_NAME => Some(MetricEvent::JsonReadCompleted(
JsonReadCompleted::from_attrs(attrs),
)),
ParquetReadCompleted::SPAN_NAME => Some(MetricEvent::ParquetReadCompleted(
ParquetReadCompleted::from_attrs(attrs),
)),
ScanMetadataCompleted::SPAN_NAME => Some(MetricEvent::ScanMetadataCompleted(
ScanMetadataCompleted::from_attrs(attrs),
)),
STORAGE_SPAN => storage_metric_from_attrs(attrs),
_ => None,
};
if let Some(span) = ctx.span(id) {
let mut extensions = span.extensions_mut();
extensions.insert(EventVisitor::new(event));
}
}
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
Self::drain_into_visitor(ctx.event_span(event), |v| event.record(v));
}
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
Self::drain_into_visitor(ctx.span(id), |v| values.record(v));
}
fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
if let Some(span) = ctx.span(id) {
let mut extensions = span.extensions_mut();
if extensions.get_mut::<Instant>().is_none() {
extensions.insert(Instant::now());
}
}
}
fn on_close(&self, id: Id, ctx: Context<'_, S>) {
let Some(metadata) = ctx.metadata(&id) else {
return;
};
if metadata.fields().field("report").is_none() {
return;
}
let Some(span) = ctx.span(&id) else { return };
let event = {
let mut extensions = span.extensions_mut();
let duration = extensions.get_mut::<Instant>().map(|s| s.elapsed());
let Some(visitor) = extensions.get_mut::<EventVisitor>() else {
return;
};
if let (Some(d), Some(event)) = (duration, visitor.event.as_mut()) {
event.set_duration_if_applicable(d);
}
visitor.event.take()
}; if let Some(event) = event {
self.reporter.report(event);
}
}
}
/// Per-span state: the metric event being built plus warnings queued while recording fields.
struct EventVisitor {
/// The metric event under construction; `None` when there is nothing to report.
event: Option<MetricEvent>,
/// Warning messages queued by `warn_invalid`, kept here until they are taken and logged.
pending_warnings: Vec<String>,
}
impl EventVisitor {
fn new(event: Option<MetricEvent>) -> Self {
Self {
event,
pending_warnings: vec![],
}
}
fn warn_invalid(&mut self, field: &Field, span_name: &str) {
self.pending_warnings.push(format!(
"Invalid field '{}' recorded on {span_name} span",
field.name()
));
}
}
impl Visit for EventVisitor {
    /// Forwards a `u64` field to the current metric event; a rejected field queues a warning.
    ///
    /// Ignored when the visitor holds no event.
    fn record_u64(&mut self, field: &Field, value: u64) {
        if let Some(metric) = self.event.as_mut() {
            if let Err(span_name) = metric.record_u64(field.name(), value) {
                self.warn_invalid(field, span_name);
            }
        }
    }

    /// Forwards a `bool` field to the current metric event; a rejected field queues a warning.
    ///
    /// Ignored when the visitor holds no event.
    fn record_bool(&mut self, field: &Field, value: bool) {
        if let Some(metric) = self.event.as_mut() {
            if let Err(span_name) = metric.record_bool(field.name(), value) {
                self.warn_invalid(field, span_name);
            }
        }
    }

    /// Reacts to a field named `error`; every other field (including `return`) is ignored.
    ///
    /// The recorded value itself is never inspected. When an `error` field is seen:
    /// - a `SnapshotCompleted` event becomes `SnapshotFailed`,
    /// - `DomainMetadataLoaded` and `SetTransactionLoaded` events are discarded,
    /// - any other event, or no event, is left as it was.
    fn record_debug(&mut self, field: &Field, _value: &dyn std::fmt::Debug) {
        if field.name() != "error" {
            return;
        }

        self.event = self.event.take().and_then(|metric| match metric {
            MetricEvent::SnapshotCompleted(snapshot) => {
                Some(MetricEvent::SnapshotFailed(snapshot.into_failed()))
            }
            MetricEvent::DomainMetadataLoaded(_) | MetricEvent::SetTransactionLoaded(_) => None,
            unchanged => Some(unchanged),
        });
    }
}