pub(crate) mod json;
pub(crate) mod text;
use std::collections::HashMap;
use std::fmt;
use std::time::Instant;
use opentelemetry::sdk::Resource;
use opentelemetry_api::KeyValue;
use opentelemetry_api::trace::SpanId;
use opentelemetry_api::trace::TraceContextExt;
use opentelemetry_api::trace::TraceId;
use parking_lot::Mutex;
use serde_json::Number;
use tracing::Subscriber;
use tracing_core::callsite::Identifier;
use tracing_subscriber::fmt::FormatEvent;
use tracing_subscriber::fmt::FormatFields;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::registry::SpanRef;
use super::config_new::logging::RateLimit;
use super::dynamic_attribute::LogAttributes;
use super::reload::SampledSpan;
use crate::metrics::layer::METRIC_PREFIX_COUNTER;
use crate::metrics::layer::METRIC_PREFIX_HISTOGRAM;
use crate::metrics::layer::METRIC_PREFIX_MONOTONIC_COUNTER;
use crate::metrics::layer::METRIC_PREFIX_VALUE;
use crate::plugins::telemetry::otel::OtelData;
pub(crate) const APOLLO_PRIVATE_PREFIX: &str = "apollo_private.";
pub(crate) const EXCLUDED_ATTRIBUTES: [&str; 5] = [
"code.filepath",
"code.namespace",
"code.lineno",
"thread.id",
"thread.name",
];
pub(crate) struct FilteringFormatter<T, F> {
inner: T,
filter_fn: F,
rate_limiter: Mutex<HashMap<Identifier, RateCounter>>,
config: RateLimit,
}
impl<T, F> FilteringFormatter<T, F>
where
F: Fn(&tracing::Event<'_>) -> bool,
{
pub(crate) fn new(inner: T, filter_fn: F, rate_limit: &RateLimit) -> Self {
Self {
inner,
filter_fn,
rate_limiter: Mutex::new(HashMap::new()),
config: rate_limit.clone(),
}
}
}
impl<T, F, S, N> FormatEvent<S, N> for FilteringFormatter<T, F>
where
T: FormatEvent<S, N>,
F: Fn(&tracing::Event<'_>) -> bool,
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
writer: Writer<'_>,
event: &tracing::Event<'_>,
) -> fmt::Result {
if (self.filter_fn)(event) {
match self.rate_limit(event) {
RateResult::Deny => return Ok(()),
RateResult::Allow => {}
RateResult::AllowSkipped(skipped) => {
if let Some(span) = event
.parent()
.and_then(|id| ctx.span(id))
.or_else(|| ctx.lookup_current())
{
let mut extensions = span.extensions_mut();
match extensions.get_mut::<LogAttributes>() {
None => {
let mut attributes = LogAttributes::default();
attributes
.insert(KeyValue::new("skipped_messages", skipped as i64));
extensions.insert(attributes);
}
Some(attributes) => {
attributes
.insert(KeyValue::new("skipped_messages", skipped as i64));
}
}
}
}
}
self.inner.format_event(ctx, writer, event)
} else {
Ok(())
}
}
}
impl<T, F, S> EventFormatter<S> for FilteringFormatter<T, F>
where
T: EventFormatter<S>,
F: Fn(&tracing::Event<'_>) -> bool,
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn format_event<W>(
&self,
ctx: &Context<'_, S>,
writer: &mut W,
event: &tracing::Event<'_>,
) -> fmt::Result
where
W: std::fmt::Write,
{
if (self.filter_fn)(event) {
match self.rate_limit(event) {
RateResult::Deny => return Ok(()),
RateResult::Allow => {}
RateResult::AllowSkipped(skipped) => {
if let Some(span) = event
.parent()
.and_then(|id| ctx.span(id))
.or_else(|| ctx.lookup_current())
{
let mut extensions = span.extensions_mut();
match extensions.get_mut::<LogAttributes>() {
None => {
let mut attributes = LogAttributes::default();
attributes
.insert(KeyValue::new("skipped_messages", skipped as i64));
extensions.insert(attributes);
}
Some(attributes) => {
attributes
.insert(KeyValue::new("skipped_messages", skipped as i64));
}
}
}
}
}
self.inner.format_event(ctx, writer, event)
} else {
Ok(())
}
}
}
enum RateResult {
Allow,
AllowSkipped(u32),
Deny,
}
impl<T, F> FilteringFormatter<T, F> {
fn rate_limit(&self, event: &tracing::Event<'_>) -> RateResult {
if self.config.enabled {
let now = Instant::now();
if let Some(counter) = self
.rate_limiter
.lock()
.get_mut(&event.metadata().callsite())
{
if now - counter.last < self.config.interval {
counter.count += 1;
if counter.count >= self.config.capacity {
return RateResult::Deny;
}
} else {
if counter.count > self.config.capacity {
let skipped = counter.count - self.config.capacity;
counter.last = now;
counter.count += 1;
return RateResult::AllowSkipped(skipped);
}
counter.last = now;
counter.count += 1;
}
return RateResult::Allow;
}
self.rate_limiter.lock().insert(
event.metadata().callsite(),
RateCounter {
last: now,
count: 1,
},
);
}
RateResult::Allow
}
}
struct RateCounter {
last: Instant,
count: u32,
}
pub(crate) fn filter_metric_events(event: &tracing::Event<'_>) -> bool {
!event.metadata().fields().iter().any(|f| {
f.name().starts_with(METRIC_PREFIX_COUNTER)
|| f.name().starts_with(METRIC_PREFIX_HISTOGRAM)
|| f.name().starts_with(METRIC_PREFIX_MONOTONIC_COUNTER)
|| f.name().starts_with(METRIC_PREFIX_VALUE)
})
}
pub(crate) fn to_list(resource: Resource) -> Vec<(String, serde_json::Value)> {
resource
.into_iter()
.map(|(k, v)| {
(
k.into(),
match v {
opentelemetry::Value::Bool(value) => serde_json::Value::Bool(value),
opentelemetry::Value::I64(value) => {
serde_json::Value::Number(Number::from(value))
}
opentelemetry::Value::F64(value) => serde_json::Value::Number(
Number::from_f64(value).unwrap_or(Number::from(0)),
),
opentelemetry::Value::String(value) => serde_json::Value::String(value.into()),
opentelemetry::Value::Array(value) => match value {
opentelemetry::Array::Bool(array) => serde_json::Value::Array(
array.into_iter().map(serde_json::Value::Bool).collect(),
),
opentelemetry::Array::I64(array) => serde_json::Value::Array(
array
.into_iter()
.map(|value| serde_json::Value::Number(Number::from(value)))
.collect(),
),
opentelemetry::Array::F64(array) => serde_json::Value::Array(
array
.into_iter()
.map(|value| {
serde_json::Value::Number(
Number::from_f64(value).unwrap_or(Number::from(0)),
)
})
.collect(),
),
opentelemetry::Array::String(array) => serde_json::Value::Array(
array
.into_iter()
.map(|s| serde_json::Value::String(s.to_string()))
.collect(),
),
},
},
)
})
.collect()
}
pub(crate) trait EventFormatter<S> {
fn format_event<W>(
&self,
ctx: &Context<'_, S>,
writer: &mut W,
event: &tracing::Event<'_>,
) -> fmt::Result
where
W: std::fmt::Write;
}
#[inline]
pub(crate) fn get_trace_and_span_id<S>(span: &SpanRef<S>) -> Option<(TraceId, SpanId)>
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
{
let ext = span.extensions();
if let Some(otel_data) = ext.get::<OtelData>() {
if let (Some(trace_id), Some(span_id)) =
(otel_data.builder.trace_id, otel_data.builder.span_id)
{
return Some((trace_id, span_id));
}
let span = otel_data.parent_cx.span();
let span_context = span.span_context();
if span_context.is_valid() {
return Some((span_context.trace_id(), span_context.span_id()));
}
}
if let Some(sampled_span) = ext.get::<SampledSpan>() {
let (trace_id, span_id) = sampled_span.trace_and_span_id();
return Some((
opentelemetry_api::trace::TraceId::from(trace_id.to_u128()),
span_id,
));
}
None
}