use std::io::IsTerminal;
use std::time::Duration;
use anyhow::anyhow;
use once_cell::sync::OnceCell;
use opentelemetry::Context;
use opentelemetry::InstrumentationScope;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceState;
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::trace::Tracer;
use tower::BoxError;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::Registry;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::SpanRef;
use tracing_subscriber::reload::Handle;
use tracing_subscriber::util::SubscriberInitExt;
use crate::plugins::telemetry::dynamic_attribute::DynAttributeLayer;
use crate::plugins::telemetry::fmt_layer::FmtLayer;
use crate::plugins::telemetry::formatters::json::Json;
use crate::plugins::telemetry::formatters::text::Text;
use crate::plugins::telemetry::otel;
use crate::plugins::telemetry::otel::OpenTelemetryLayer;
use crate::plugins::telemetry::otel::PreSampledTracer;
use crate::plugins::telemetry::reload::rate_limit::RateLimitLayer;
use crate::plugins::telemetry::tracing::reload::ReloadTracer;
use crate::tracer::TraceId;
/// Base subscriber stack: the tracing [`Registry`] wrapped by a [`DynAttributeLayer`].
pub(crate) type LayeredRegistry = Layered<DynAttributeLayer, Registry>;
/// [`LayeredRegistry`] with the OpenTelemetry layer (driven by a reloadable tracer) stacked on
/// top. This is the subscriber type the reloadable fmt layer is written against.
pub(in crate::plugins::telemetry) type LayeredTracer =
Layered<OpenTelemetryLayer<LayeredRegistry, ReloadTracer<Tracer>>, LayeredRegistry>;
pub(in crate::plugins::telemetry) static OPENTELEMETRY_TRACER_HANDLE: OnceCell<
ReloadTracer<opentelemetry_sdk::trace::Tracer>,
> = OnceCell::new();
/// Reload handle for the boxed fmt layer installed by `init_telemetry`.
///
/// Set once during initialization and read by `reload_fmt` to swap the active fmt layer.
static FMT_LAYER_HANDLE: OnceCell<
Handle<Box<dyn Layer<LayeredTracer> + Send + Sync>, LayeredTracer>,
> = OnceCell::new();
pub(crate) fn init_telemetry(log_level: &str) -> anyhow::Result<()> {
let hot_tracer = ReloadTracer::new(
opentelemetry_sdk::trace::SdkTracerProvider::default()
.tracer_with_scope(InstrumentationScope::builder("noop").build()),
);
let opentelemetry_layer = otel::layer().with_tracer(hot_tracer.clone());
let fmt = if std::io::stdout().is_terminal() {
FmtLayer::new(Text::default(), std::io::stdout).boxed()
} else {
FmtLayer::new(Json::default(), std::io::stdout).boxed()
};
let (fmt_layer, fmt_handle) = tracing_subscriber::reload::Layer::new(fmt);
OPENTELEMETRY_TRACER_HANDLE
.get_or_try_init(move || {
let log_level = format!("{log_level},salsa=error,opentelemetry=warn");
tracing::debug!("Running the router with log level set to {log_level}");
tracing_subscriber::registry()
.with(DynAttributeLayer::new())
.with(opentelemetry_layer)
.with(fmt_layer)
.with(WarnLegacyMetricsLayer)
.with(RateLimitLayer::new(
"opentelemetry",
Duration::from_secs(10),
))
.with(EnvFilter::try_new(log_level)?)
.try_init()?;
Ok(hot_tracer)
})
.map_err(|e: BoxError| anyhow!("failed to set OpenTelemetry tracer: {e}"))?;
FMT_LAYER_HANDLE
.set(fmt_handle)
.map_err(|_| anyhow!("failed to set fmt layer handle"))?;
Ok(())
}
/// Replaces the active fmt layer with `layer`.
///
/// Does nothing when the fmt reload handle has not been stored yet (i.e. telemetry was never
/// initialized through `init_telemetry`).
///
/// # Panics
///
/// Panics if the underlying reload handle reports a failure.
pub(in crate::plugins::telemetry) fn reload_fmt(
    layer: Box<dyn Layer<LayeredTracer> + Send + Sync>,
) {
    let Some(fmt_handle) = FMT_LAYER_HANDLE.get() else {
        return;
    };
    fmt_handle
        .reload(layer)
        .expect("fmt layer reload must succeed");
}
/// Reports whether the reloadable OpenTelemetry tracer handle has been initialized.
pub(crate) fn apollo_opentelemetry_initialized() -> bool {
    let tracer_handle = OPENTELEMETRY_TRACER_HANDLE.get();
    tracer_handle.is_some()
}
/// Ensures `context` carries a valid span context.
///
/// The context is returned untouched when its current span already has a valid span context, or
/// when the OpenTelemetry tracer handle has not been initialized. Otherwise a new span context is
/// attached, built from a freshly generated trace id and span id, default trace flags, the
/// `is_remote` argument set to `false`, and a default trace state.
pub(crate) fn prepare_context(context: Context) -> Context {
    if context.span().span_context().is_valid() {
        return context;
    }
    let Some(tracer) = OPENTELEMETRY_TRACER_HANDLE.get() else {
        return context;
    };

    let generated = SpanContext::new(
        tracer.new_trace_id(),
        tracer.new_span_id(),
        TraceFlags::default(),
        false,
        TraceState::default(),
    );
    context.with_remote_span_context(generated)
}
/// Sampling state recorded for a span, together with its trace id and span id.
///
/// Both variants carry the same payload; only the variant itself tells whether the span was
/// sampled.
#[derive(Clone, Debug)]
pub(crate) enum SampledSpan {
/// The span was not sampled.
NotSampled(TraceId, SpanId),
/// The span was sampled.
Sampled(TraceId, SpanId),
}
impl SampledSpan {
pub(crate) fn trace_and_span_id(&self) -> (TraceId, SpanId) {
match self {
SampledSpan::NotSampled(trace_id, span_id)
| SampledSpan::Sampled(trace_id, span_id) => (trace_id.clone(), *span_id),
}
}
}
/// Read-only access to the sampling information attached to a span.
pub(crate) trait IsSampled {
/// Returns `true` when the span is marked as sampled.
fn is_sampled(&self) -> bool;
/// Returns the span's trace id, or `None` when no sampling information is available.
fn get_trace_id(&self) -> Option<TraceId>;
}
/// Answers sampling queries from the [`SampledSpan`] stored in the span's extensions.
impl<'a, T> IsSampled for SpanRef<'a, T>
where
    T: tracing_subscriber::registry::LookupSpan<'a>,
{
    /// `true` only when the extensions hold a `SampledSpan::Sampled`; a missing entry or a
    /// `NotSampled` entry both yield `false`.
    fn is_sampled(&self) -> bool {
        let extensions = self.extensions();
        matches!(
            extensions.get::<SampledSpan>(),
            Some(SampledSpan::Sampled(..))
        )
    }

    /// Clones the trace id out of the stored [`SampledSpan`], whichever variant it is; `None`
    /// when the extensions hold no `SampledSpan`.
    fn get_trace_id(&self) -> Option<TraceId> {
        let extensions = self.extensions();
        let sampled = extensions.get::<SampledSpan>()?;
        let (trace_id, _span_id) = sampled.trace_and_span_id();
        Some(trace_id)
    }
}
// Field-name prefixes that `WarnLegacyMetricsLayer` treats as legacy metric reporting.
const LEGACY_METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter.";
const LEGACY_METRIC_PREFIX_COUNTER: &str = "counter.";
const LEGACY_METRIC_PREFIX_HISTOGRAM: &str = "histogram.";
const LEGACY_METRIC_PREFIX_VALUE: &str = "value.";
/// Stateless layer that reports events whose fields use a legacy metric prefix.
struct WarnLegacyMetricsLayer;
// Hand-built callsite and metadata for the synthetic event emitted by `WarnLegacyMetricsLayer`.
// The two statics reference each other: the callsite points at the metadata and the metadata
// names the callsite.
static WARN_LEGACY_METRIC_CALLSITE: tracing_core::callsite::DefaultCallsite =
tracing_core::callsite::DefaultCallsite::new(&WARN_LEGACY_METRIC_METADATA);
// NOTE(review): the level is ERROR although the identifiers say "warn" — confirm this is intended.
static WARN_LEGACY_METRIC_METADATA: tracing_core::Metadata = tracing_core::metadata! {
name: "warn_legacy_metric",
target: module_path!(),
level: tracing_core::Level::ERROR,
// Field names looked up by `on_event` when it builds the value set.
fields: &["message", "metric_name"],
callsite: &WARN_LEGACY_METRIC_CALLSITE,
kind: tracing_core::metadata::Kind::EVENT,
};
/// Flags events that still report metrics through legacy field-name prefixes.
impl<S: tracing::Subscriber> Layer<S> for WarnLegacyMetricsLayer {
    /// Inspects `event`; if any of its fields is named with a legacy metric prefix, emits one
    /// synthetic event (described by `WARN_LEGACY_METRIC_METADATA`) through `ctx`, carrying a
    /// fixed message and the name of the first offending field.
    ///
    /// Events without such a field are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `WARN_LEGACY_METRIC_METADATA` does not declare the `message` and `metric_name`
    /// fields, which would be a programming error in this file.
    fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
        const LEGACY_PREFIXES: [&str; 4] = [
            LEGACY_METRIC_PREFIX_MONOTONIC_COUNTER,
            LEGACY_METRIC_PREFIX_COUNTER,
            LEGACY_METRIC_PREFIX_HISTOGRAM,
            LEGACY_METRIC_PREFIX_VALUE,
        ];

        let Some(field) = event.fields().find(|field| {
            let name = field.name();
            LEGACY_PREFIXES
                .iter()
                .any(|prefix| name.starts_with(*prefix))
        }) else {
            return;
        };

        let fields = WARN_LEGACY_METRIC_METADATA.fields();
        let message_field = fields
            .field("message")
            .expect("`message` is declared in WARN_LEGACY_METRIC_METADATA");
        let name_field = fields
            .field("metric_name")
            .expect("`metric_name` is declared in WARN_LEGACY_METRIC_METADATA");

        // `&str` implements `tracing::Value` and is recorded as a string, exactly like `String`,
        // so neither value needs to be copied into a fresh allocation.
        let message =
            "Detected unsupported legacy metrics reporting, remove or migrate to opentelemetry";
        let metric_name = field.name();

        let values = [
            (&message_field, Some(&message as &dyn tracing::Value)),
            (&name_field, Some(&metric_name as &dyn tracing::Value)),
        ];
        let value_set = fields.value_set(&values);
        // The event is built by hand from static metadata and dispatched through the layer
        // context rather than through a `tracing` macro.
        ctx.event(&tracing_core::Event::new(
            &WARN_LEGACY_METRIC_METADATA,
            &value_set,
        ));
    }
}