use std::fmt::Display;
use std::fmt::Formatter;
use std::time::Duration;
use opentelemetry::Context;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::BatchConfig;
use opentelemetry_sdk::trace::BatchConfigBuilder;
use opentelemetry_sdk::trace::Span;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::SpanProcessor;
use schemars::JsonSchema;
use serde::Deserialize;
use tower::BoxError;
use super::formatters::APOLLO_CONNECTOR_PREFIX;
use super::formatters::APOLLO_PRIVATE_PREFIX;
use crate::plugins::telemetry::tracing::datadog::DatadogSpanProcessor;
pub(crate) mod apollo;
pub(crate) mod apollo_telemetry;
pub(crate) mod datadog;
#[allow(unreachable_pub, dead_code)]
pub(crate) mod datadog_exporter;
mod named;
pub(crate) mod otlp;
pub(crate) mod reload;
pub(crate) mod zipkin;
pub(crate) use named::NamedSpanExporter;
pub(crate) use named::NamedTokioRuntime;
/// Span processor wrapper that strips Apollo-internal attributes from finished
/// spans before handing them to the wrapped processor.
///
/// An attribute is considered internal when its key starts with
/// `APOLLO_PRIVATE_PREFIX` or `APOLLO_CONNECTOR_PREFIX`.
#[derive(Debug)]
struct ApolloFilterSpanProcessor<T>
where
    T: SpanProcessor,
{
    /// The wrapped processor; every trait call is forwarded to it.
    delegate: T,
}
impl<T: SpanProcessor> SpanProcessor for ApolloFilterSpanProcessor<T> {
    /// Forwards the span start notification to the delegate unchanged.
    fn on_start(&self, span: &mut Span, cx: &Context) {
        self.delegate.on_start(span, cx);
    }

    /// Forwards the finished span to the delegate after removing every
    /// attribute whose key starts with `APOLLO_PRIVATE_PREFIX` or
    /// `APOLLO_CONNECTOR_PREFIX`.
    fn on_end(&self, span: SpanData) {
        /// Returns `true` when `key` carries one of the Apollo-internal prefixes.
        fn is_apollo_internal(key: &str) -> bool {
            key.starts_with(APOLLO_PRIVATE_PREFIX) || key.starts_with(APOLLO_CONNECTOR_PREFIX)
        }

        // Check first so that spans without internal attributes are passed
        // through as-is, without rebuilding their attribute collection.
        let needs_filtering = span
            .attributes
            .iter()
            .any(|kv| is_apollo_internal(kv.key.as_str()));

        let outgoing = if needs_filtering {
            SpanData {
                attributes: span
                    .attributes
                    .into_iter()
                    .filter(|kv| !is_apollo_internal(kv.key.as_str()))
                    .collect(),
                ..span
            }
        } else {
            span
        };

        self.delegate.on_end(outgoing);
    }

    /// Delegates the flush request and returns the delegate's result.
    fn force_flush(&self) -> OTelSdkResult {
        self.delegate.force_flush()
    }

    /// Delegates shutdown, passing `timeout` through, and returns the delegate's result.
    fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
        self.delegate.shutdown_with_timeout(timeout)
    }

    /// Passes the resource on to the delegate.
    fn set_resource(&mut self, resource: &Resource) {
        self.delegate.set_resource(resource);
    }
}
/// Extension methods for wrapping a [`SpanProcessor`] in one of this module's
/// decorating processors.
trait SpanProcessorExt: Sized + SpanProcessor {
    /// Wraps `self` in an [`ApolloFilterSpanProcessor`].
    fn filtered(self) -> ApolloFilterSpanProcessor<Self>;

    /// Wraps `self` in a [`DatadogSpanProcessor`].
    fn always_sampled(self) -> DatadogSpanProcessor<Self>;
}
/// Blanket implementation: every [`SpanProcessor`] gets the wrapping helpers.
impl<T> SpanProcessorExt for T
where
    T: SpanProcessor,
{
    /// Wraps `self` in an [`ApolloFilterSpanProcessor`], with `self` as its delegate.
    fn filtered(self) -> ApolloFilterSpanProcessor<Self> {
        let delegate = self;
        ApolloFilterSpanProcessor { delegate }
    }

    /// Wraps `self` by passing it to `DatadogSpanProcessor::new`.
    fn always_sampled(self) -> DatadogSpanProcessor<Self> {
        let inner = self;
        DatadogSpanProcessor::new(inner)
    }
}
// Batch span processor settings, convertible into an OpenTelemetry
// `BatchConfig` through the `From` impl in this file.
//
// NOTE: plain `//` comments are used here on purpose. schemars copies `///`
// doc comments into the generated JSON schema as descriptions, so adding doc
// comments to this type would change the schema output.
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)]
// Fields missing from the input are filled from `BatchProcessorConfig::default()`.
#[serde(default)]
pub(crate) struct BatchProcessorConfig {
// Deserialized from a human-readable duration string via `humantime_serde`
// and described as a string in the JSON schema. Forwarded to
// `BatchConfigBuilder::with_scheduled_delay`. `with_env_overrides` replaces
// it with `OTEL_BSP_SCHEDULE_DELAY` (milliseconds) when that variable is set.
#[serde(deserialize_with = "humantime_serde::deserialize")]
#[schemars(with = "String")]
pub(crate) scheduled_delay: Duration,
// Forwarded to `BatchConfigBuilder::with_max_queue_size`.
// Environment override: `OTEL_BSP_MAX_QUEUE_SIZE`.
pub(crate) max_queue_size: usize,
// Forwarded to `BatchConfigBuilder::with_max_export_batch_size`.
// Environment override: `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`.
pub(crate) max_export_batch_size: usize,
// Deserialized and schema-described like `scheduled_delay`. Forwarded to
// `BatchConfigBuilder::with_max_export_timeout`.
// Environment override: `OTEL_BSP_EXPORT_TIMEOUT` (milliseconds).
#[serde(deserialize_with = "humantime_serde::deserialize")]
#[schemars(with = "String")]
pub(crate) max_export_timeout: Duration,
// Forwarded to `BatchConfigBuilder::with_max_concurrent_exports`.
// Environment override: `OTEL_BSP_MAX_CONCURRENT_EXPORTS`.
pub(crate) max_concurrent_exports: usize,
}
/// Default for `BatchProcessorConfig::scheduled_delay`: 5 seconds.
pub(crate) fn scheduled_delay_default() -> Duration {
    Duration::from_millis(5_000)
}
/// Default for `BatchProcessorConfig::max_queue_size`: 2048.
pub(crate) fn max_queue_size_default() -> usize {
    const DEFAULT_MAX_QUEUE_SIZE: usize = 2048;
    DEFAULT_MAX_QUEUE_SIZE
}
/// Default for `BatchProcessorConfig::max_export_batch_size`: 512.
fn max_export_batch_size_default() -> usize {
    const DEFAULT_MAX_EXPORT_BATCH_SIZE: usize = 512;
    DEFAULT_MAX_EXPORT_BATCH_SIZE
}
/// Default for `BatchProcessorConfig::max_export_timeout`: 30 seconds.
pub(crate) fn max_export_timeout_default() -> Duration {
    Duration::from_millis(30_000)
}
/// Default for `BatchProcessorConfig::max_concurrent_exports`: 1.
fn max_concurrent_exports_default() -> usize {
    const DEFAULT_MAX_CONCURRENT_EXPORTS: usize = 1;
    DEFAULT_MAX_CONCURRENT_EXPORTS
}
impl BatchProcessorConfig {
    /// Returns this configuration with each field replaced by the value of its
    /// `OTEL_BSP_*` environment variable, when that variable is set to a
    /// non-empty value.
    ///
    /// | Field                    | Variable                          | Format       |
    /// |--------------------------|-----------------------------------|--------------|
    /// | `scheduled_delay`        | `OTEL_BSP_SCHEDULE_DELAY`         | milliseconds |
    /// | `max_queue_size`         | `OTEL_BSP_MAX_QUEUE_SIZE`         | integer      |
    /// | `max_export_batch_size`  | `OTEL_BSP_MAX_EXPORT_BATCH_SIZE`  | integer      |
    /// | `max_export_timeout`     | `OTEL_BSP_EXPORT_TIMEOUT`         | milliseconds |
    /// | `max_concurrent_exports` | `OTEL_BSP_MAX_CONCURRENT_EXPORTS` | integer      |
    ///
    /// # Errors
    ///
    /// Returns an error naming the offending variable and value when a
    /// non-empty value cannot be parsed as an unsigned integer.
    pub(crate) fn with_env_overrides(self) -> Result<Self, BoxError> {
        Ok(BatchProcessorConfig {
            scheduled_delay: Self::parse_duration_env(
                "OTEL_BSP_SCHEDULE_DELAY",
                self.scheduled_delay,
            )?,
            max_queue_size: Self::parse_usize_env("OTEL_BSP_MAX_QUEUE_SIZE", self.max_queue_size)?,
            max_export_batch_size: Self::parse_usize_env(
                "OTEL_BSP_MAX_EXPORT_BATCH_SIZE",
                self.max_export_batch_size,
            )?,
            max_export_timeout: Self::parse_duration_env(
                "OTEL_BSP_EXPORT_TIMEOUT",
                self.max_export_timeout,
            )?,
            max_concurrent_exports: Self::parse_usize_env(
                "OTEL_BSP_MAX_CONCURRENT_EXPORTS",
                self.max_concurrent_exports,
            )?,
        })
    }

    /// Reads `var` from the environment, returning `None` when it is unset,
    /// not valid Unicode, or empty.
    ///
    /// The OpenTelemetry specification requires an empty environment variable
    /// to be interpreted the same way as an unset one, so an empty value must
    /// fall back to the configured value instead of failing to parse.
    fn non_empty_env(var: &str) -> Option<String> {
        std::env::var(var).ok().filter(|value| !value.is_empty())
    }

    /// Parses `var` as a number of milliseconds, or returns `default` when the
    /// variable has no usable value (see [`Self::non_empty_env`]).
    ///
    /// # Errors
    ///
    /// Returns an error when the value is not a valid `u64`.
    fn parse_duration_env(var: &str, default: Duration) -> Result<Duration, BoxError> {
        match Self::non_empty_env(var) {
            Some(value) => {
                let millis = value.parse::<u64>().map_err(|e| {
                    format!(
                        "invalid value '{}' for {}, expected milliseconds: {}",
                        value, var, e
                    )
                })?;
                Ok(Duration::from_millis(millis))
            }
            None => Ok(default),
        }
    }

    /// Parses `var` as a `usize`, or returns `default` when the variable has
    /// no usable value (see [`Self::non_empty_env`]).
    ///
    /// # Errors
    ///
    /// Returns an error when the value is not a valid `usize`.
    fn parse_usize_env(var: &str, default: usize) -> Result<usize, BoxError> {
        match Self::non_empty_env(var) {
            Some(value) => value.parse::<usize>().map_err(|e| {
                format!(
                    "invalid value '{}' for {}, expected integer: {}",
                    value, var, e
                )
                .into()
            }),
            None => Ok(default),
        }
    }
}
impl From<BatchProcessorConfig> for BatchConfig {
fn from(config: BatchProcessorConfig) -> Self {
BatchConfigBuilder::default()
.with_scheduled_delay(config.scheduled_delay)
.with_max_queue_size(config.max_queue_size)
.with_max_export_batch_size(config.max_export_batch_size)
.with_max_concurrent_exports(config.max_concurrent_exports)
.with_max_export_timeout(config.max_export_timeout)
.build()
}
}
/// Human-readable, single-line rendering of the batch settings, with the
/// durations formatted by `humantime::format_duration`.
impl Display for BatchProcessorConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Write straight into the formatter instead of building a temporary
        // `String` with `format!` and copying it over.
        write!(
            f,
            "BatchConfig {{ scheduled_delay={}, max_queue_size={}, max_export_batch_size={}, max_export_timeout={}, max_concurrent_exports={} }}",
            humantime::format_duration(self.scheduled_delay),
            self.max_queue_size,
            self.max_export_batch_size,
            humantime::format_duration(self.max_export_timeout),
            self.max_concurrent_exports
        )
    }
}
/// Builds the configuration from the per-field `*_default()` functions.
impl Default for BatchProcessorConfig {
    fn default() -> Self {
        let scheduled_delay = scheduled_delay_default();
        let max_queue_size = max_queue_size_default();
        let max_export_batch_size = max_export_batch_size_default();
        let max_export_timeout = max_export_timeout_default();
        let max_concurrent_exports = max_concurrent_exports_default();

        Self {
            scheduled_delay,
            max_queue_size,
            max_export_batch_size,
            max_export_timeout,
            max_concurrent_exports,
        }
    }
}