mod agent_sampling;
mod span_processor;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::time::Duration;
pub(crate) use agent_sampling::DatadogAgentSampling;
use ahash::HashMap;
use ahash::HashMapExt;
use http::Uri;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry::Value;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanKind;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::SpanExporter;
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use opentelemetry_semantic_conventions::resource::SERVICE_VERSION;
use schemars::JsonSchema;
use serde::Deserialize;
pub(crate) use span_processor::DatadogSpanProcessor;
use tower::BoxError;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::config::GenericWith;
use crate::plugins::telemetry::consts::BUILT_IN_SPAN_NAMES;
use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::OTEL_ORIGINAL_NAME;
use crate::plugins::telemetry::consts::QUERY_PLANNING_SPAN_NAME;
use crate::plugins::telemetry::consts::REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::ROUTER_SPAN_NAME;
use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME;
use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::reload::tracing::TracingBuilder;
use crate::plugins::telemetry::reload::tracing::TracingConfigurator;
use crate::plugins::telemetry::resource::ConfigResource;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
use crate::plugins::telemetry::tracing::NamedSpanExporter;
use crate::plugins::telemetry::tracing::NamedTokioRuntime;
use crate::plugins::telemetry::tracing::SpanProcessorExt;
use crate::plugins::telemetry::tracing::datadog_exporter;
use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState;
/// Built-in mapping from span name to the attribute key used to look up that
/// span's Datadog resource name.
///
/// `configure` starts from this table and overlays the user-supplied
/// `resource_mapping` on top of it.
fn default_resource_mappings() -> HashMap<String, String> {
    [
        (REQUEST_SPAN_NAME, "http.route"),
        (ROUTER_SPAN_NAME, "http.route"),
        (SUPERGRAPH_SPAN_NAME, "graphql.operation.name"),
        (QUERY_PLANNING_SPAN_NAME, "graphql.operation.name"),
        (SUBGRAPH_SPAN_NAME, "subgraph.name"),
        (SUBGRAPH_REQUEST_SPAN_NAME, "graphql.operation.name"),
        (HTTP_REQUEST_SPAN_NAME, "http.route"),
    ]
    .into_iter()
    .map(|(span_name, attribute_key)| (span_name.to_string(), attribute_key.to_string()))
    .collect()
}
/// Resource attribute key (`env`); when the resource carries it, its value is
/// handed to the exporter builder via `with_env`.
const ENV_KEY: Key = Key::from_static_str("env");
/// Base URI that the configured `endpoint` is resolved against when no
/// environment variable overrides the agent address.
const DEFAULT_ENDPOINT: &str = "http://127.0.0.1:8126";
/// Environment variable holding the full agent URI; checked first and, when
/// set, used instead of the configured endpoint.
const DD_TRACE_AGENT_URL: &str = "DD_TRACE_AGENT_URL";
/// Environment variable holding the agent host; only consulted when
/// `DD_TRACE_AGENT_URL` is not set.
const DD_AGENT_HOST: &str = "DD_AGENT_HOST";
/// Environment variable holding the agent port, combined with `DD_AGENT_HOST`;
/// falls back to `8126` when unset.
const DD_TRACE_AGENT_PORT: &str = "DD_TRACE_AGENT_PORT";
// Datadog tracing exporter configuration.
//
// Unknown keys are rejected at deserialization time (`deny_unknown_fields`) and
// the type is exposed as `DatadogConfig` in the generated JSON schema.
//
// NOTE(review): plain `//` comments are used here on purpose. The `JsonSchema`
// derive picks up `///` doc comments as schema descriptions, so switching to
// doc comments would change the generated schema — confirm before converting.
#[derive(Debug, Clone, Deserialize, JsonSchema, serde_derive_default::Default, PartialEq)]
#[serde(deny_unknown_fields)]
#[schemars(rename = "DatadogConfig")]
pub(crate) struct Config {
// Whether the Datadog exporter is enabled; returned by `is_enabled`.
pub(crate) enabled: bool,
// Agent endpoint. Resolved against `DEFAULT_ENDPOINT`, unless one of the
// `DD_*` environment variables overrides it (see `endpoint_with_env_override`).
#[serde(default)]
endpoint: UriEndpoint,
// Settings for the batch span processor that feeds the exporter.
#[serde(default)]
batch_processor: BatchProcessorConfig,
// Defaults to `true`. When `false`, no resource mapping is installed on the
// exporter at all.
#[serde(default = "default_true")]
enable_span_mapping: bool,
// Defaults to `true`. When `true`, a span whose `OTEL_ORIGINAL_NAME` attribute
// equals one of `BUILT_IN_SPAN_NAMES` is exported under that built-in name.
#[serde(default = "default_true")]
fixed_span_names: bool,
// Span name -> attribute key. Overlaid on `default_resource_mappings()`;
// entries here replace defaults that share the same span name.
#[serde(default)]
resource_mapping: HashMap<String, String>,
// Span name -> whether the measuring flag should be set for that span.
// Overlaid on `default_span_metrics()` when the exporter is configured.
#[serde(default = "default_span_metrics")]
span_metrics: HashMap<String, bool>,
}
/// Default span-metrics table: every name in `BUILT_IN_SPAN_NAMES` mapped to
/// `true`.
fn default_span_metrics() -> HashMap<String, bool> {
    BUILT_IN_SPAN_NAMES
        .iter()
        .map(|span_name| (span_name.to_string(), true))
        .collect()
}
/// Serde default helper for boolean fields that should be `true` when omitted
/// (used by `enable_span_mapping` and `fixed_span_names`).
fn default_true() -> bool {
true
}
impl Config {
fn endpoint_with_env_override(&self) -> Result<Uri, BoxError> {
if let Ok(url) = std::env::var(DD_TRACE_AGENT_URL) {
return url.parse::<Uri>().map_err(|e| {
format!("invalid URI in {}: '{}': {}", DD_TRACE_AGENT_URL, url, e).into()
});
}
if let Ok(host) = std::env::var(DD_AGENT_HOST) {
let port = std::env::var(DD_TRACE_AGENT_PORT).unwrap_or_else(|_| "8126".to_string());
let url = format!("http://{}:{}", host, port);
return url.parse::<Uri>().map_err(|e| {
format!(
"invalid URI from {} and {}: '{}': {}",
DD_AGENT_HOST, DD_TRACE_AGENT_PORT, url, e
)
.into()
});
}
Ok(self
.endpoint
.to_full_uri(&Uri::from_static(DEFAULT_ENDPOINT)))
}
}
/// Wires the Datadog exporter into the tracing pipeline based on this `Config`.
impl TracingConfigurator for Config {
/// Selects the Datadog section of the tracing exporter configuration.
fn config(conf: &Conf) -> &Self {
&conf.exporters.tracing.datadog
}
/// Reports the `enabled` flag.
fn is_enabled(&self) -> bool {
self.enabled
}
/// Builds the Datadog exporter, wraps it in `ExporterWrapper` and a batch span
/// processor, and registers that processor on `builder`.
///
/// # Errors
///
/// Fails when the endpoint cannot be resolved, the HTTP client or the exporter
/// cannot be built, or the batch processor's environment overrides are invalid.
///
/// # Panics
///
/// Panics if the resource has no `SERVICE_VERSION` attribute.
fn configure(&self, builder: &mut TracingBuilder) -> Result<(), BoxError> {
tracing::info!("Configuring Datadog tracing: {}", self.batch_processor);
let resource = builder.tracing_common().to_resource();
// `Some(..)` only when span mapping is enabled: the defaults overlaid with the
// user's `resource_mapping` (user entries replace defaults with the same span
// name), with the attribute names converted to `Key`s.
let resource_mappings = self.enable_span_mapping.then(|| {
let mut resource_mappings = default_resource_mappings();
resource_mappings.extend(self.resource_mapping.clone());
resource_mappings
.iter()
.map(|(k, v)| (k.clone(), opentelemetry::Key::from(v.clone())))
.collect::<HashMap<String, Key>>()
});
// Copied out so the `move` name-mapping closure below does not capture `self`.
let fixed_span_names = self.fixed_span_names;
let endpoint = self.endpoint_with_env_override()?;
let exporter = datadog_exporter::new_pipeline()
// Trailing slashes are stripped from the endpoint before it is handed over.
.with_agent_endpoint(endpoint.to_string().trim_end_matches('/'))
// `with` comes from `GenericWith`; presumably it runs the closure only when the
// option is `Some` — confirm against its definition.
.with(&resource_mappings, |builder, resource_mappings| {
let resource_mappings = resource_mappings.clone();
builder.with_resource_mapping(move |span, _model_config| {
// Look the mapping up by the original span name when one was recorded in the
// `OTEL_ORIGINAL_NAME` attribute, otherwise by the span's current name.
let span_name = if let Some(original) = span
.attributes
.iter()
.find(|kv| kv.key.as_str() == OTEL_ORIGINAL_NAME)
{
original.value.as_str()
} else {
span.name.clone()
};
// Use the mapped attribute's value as the resource, but only when the span
// actually carries that attribute as a string.
if let Some(mapping) = resource_mappings.get(span_name.as_ref())
&& let Some(KeyValue {
key: _,
value: Value::String(v),
..
}) = span.attributes.iter().find(|kv| kv.key == *mapping)
{
return v.as_str();
}
// No mapping, or no matching string attribute: fall back to the span name.
span.name.as_ref()
})
})
.with_name_mapping(move |span, _model_config| {
// With fixed span names enabled, a span whose recorded original name is one of
// the built-in names is exported under that built-in name.
if fixed_span_names
&& let Some(original) = span
.attributes
.iter()
.find(|kv| kv.key.as_str() == OTEL_ORIGINAL_NAME)
{
for name in BUILT_IN_SPAN_NAMES {
if name == original.value.as_str() {
return name;
}
}
}
&span.name
})
// Service name and environment are taken from the resource when present.
.with(
&resource.get(&SERVICE_NAME.into()),
|builder, service_name| {
builder.with_service_name(service_name.as_str())
},
)
.with(&resource.get(&ENV_KEY), |builder, env| {
builder.with_env(env.as_str())
})
.with_version(
resource
.get(&SERVICE_VERSION.into())
.expect("cargo version is set as a resource default;qed")
.to_string(),
)
// NOTE(review): a 1ms idle timeout means pooled connections are dropped almost
// as soon as they go idle — presumably deliberate; confirm the rationale.
.with_http_client(
reqwest::Client::builder()
.pool_idle_timeout(Duration::from_millis(1))
.build()?,
)
.build_exporter()?;
// Start from the defaults so names missing from the user's map keep their
// default setting, then overlay the configured values.
let mut span_metrics = default_span_metrics();
span_metrics.extend(self.span_metrics.clone());
let wrapper = ExporterWrapper {
delegate: exporter,
span_metrics,
};
let named_exporter = NamedSpanExporter::new(wrapper, "datadog");
let batch_processor =
BatchSpanProcessor::builder(named_exporter, NamedTokioRuntime::new("datadog-tracing"))
.with_batch_config(self.batch_processor.clone().with_env_overrides()?.into())
.build()
.filtered();
// When the preview agent-sampling flag is set, the processor is additionally
// wrapped with `always_sampled()`; an unset flag counts as `false`.
if builder
.tracing_common()
.preview_datadog_agent_sampling
.unwrap_or_default()
{
builder.with_span_processor(batch_processor.always_sampled())
} else {
builder.with_span_processor(batch_processor)
}
Ok(())
}
}
/// Wraps the Datadog exporter so spans can be adjusted just before export.
struct ExporterWrapper {
/// The exporter that receives the adjusted batch.
delegate: datadog_exporter::DatadogExporter,
/// Span name -> whether the measuring flag should be set on that span's
/// trace state.
span_metrics: HashMap<String, bool>,
}
/// Debug output is forwarded to the wrapped exporter; `span_metrics` is not
/// printed.
impl Debug for ExporterWrapper {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { delegate, .. } = self;
        Debug::fmt(delegate, f)
    }
}
/// Adjusts each span before passing the batch on to the wrapped exporter.
impl SpanExporter for ExporterWrapper {
    /// Rewrites every span in `batch`, then exports the batch through the
    /// delegate.
    ///
    /// For each span:
    /// - the measuring flag in its trace state is aligned with the
    ///   `span_metrics` entry for the span (looked up by the
    ///   `OTEL_ORIGINAL_NAME` attribute when present, otherwise by the span
    ///   name); spans without an entry are left untouched;
    /// - a `span.kind` attribute holding the lowercase span kind is appended.
    fn export(
        &self,
        mut batch: Vec<SpanData>,
    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
        for span in batch.iter_mut() {
            // The original name, when one was recorded, wins over the current name.
            let recorded_name = span
                .attributes
                .iter()
                .find(|attr| attr.key.as_str() == OTEL_ORIGINAL_NAME)
                .map(|attr| attr.value.as_str());
            let metrics_key: &str = match &recorded_name {
                Some(name) => name.as_ref(),
                None => span.name.as_ref(),
            };

            if let Some(&measured) = self.span_metrics.get(metrics_key) {
                let current_state = span.span_context.trace_state();
                // Only rebuild the context when the flag actually has to change.
                if measured != current_state.measuring_enabled() {
                    let updated_state = current_state.with_measuring(measured);
                    let rebuilt_context = SpanContext::new(
                        span.span_context.trace_id(),
                        span.span_context.span_id(),
                        span.span_context.trace_flags(),
                        span.span_context.is_remote(),
                        updated_state,
                    );
                    span.span_context = rebuilt_context;
                }
            }

            let kind_label = match span.span_kind {
                SpanKind::Client => "client",
                SpanKind::Server => "server",
                SpanKind::Producer => "producer",
                SpanKind::Consumer => "consumer",
                SpanKind::Internal => "internal",
            };
            span.attributes.push(KeyValue::new("span.kind", kind_label));
        }

        self.delegate.export(batch)
    }

    /// Shuts down the wrapped exporter.
    fn shutdown(&mut self) -> OTelSdkResult {
        self.delegate.shutdown()
    }

    /// Flushes the wrapped exporter.
    fn force_flush(&mut self) -> OTelSdkResult {
        self.delegate.force_flush()
    }

    /// Forwards the resource to the wrapped exporter.
    fn set_resource(&mut self, resource: &Resource) {
        self.delegate.set_resource(resource);
    }
}