use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use std::time::Instant;
use ::tracing::Span;
use ::tracing::info_span;
use config_new::Selectors;
use config_new::cache::CacheInstruments;
use config_new::connector::instruments::ConnectorInstruments;
use config_new::instruments::InstrumentsConfig;
use config_new::instruments::StaticInstrument;
use config_new::router_overhead;
use futures::StreamExt;
use futures::future::BoxFuture;
use futures::future::ready;
use futures::stream::once;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use http::StatusCode;
use http::header;
use http::header::CACHE_CONTROL;
use metrics::apollo::studio::SingleLimitsStats;
use metrics::local_type_stats::LocalTypeStatRecorder;
use multimap::MultiMap;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry::metrics::MeterProvider;
use opentelemetry::metrics::ObservableGauge;
use opentelemetry::propagation::Extractor;
use opentelemetry::propagation::Injector;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::propagation::text_map_propagator::FieldIter;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TraceState;
use parking_lot::Mutex;
use parking_lot::RwLock;
use rand::RngExt as _;
use regex::Regex;
use reload::activation::Activation;
use reload::tracing::TracingConfigurator;
use serde_json_bytes::ByteString;
use serde_json_bytes::Map;
use serde_json_bytes::Value;
use serde_json_bytes::json;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::ServiceExt;
use uuid::Uuid;
use self::apollo::ForwardValues;
use self::apollo::LicensedOperationCountByType;
use self::apollo::OperationSubType;
use self::apollo::SingleReport;
use self::apollo_exporter::Sender;
use self::apollo_exporter::proto;
use self::config::Conf;
use self::config::TraceIdFormat;
use self::config_new::instruments::Instrumented;
use self::config_new::router::events::RouterEvents;
use self::config_new::router::instruments::RouterInstruments;
use self::config_new::subgraph::events::SubgraphEvents;
use self::config_new::subgraph::instruments::SubgraphInstruments;
use self::config_new::supergraph::events::SupergraphEvents;
use self::metrics::apollo::studio::SingleTypeStat;
pub(crate) use self::span_factory::SpanMode;
use self::tracing::apollo_telemetry::APOLLO_PRIVATE_DURATION_NS;
use self::tracing::apollo_telemetry::CLIENT_NAME_KEY;
use self::tracing::apollo_telemetry::CLIENT_VERSION_KEY;
use crate::Context;
use crate::ListenAddr;
use crate::apollo_studio_interop::ExtendedReferenceStats;
use crate::apollo_studio_interop::ReferencedEnums;
use crate::apollo_studio_interop::UsageReporting;
use crate::context::OPERATION_KIND;
use crate::context::OPERATION_NAME;
use crate::graphql::ResponseVisitor;
use crate::layers::ServiceBuilderExt;
use crate::layers::instrument::InstrumentLayer;
use crate::metrics::meter_provider;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::telemetry::apollo::ForwardHeaders;
use crate::plugins::telemetry::apollo_exporter::proto::reports::StatsContext;
use crate::plugins::telemetry::apollo_exporter::proto::reports::trace::node::Id::ResponseName;
use crate::plugins::telemetry::config::AttributeValue;
use crate::plugins::telemetry::config_new::DatadogId;
use crate::plugins::telemetry::config_new::apollo::instruments::ApolloConnectorInstruments;
use crate::plugins::telemetry::config_new::apollo::instruments::ApolloSubgraphInstruments;
use crate::plugins::telemetry::config_new::connector::events::ConnectorEvents;
use crate::plugins::telemetry::config_new::cost::add_cost_attributes;
use crate::plugins::telemetry::config_new::graphql::GraphQLInstruments;
use crate::plugins::telemetry::config_new::instruments::CustomHistogramInner;
use crate::plugins::telemetry::config_new::instruments::SupergraphInstruments;
use crate::plugins::telemetry::config_new::router::instruments::ResponseBodySizeRecording;
use crate::plugins::telemetry::config_new::trace_id;
use crate::plugins::telemetry::consts::EXECUTION_SPAN_NAME;
use crate::plugins::telemetry::consts::HTTP_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::OTEL_NAME;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
use crate::plugins::telemetry::consts::REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::ROUTER_SPAN_NAME;
use crate::plugins::telemetry::dynamic_attribute::SpanDynAttribute;
use crate::plugins::telemetry::error_counter::count_execution_errors;
use crate::plugins::telemetry::error_counter::count_router_errors;
use crate::plugins::telemetry::error_counter::count_subgraph_errors;
use crate::plugins::telemetry::error_counter::count_supergraph_errors;
use crate::plugins::telemetry::metrics::apollo::histogram::ListLengthHistogram;
use crate::plugins::telemetry::metrics::apollo::studio::LocalTypeStat;
use crate::plugins::telemetry::metrics::apollo::studio::SingleContextualizedStats;
use crate::plugins::telemetry::metrics::apollo::studio::SinglePathErrorStats;
use crate::plugins::telemetry::metrics::apollo::studio::SingleQueryLatencyStats;
use crate::plugins::telemetry::metrics::apollo::studio::SingleStats;
use crate::plugins::telemetry::metrics::apollo::studio::SingleStatsReport;
use crate::plugins::telemetry::otel::OpenTelemetrySpanExt;
use crate::plugins::telemetry::reload::metrics::MetricsConfigurator;
use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_OPERATION_SIGNATURE;
use crate::plugins::telemetry::tracing::apollo_telemetry::decode_ftv1_trace;
use crate::query_planner::OperationKind;
use crate::router_factory::Endpoint;
use crate::services::ExecutionRequest;
use crate::services::ExecutionResponse;
use crate::services::SubgraphRequest;
use crate::services::SubgraphResponse;
use crate::services::SupergraphRequest;
use crate::services::SupergraphResponse;
use crate::services::connector;
use crate::services::execution;
use crate::services::layers::apq::PERSISTED_QUERY_CACHE_HIT;
use crate::services::layers::persisted_queries::RequestPersistedQueryId;
use crate::services::router;
use crate::services::subgraph;
use crate::services::supergraph;
use crate::spec::operation_limits::OperationLimits;
pub(crate) mod apollo;
pub(crate) mod apollo_exporter;
pub(crate) mod apollo_otlp_exporter;
pub(crate) mod config;
pub(crate) mod config_new;
pub(crate) mod consts;
pub(crate) mod dynamic_attribute;
mod endpoint;
mod error_counter;
mod fmt_layer;
pub(crate) mod formatters;
mod logging;
pub(crate) mod metrics;
pub(crate) mod otel;
mod otlp;
pub(crate) mod reload;
pub(crate) mod resource;
pub(crate) mod span_ext;
mod span_factory;
pub(crate) mod tracing;
pub(crate) mod utils;
pub(crate) const CLIENT_NAME: &str = "apollo::telemetry::client_name";
pub(crate) const CLIENT_LIBRARY_NAME: &str = "apollo::telemetry::client_library_name";
pub(crate) const CLIENT_VERSION: &str = "apollo::telemetry::client_version";
pub(crate) const CLIENT_LIBRARY_VERSION: &str = "apollo::telemetry::client_library_version";
pub(crate) const SUBGRAPH_FTV1: &str = "apollo::telemetry::subgraph_ftv1";
pub(crate) const STUDIO_EXCLUDE: &str = "apollo::telemetry::studio_exclude";
pub(crate) const SUPERGRAPH_SCHEMA_ID_CONTEXT_KEY: &str = "apollo::supergraph_schema_id";
const GLOBAL_TRACER_NAME: &str = "apollo-router";
const DEFAULT_EXPOSE_TRACE_ID_HEADER: &str = "apollo-trace-id";
static DEFAULT_EXPOSE_TRACE_ID_HEADER_NAME: HeaderName =
HeaderName::from_static(DEFAULT_EXPOSE_TRACE_ID_HEADER);
static FTV1_HEADER_NAME: HeaderName = HeaderName::from_static("apollo-federation-include-trace");
static FTV1_HEADER_VALUE: HeaderValue = HeaderValue::from_static("ftv1");
pub(crate) const APOLLO_PRIVATE_QUERY_ALIASES: Key =
Key::from_static_str("apollo_private.query.aliases");
pub(crate) const APOLLO_PRIVATE_QUERY_DEPTH: Key =
Key::from_static_str("apollo_private.query.depth");
pub(crate) const APOLLO_PRIVATE_QUERY_HEIGHT: Key =
Key::from_static_str("apollo_private.query.height");
pub(crate) const APOLLO_PRIVATE_QUERY_ROOT_FIELDS: Key =
Key::from_static_str("apollo_private.query.root_fields");
pub(crate) const APOLLO_CLIENT_NAME_ATTRIBUTE: &str = "apollo.client.name";
pub(crate) const APOLLO_CLIENT_VERSION_ATTRIBUTE: &str = "apollo.client.version";
pub(crate) const GRAPHQL_OPERATION_NAME_ATTRIBUTE: &str = "graphql.operation.name";
pub(crate) const GRAPHQL_OPERATION_TYPE_ATTRIBUTE: &str = "graphql.operation.type";
pub(crate) const APOLLO_OPERATION_ID_ATTRIBUTE: &str = "apollo.operation.id";
pub(crate) const APOLLO_HAS_ERRORS_ATTRIBUTE: &str = "has_errors";
pub(crate) const APOLLO_CONNECTOR_SOURCE_ATTRIBUTE: &str = "connector.source";
/// State of the telemetry plugin.
///
/// Holds the telemetry configuration together with the resources built from it when the
/// plugin is created (or re-created on reload).
#[doc(hidden)]
pub(crate) struct Telemetry {
    /// Telemetry configuration, after `update_defaults` has been applied in `new`.
    pub(crate) config: Arc<config::Conf>,
    /// Supergraph schema id. Inserted into every router request context under
    /// `SUPERGRAPH_SCHEMA_ID_CONTEXT_KEY`.
    supergraph_schema_id: Arc<String>,
    /// Extra endpoints returned by `reload::prepare`, keyed by listen address.
    custom_endpoints: MultiMap<ListenAddr, Endpoint>,
    /// Sender handed to `update_apollo_metrics` / `update_metrics_on_response_events` for
    /// Apollo reporting.
    apollo_metrics_sender: apollo_exporter::Sender,
    /// Ratio computed by `Conf::calculate_field_level_instrumentation_ratio`.
    field_level_instrumentation_ratio: f64,
    /// Builtin (static) instruments. Each service hook clones the relevant `Arc` into its
    /// per-request instrument set.
    builtin_instruments: RwLock<BuiltinInstruments>,
    /// Activation prepared by `reload::prepare`. NOTE(review): the `Option` presumably lets it
    /// be taken exactly once when activated; confirm at the use site.
    activation: Mutex<Option<Activation>>,
    /// Optional router features detected from the full router configuration.
    enabled_features: EnabledFeatures,
}
/// Publishes one `u64` value through an OpenTelemetry observable gauge.
///
/// The gauge callback loads the shared atomic each time it is invoked. Owners only call
/// [`LruSizeInstrument::update`] when the tracked value changes.
#[derive(Debug, Clone)]
pub(crate) struct LruSizeInstrument {
    /// Most recent value written by `update`; read by the gauge callback.
    value: Arc<AtomicU64>,
    /// Never read. NOTE(review): presumably kept so the registered gauge lives as long as
    /// this instrument does.
    _gauge: ObservableGauge<u64>,
}
impl LruSizeInstrument {
    /// Registers an observable gauge called `gauge_name` on the `apollo/router` meter.
    ///
    /// The gauge reports `0` until the first call to `update`.
    pub(crate) fn new(gauge_name: &'static str) -> Self {
        let shared = Arc::new(AtomicU64::new(0));
        let router_meter = meter_provider().meter("apollo/router");
        let observed = Arc::clone(&shared);
        let registered = router_meter
            .u64_observable_gauge(gauge_name)
            .with_callback(move |observer| {
                let current = observed.load(std::sync::atomic::Ordering::Relaxed);
                observer.observe(current, &[]);
            })
            .build();
        Self {
            _gauge: registered,
            value: shared,
        }
    }

    /// Stores `value` as the next observation reported by the gauge.
    pub(crate) fn update(&self, value: u64) {
        let slot: &AtomicU64 = &self.value;
        slot.store(value, std::sync::atomic::Ordering::Relaxed);
    }
}
/// Builtin instruments created from `InstrumentsConfig` by `create_builtin_instruments`.
///
/// Each map is wrapped in an `Arc` so the service hooks can clone it cheaply and pass it to
/// the matching per-request `new_*_instruments` constructor.
struct BuiltinInstruments {
    /// Passed to `new_graphql_instruments` in the supergraph service.
    graphql_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_router_instruments` in the router service.
    router_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_supergraph_instruments` in the supergraph service.
    supergraph_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_subgraph_instruments` in the subgraph service.
    subgraph_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_apollo_subgraph_instruments` in the subgraph service.
    apollo_subgraph_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_connector_instruments` in the connector request service.
    connector_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_apollo_connector_instruments` in the connector request service.
    apollo_connector_instruments: Arc<HashMap<String, StaticInstrument>>,
    /// Passed to `new_cache_instruments` in the subgraph service.
    cache_custom_instruments: Arc<HashMap<String, StaticInstrument>>,
}
/// Instantiates every builtin instrument family described by `config`.
///
/// The families are built in a fixed order (graphql, router, supergraph, subgraph, apollo
/// subgraph, connector, apollo connector, cache). Each one is then wrapped in an `Arc` for
/// cheap sharing with the per-request hooks.
fn create_builtin_instruments(config: &InstrumentsConfig) -> BuiltinInstruments {
    let graphql = config.new_builtin_graphql_instruments();
    let router = config.new_builtin_router_instruments();
    let supergraph = config.new_builtin_supergraph_instruments();
    let subgraph = config.new_builtin_subgraph_instruments();
    let apollo_subgraph = config.new_builtin_apollo_subgraph_instruments();
    let connector = config.new_builtin_connector_instruments();
    let apollo_connector = config.new_builtin_apollo_connector_instruments();
    let cache = config.new_builtin_cache_instruments();

    BuiltinInstruments {
        graphql_custom_instruments: Arc::new(graphql),
        router_custom_instruments: Arc::new(router),
        supergraph_custom_instruments: Arc::new(supergraph),
        subgraph_custom_instruments: Arc::new(subgraph),
        apollo_subgraph_instruments: Arc::new(apollo_subgraph),
        connector_custom_instruments: Arc::new(connector),
        apollo_connector_instruments: Arc::new(apollo_connector),
        cache_custom_instruments: Arc::new(cache),
    }
}
/// Optional router features detected from the full router configuration.
#[derive(Clone, Debug)]
struct EnabledFeatures {
    /// Whether the distributed APQ cache is enabled.
    distributed_apq_cache: bool,
    /// Whether the entity cache is enabled.
    entity_cache: bool,
    /// Whether the response cache is enabled.
    response_cache: bool,
}
impl EnabledFeatures {
    /// Returns the names of the enabled features, in a fixed order:
    /// `distributed_apq_cache`, `entity_cache`, `response_cache`.
    ///
    /// Features that are disabled are omitted. With nothing enabled, the result is empty.
    fn list(&self) -> Vec<String> {
        [
            ("distributed_apq_cache", self.distributed_apq_cache),
            ("entity_cache", self.entity_cache),
            ("response_cache", self.response_cache),
        ]
        .into_iter()
        .filter_map(|(name, enabled)| enabled.then(|| name.to_owned()))
        .collect()
    }
}
#[async_trait::async_trait]
impl PluginPrivate for Telemetry {
type Config = config::Conf;
/// Builds the telemetry plugin from its configuration.
///
/// Steps performed:
/// - apply defaults to the span and instrument configuration;
/// - validate the instrumentation configuration, logging a warning instead of failing;
/// - prepare the activation, custom endpoints and Apollo metrics sender via
///   `reload::prepare`;
/// - detect enabled router features from the full router configuration.
///
/// # Errors
///
/// Returns an error if any of the following happens:
/// - the field-level instrumentation ratio cannot be computed;
/// - `reload::prepare` fails;
/// - `init` carries no full router configuration. This case previously panicked.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
    if init.previous_config.is_some() {
        ::tracing::debug!("Telemetry plugin reload detected with previous configuration");
    } else {
        ::tracing::debug!("Telemetry plugin initial startup without previous configuration");
    }
    let mut config = init.config;
    config.instrumentation.spans.update_defaults();
    config.instrumentation.instruments.update_defaults();
    // Validation problems are reported, but they deliberately do not abort plugin creation.
    if let Err(err) = config.instrumentation.validate() {
        ::tracing::warn!(
            "Potential configuration error for 'instrumentation': {err}, please check the documentation on https://www.apollographql.com/docs/router/configuration/telemetry/instrumentation/events"
        );
    }
    let field_level_instrumentation_ratio =
        config.calculate_field_level_instrumentation_ratio()?;
    let (activation, custom_endpoints, apollo_metrics_sender) =
        reload::prepare(&init.previous_config, &config)?;
    if config.instrumentation.spans.mode == SpanMode::Deprecated {
        ::tracing::warn!(
            "telemetry.instrumentation.spans.mode is currently set to 'deprecated', either explicitly or via defaulting. Set telemetry.instrumentation.spans.mode explicitly in your router.yaml to 'spec_compliant' for log and span attributes that follow OpenTelemetry semantic conventions. This option will be defaulted to 'spec_compliant' in a future release and eventually removed altogether"
        );
    }
    // This constructor already returns `Result`, so a missing full configuration is
    // surfaced as an error (same message as before) instead of panicking.
    let full_config = init
        .full_config
        .as_ref()
        .ok_or("Required full router configuration not found in telemetry plugin")?;
    let enabled_features = Self::extract_enabled_features(full_config);
    ::tracing::debug!("Enabled scale features: {:?}", enabled_features);
    Ok(Telemetry {
        custom_endpoints,
        apollo_metrics_sender,
        supergraph_schema_id: init.supergraph_schema_id,
        field_level_instrumentation_ratio,
        activation: Mutex::new(Some(activation)),
        builtin_instruments: RwLock::new(create_builtin_instruments(
            &config.instrumentation.instruments,
        )),
        enabled_features,
        config: Arc::new(config),
    })
}
/// Wraps the router service with request-level telemetry.
///
/// Layers, in the order they are added to the `ServiceBuilder`:
/// - allocation metrics;
/// - a response hook that names the request span (deprecated mode) or router span from the
///   operation kind and name stored in the context;
/// - in deprecated span mode only, an `InstrumentLayer` that creates the router span;
/// - a checkpoint that answers `400 Bad Request` when a client library name or version header
///   fails `is_valid_client_library_value`;
/// - a request/response hook. On the request side it stores client and library
///   identification in the context. On the response side it records span attributes,
///   instruments, events, selected response headers and the span status, then counts router
///   errors.
fn router_service(&self, service: router::BoxService) -> router::BoxService {
    let config = self.config.clone();
    let supergraph_schema_id = self.supergraph_schema_id.clone();
    // Separate `Arc` handles, one for each `move` closure below that owns the configuration.
    let config_later = self.config.clone();
    let config_request = self.config.clone();
    let config_checkpoint = self.config.clone();
    let span_mode = config.instrumentation.spans.mode;
    // Deprecated mode: the span to name is REQUEST_SPAN_NAME, and this method adds the
    // router-span InstrumentLayer itself. Otherwise the span to name is ROUTER_SPAN_NAME.
    let use_legacy_request_span =
        matches!(config.instrumentation.spans.mode, SpanMode::Deprecated);
    let enabled_features = self.enabled_features.clone();
    let field_level_instrumentation_ratio = self.field_level_instrumentation_ratio;
    let metrics_sender = self.apollo_metrics_sender.clone();
    // Builtin router instruments. The `Arc` is handed to `new_router_instruments` per request.
    let static_router_instruments = self
        .builtin_instruments
        .read()
        .router_custom_instruments
        .clone();
    let spans = &self.config.instrumentation.spans;
    let router_attributes = &spans.router.attributes.attributes;
    // Attribute keys for client name/version. `None` unless that router attribute is configured.
    let client_name_key = router_attributes
        .client_name
        .as_ref()
        .and_then(|a| a.key(CLIENT_NAME_KEY));
    let client_version_key = router_attributes
        .client_version
        .as_ref()
        .and_then(|a| a.key(CLIENT_VERSION_KEY));
    ServiceBuilder::new()
        .layer(metrics::allocation::AllocationMetricsLayer::new())
        .map_response(move |response: router::Response| {
            let span = Span::current();
            if let Some(span_name) = span.metadata().map(|metadata| metadata.name())
                && ((use_legacy_request_span && span_name == REQUEST_SPAN_NAME)
                    || (!use_legacy_request_span && span_name == ROUTER_SPAN_NAME))
            {
                let operation_kind = response.context.get::<_, String>(OPERATION_KIND);
                let operation_name = response.context.get::<_, String>(OPERATION_NAME);
                if let Ok(Some(operation_kind)) = &operation_kind {
                    span.record("graphql.operation.type", operation_kind);
                }
                if let Ok(Some(operation_name)) = &operation_name {
                    span.record("graphql.operation.name", operation_name);
                }
                // Span display name: "<kind> <name>", then "<kind>", then a generic fallback.
                match (&operation_kind, &operation_name) {
                    (Ok(Some(kind)), Ok(Some(name))) => span.set_span_dyn_attribute(
                        OTEL_NAME.into(),
                        format!("{kind} {name}").into(),
                    ),
                    (Ok(Some(kind)), _) => {
                        span.set_span_dyn_attribute(OTEL_NAME.into(), kind.clone().into())
                    }
                    _ => span
                        .set_span_dyn_attribute(OTEL_NAME.into(), "GraphQL Operation".into()),
                };
            }
            response
        })
        .option_layer(use_legacy_request_span.then(move || {
            InstrumentLayer::new(move |request: &router::Request| {
                span_mode.create_router(&request.router_request)
            })
        }))
        // Rejects requests whose client library headers are present but invalid. A header
        // value that `to_str` cannot read is treated as absent, so it is allowed through.
        .checkpoint(move |req: router::Request| {
            let library_name_valid = req
                .router_request
                .headers()
                .get(&config_checkpoint.apollo.library_name_header)
                .and_then(|v| v.to_str().ok())
                .is_none_or(is_valid_client_library_value);
            let library_version_valid = req
                .router_request
                .headers()
                .get(&config_checkpoint.apollo.library_version_header)
                .and_then(|v| v.to_str().ok())
                .is_none_or(is_valid_client_library_value);
            if !library_name_valid || !library_version_valid {
                if !library_name_valid {
                    ::tracing::warn!(
                        "Rejecting request: invalid client library name header value"
                    );
                }
                if !library_version_valid {
                    ::tracing::warn!(
                        "Rejecting request: invalid client library version header value"
                    );
                }
                Ok(ControlFlow::Break(
                    router::Response::error_builder()
                        .status_code(StatusCode::BAD_REQUEST)
                        .context(req.context)
                        .build()?,
                ))
            } else {
                Ok(ControlFlow::Continue(req))
            }
        })
        .map_future_with_request_data(
            // Request phase: runs before the inner service and returns the data that the
            // response phase below consumes.
            move |request: &router::Request| {
                let _ = request.context.insert(
                    SUPERGRAPH_SCHEMA_ID_CONTEXT_KEY,
                    supergraph_schema_id.clone(),
                );
                let client_name = request
                    .router_request
                    .headers()
                    .get(&config_request.apollo.client_name_header)
                    .and_then(|h| h.to_str().ok());
                let client_version = request
                    .router_request
                    .headers()
                    .get(&config_request.apollo.client_version_header)
                    .and_then(|h| h.to_str().ok());
                if let Some(name) = client_name {
                    let _ = request.context.insert(CLIENT_NAME, name.to_owned());
                }
                if let Some(version) = client_version {
                    let _ = request.context.insert(CLIENT_VERSION, version.to_owned());
                }
                let library_name = request
                    .router_request
                    .headers()
                    .get(&config_request.apollo.library_name_header)
                    .and_then(|h| h.to_str().ok());
                let library_version = request
                    .router_request
                    .headers()
                    .get(&config_request.apollo.library_version_header)
                    .and_then(|h| h.to_str().ok());
                if let Some(name) = library_name {
                    let _ = request.context.insert(CLIENT_LIBRARY_NAME, name.to_owned());
                }
                if let Some(version) = library_version {
                    let _ = request
                        .context
                        .insert(CLIENT_LIBRARY_VERSION, version.to_owned());
                }
                let mut custom_attributes = config_request
                    .instrumentation
                    .spans
                    .router
                    .attributes
                    .on_request(request);
                // Request headers selected by `apollo.send_headers`, for Apollo reporting.
                custom_attributes.push(KeyValue::new(
                    Key::from_static_str("apollo_private.http.request_headers"),
                    filter_headers(
                        request.router_request.headers(),
                        &config_request.apollo.send_headers,
                    ),
                ));
                request.context.extensions().with_lock(|lock| {
                    lock.insert(router_overhead::RouterOverheadTracker::new());
                });
                let custom_instruments: RouterInstruments = config_request
                    .instrumentation
                    .instruments
                    .new_router_instruments(static_router_instruments.clone());
                custom_instruments.on_request(request);
                let mut custom_events: RouterEvents =
                    config_request.instrumentation.events.new_router_events();
                custom_events.on_request(request);
                (
                    custom_attributes,
                    custom_instruments,
                    custom_events,
                    request.context.clone(),
                )
            },
            // Response phase: wraps the inner future `fut` with the data produced above.
            move |(mut custom_attributes, custom_instruments, mut custom_events, ctx): (
                Vec<KeyValue>,
                RouterInstruments,
                RouterEvents,
                Context,
            ),
                  fut| {
                // Start time for APOLLO_PRIVATE_DURATION_NS and Apollo metrics below.
                let start = Instant::now();
                let config = config_later.clone();
                let sender = metrics_sender.clone();
                let enabled_features = enabled_features.clone();
                let client_name_key = client_name_key.clone();
                let client_version_key = client_version_key.clone();
                Self::plugin_metrics(&config);
                async move {
                    let get_from_context =
                        |ctx: &Context, key| ctx.get::<&str, String>(key).ok().flatten();
                    // Prefer the current context keys; fall back to the deprecated ones.
                    let client_name = get_from_context(&ctx, CLIENT_NAME).or_else(|| {
                        get_from_context(
                            &ctx,
                            crate::context::deprecated::DEPRECATED_CLIENT_NAME,
                        )
                    });
                    let client_version = get_from_context(&ctx, CLIENT_VERSION).or_else(|| {
                        get_from_context(
                            &ctx,
                            crate::context::deprecated::DEPRECATED_CLIENT_VERSION,
                        )
                    });
                    if let Some(key) = client_name_key {
                        custom_attributes
                            .push(KeyValue::new(key, client_name.unwrap_or_default()));
                    }
                    if let Some(key) = client_version_key {
                        custom_attributes
                            .push(KeyValue::new(key, client_version.unwrap_or_default()));
                    }
                    // Stores a body-size recording in the context extensions.
                    // NOTE(review): presumably it is consumed once the response body size is
                    // known; confirm at the reader of ResponseBodySizeRecording.
                    if let Some(http_server_response_body_size) =
                        &custom_instruments.http_server_response_body_size
                    {
                        let CustomHistogramInner {
                            histogram,
                            attributes,
                            ..
                        } = &*http_server_response_body_size.inner.lock();
                        if let Some(histogram) = &histogram {
                            let recording = ResponseBodySizeRecording::new(
                                histogram.clone(),
                                attributes.clone(),
                            );
                            ctx.extensions().with_lock(|lock| lock.insert(recording));
                        }
                    }
                    let span = Span::current();
                    span.set_span_dyn_attributes(custom_attributes);
                    let response: Result<router::Response, BoxError> = fut.await;
                    span.record(
                        APOLLO_PRIVATE_DURATION_NS,
                        start.elapsed().as_nanos() as i64,
                    );
                    let expose_trace_id = &config.exporters.tracing.response_trace_id;
                    if let Ok(response) = &response {
                        span.set_span_dyn_attributes(
                            config
                                .instrumentation
                                .spans
                                .router
                                .attributes
                                .on_response(response),
                        );
                        custom_instruments.on_response(response);
                        custom_events.on_response(response);
                        // Only the trace-id header (when exposed) and Cache-Control are recorded
                        // as response headers for Apollo reporting.
                        let mut headers: HashMap<String, Vec<String>> =
                            HashMap::with_capacity(2);
                        if expose_trace_id.enabled {
                            let header_name = expose_trace_id
                                .header_name
                                .as_ref()
                                .unwrap_or(&DEFAULT_EXPOSE_TRACE_ID_HEADER_NAME);
                            if let Some(value) = response.response.headers().get(header_name) {
                                headers.insert(
                                    header_name.to_string(),
                                    vec![value.to_str().unwrap_or_default().to_string()],
                                );
                            }
                        }
                        if let Some(value) = response.response.headers().get(&CACHE_CONTROL) {
                            headers.insert(
                                CACHE_CONTROL.to_string(),
                                vec![value.to_str().unwrap_or_default().to_string()],
                            );
                        }
                        if !headers.is_empty() {
                            let response_headers =
                                serde_json::to_string(&headers).unwrap_or_default();
                            span.record(
                                "apollo_private.http.response_headers",
                                &response_headers,
                            );
                        }
                        // Requests whose usage reporting is the `Error` variant are reported to
                        // Apollo metrics from here. NOTE(review): presumably because they never
                        // reach the supergraph-level reporting; confirm.
                        if response.context.extensions().with_lock(|lock| {
                            lock.get::<Arc<UsageReporting>>()
                                .map(|u| matches!(**u, UsageReporting::Error { .. }))
                                .unwrap_or(false)
                        }) {
                            Self::update_apollo_metrics(
                                &response.context,
                                field_level_instrumentation_ratio,
                                sender,
                                true,
                                start.elapsed(),
                                OperationKind::Query,
                                None,
                                Default::default(),
                                enabled_features.clone(),
                            );
                        }
                        // HTTP status >= 400 marks the span as an error.
                        if response.response.status() >= StatusCode::BAD_REQUEST {
                            span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
                        } else {
                            span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
                        }
                    } else if let Err(err) = &response {
                        span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
                        span.set_span_dyn_attributes(
                            config
                                .instrumentation
                                .spans
                                .router
                                .attributes
                                .on_error(err, &ctx),
                        );
                        custom_instruments.on_error(err, &ctx);
                        custom_events.on_error(err, &ctx);
                    }
                    // Router error counting applies only to successful responses; errors pass through.
                    if let Ok(resp) = response {
                        Ok(count_router_errors(resp, &config.apollo.errors).await)
                    } else {
                        response
                    }
                }
            },
        )
        .service(service)
        .boxed()
}
/// Wraps the supergraph service with operation-level telemetry.
///
/// For each request it:
/// - creates the supergraph span;
/// - records the Apollo operation signature and, when enabled, appends the trace id as a
///   response header;
/// - runs the configured supergraph span attributes, instruments, events and GraphQL
///   instruments around the inner service;
/// - counts supergraph errors and updates OpenTelemetry and Apollo metrics.
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
    let metrics_sender = self.apollo_metrics_sender.clone();
    let span_mode = self.config.instrumentation.spans.mode;
    // Separate `Arc` handles for the closures below.
    let config = self.config.clone();
    let config_instrument = self.config.clone();
    let config_map_res_first = config.clone();
    let config_map_res = config.clone();
    let enabled_features = self.enabled_features.clone();
    let field_level_instrumentation_ratio = self.field_level_instrumentation_ratio;
    let static_supergraph_instruments = self
        .builtin_instruments
        .read()
        .supergraph_custom_instruments
        .clone();
    let static_graphql_instruments = self
        .builtin_instruments
        .read()
        .graphql_custom_instruments
        .clone();
    ServiceBuilder::new()
        .instrument(move |supergraph_req: &SupergraphRequest| {
            span_mode.create_supergraph(
                &config_instrument.apollo,
                supergraph_req,
                field_level_instrumentation_ratio,
            )
        })
        .map_response(move |mut resp: SupergraphResponse| {
            let config = config_map_res_first.clone();
            // Record the operation signature (stats report key) when usage reporting exists.
            if let Some(usage_reporting) = resp
                .context
                .extensions()
                .with_lock(|lock| lock.get::<Arc<UsageReporting>>().cloned())
            {
                Span::current().record(
                    APOLLO_PRIVATE_OPERATION_SIGNATURE.as_str(),
                    usage_reporting.get_stats_report_key().as_str(),
                );
            }
            // Header to expose the trace id under (configured name or the default), or `None`
            // when exposing the trace id is disabled.
            let expose_trace_id_header =
                config.exporters.tracing.response_trace_id.enabled.then(|| {
                    config
                        .exporters
                        .tracing
                        .response_trace_id
                        .header_name
                        .clone()
                        .unwrap_or_else(|| DEFAULT_EXPOSE_TRACE_ID_HEADER_NAME.clone())
                });
            // Render the trace id in the configured format. Returns `None` if the result is not
            // a valid header value.
            let format_id = |trace_id: TraceId| {
                let id = match config.exporters.tracing.response_trace_id.format {
                    TraceIdFormat::Hexadecimal | TraceIdFormat::OpenTelemetry => {
                        format!("{trace_id:032x}")
                    }
                    TraceIdFormat::Decimal => {
                        format!("{}", u128::from_be_bytes(trace_id.to_bytes()))
                    }
                    TraceIdFormat::Datadog => trace_id.to_datadog(),
                    TraceIdFormat::Uuid => Uuid::from_bytes(trace_id.to_bytes()).to_string(),
                };
                HeaderValue::from_str(&id).ok()
            };
            if let (Some(header_name), Some(trace_id)) =
                (expose_trace_id_header, trace_id().and_then(format_id))
            {
                resp.response.headers_mut().append(header_name, trace_id);
            }
            resp
        })
        .map_future_with_request_data(
            // Request phase: compute request attributes, populate the context and start the
            // per-request instruments and events.
            move |req: &SupergraphRequest| {
                let custom_attributes = config
                    .instrumentation
                    .spans
                    .supergraph
                    .attributes
                    .on_request(req);
                Self::populate_context(field_level_instrumentation_ratio, req);
                let custom_instruments = config
                    .instrumentation
                    .instruments
                    .new_supergraph_instruments(static_supergraph_instruments.clone());
                custom_instruments.on_request(req);
                let custom_graphql_instruments: GraphQLInstruments = config
                    .instrumentation
                    .instruments
                    .new_graphql_instruments(static_graphql_instruments.clone());
                custom_graphql_instruments.on_request(req);
                let mut supergraph_events =
                    config.instrumentation.events.new_supergraph_events();
                supergraph_events.on_request(req);
                (
                    req.context.clone(),
                    custom_instruments,
                    custom_attributes,
                    supergraph_events,
                    custom_graphql_instruments,
                )
            },
            // Response phase: await the inner service, then record attributes, instruments,
            // events and metrics.
            move |(
                ctx,
                custom_instruments,
                mut custom_attributes,
                mut supergraph_events,
                custom_graphql_instruments,
            ): (
                Context,
                SupergraphInstruments,
                Vec<KeyValue>,
                SupergraphEvents,
                GraphQLInstruments,
            ),
                  fut| {
                let config = config_map_res.clone();
                let sender = metrics_sender.clone();
                let enabled_features = enabled_features.clone();
                let start = Instant::now();
                async move {
                    let span = Span::current();
                    let mut result: Result<SupergraphResponse, BoxError> = fut.await;
                    // Query and cost attributes are read from the context only after the inner
                    // service completes. NOTE(review): presumably because they are populated
                    // during processing.
                    add_query_attributes(&ctx, &mut custom_attributes);
                    add_cost_attributes(&ctx, &mut custom_attributes);
                    span.set_span_dyn_attributes(custom_attributes);
                    match &result {
                        Ok(resp) => {
                            span.set_span_dyn_attributes(
                                config
                                    .instrumentation
                                    .spans
                                    .supergraph
                                    .attributes
                                    .on_response(resp),
                            );
                            custom_instruments.on_response(resp);
                            supergraph_events.on_response(resp);
                            custom_graphql_instruments.on_response(resp);
                        }
                        Err(err) => {
                            span.set_span_dyn_attributes(
                                config
                                    .instrumentation
                                    .spans
                                    .supergraph
                                    .attributes
                                    .on_error(err, &ctx),
                            );
                            custom_instruments.on_error(err, &ctx);
                            supergraph_events.on_error(err, &ctx);
                            custom_graphql_instruments.on_error(err, &ctx);
                        }
                    }
                    if let Ok(resp) = result {
                        result = Ok(count_supergraph_errors(resp, &config.apollo.errors).await);
                    }
                    result = Self::update_otel_metrics(
                        config.clone(),
                        ctx.clone(),
                        result,
                        custom_instruments,
                        supergraph_events,
                        custom_graphql_instruments,
                    )
                    .await;
                    Self::update_metrics_on_response_events(
                        &ctx,
                        config,
                        field_level_instrumentation_ratio,
                        sender,
                        start,
                        result,
                        enabled_features,
                    )
                }
            },
        )
        .service(service)
        .boxed()
}
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
let config = self.config.clone();
let config_map_res_first = config.clone();
ServiceBuilder::new()
.instrument(move |req: &ExecutionRequest| {
let operation_kind = req.query_plan.query.operation.kind();
match operation_kind {
OperationKind::Subscription => info_span!(
EXECUTION_SPAN_NAME,
"otel.kind" = "INTERNAL",
"graphql.operation.type" = operation_kind.as_apollo_operation_type(),
"apollo_private.operation.subtype" =
OperationSubType::SubscriptionRequest.as_str(),
),
_ => info_span!(
EXECUTION_SPAN_NAME,
"otel.kind" = "INTERNAL",
"graphql.operation.type" = operation_kind.as_apollo_operation_type(),
),
}
})
.and_then(move |resp: ExecutionResponse| {
let config = config_map_res_first.clone();
async move {
let resp = count_execution_errors(resp, &config.apollo.errors).await;
Ok::<_, BoxError>(resp)
}
})
.service(service)
.boxed()
}
/// Wraps the service for subgraph `name` with subgraph-level telemetry.
///
/// For each request it:
/// - creates the subgraph span;
/// - passes the request through `request_ftv1` and the response through `store_ftv1`;
/// - runs the configured subgraph span attributes, custom and Apollo subgraph instruments,
///   cache instruments and events;
/// - sets the span status and counts subgraph errors on success.
fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
    let config = self.config.clone();
    let span_mode = self.config.instrumentation.spans.mode;
    let conf = self.config.clone();
    let subgraph_name = ByteString::from(name);
    let name = name.to_owned();
    let static_subgraph_instruments = self
        .builtin_instruments
        .read()
        .subgraph_custom_instruments
        .clone();
    let static_apollo_subgraph_instruments = self
        .builtin_instruments
        .read()
        .apollo_subgraph_instruments
        .clone();
    let static_cache_instruments = self
        .builtin_instruments
        .read()
        .cache_custom_instruments
        .clone();
    ServiceBuilder::new()
        .instrument(move |req: &SubgraphRequest| span_mode.create_subgraph(name.as_str(), req))
        // NOTE(review): presumably these request and collect federated tracing (FTV1) data
        // from the subgraph; see `request_ftv1` / `store_ftv1`.
        .map_request(move |req: SubgraphRequest| request_ftv1(req))
        .map_response(move |resp| store_ftv1(&subgraph_name, resp))
        .map_future_with_request_data(
            // Request phase: compute request attributes and start the per-request
            // instruments, events and cache instruments.
            move |sub_request: &SubgraphRequest| {
                let custom_attributes = config
                    .instrumentation
                    .spans
                    .subgraph
                    .attributes
                    .on_request(sub_request);
                let custom_instruments = config
                    .instrumentation
                    .instruments
                    .new_subgraph_instruments(static_subgraph_instruments.clone());
                custom_instruments.on_request(sub_request);
                let mut custom_events = config.instrumentation.events.new_subgraph_events();
                custom_events.on_request(sub_request);
                let apollo_instruments: ApolloSubgraphInstruments = config
                    .instrumentation
                    .instruments
                    .new_apollo_subgraph_instruments(
                        static_apollo_subgraph_instruments.clone(),
                        config.apollo.clone(),
                    );
                apollo_instruments.on_request(sub_request);
                let custom_cache_instruments: CacheInstruments = config
                    .instrumentation
                    .instruments
                    .new_cache_instruments(static_cache_instruments.clone());
                custom_cache_instruments.on_request(sub_request);
                (
                    sub_request.context.clone(),
                    custom_instruments,
                    custom_attributes,
                    custom_events,
                    apollo_instruments,
                    custom_cache_instruments,
                )
            },
            // Response phase: apply the request attributes, await the subgraph call, then
            // record status, response/error attributes, instruments and events.
            move |(
                context,
                custom_instruments,
                custom_attributes,
                mut custom_events,
                apollo_instruments,
                custom_cache_instruments,
            ): (
                Context,
                SubgraphInstruments,
                Vec<KeyValue>,
                SubgraphEvents,
                ApolloSubgraphInstruments,
                CacheInstruments,
            ),
                  f: BoxFuture<'static, Result<SubgraphResponse, BoxError>>| {
                let conf = conf.clone();
                async move {
                    let span = Span::current();
                    span.set_span_dyn_attributes(custom_attributes);
                    let result: Result<SubgraphResponse, BoxError> = f.await;
                    match &result {
                        Ok(resp) => {
                            // HTTP status >= 400 from the subgraph marks the span as an error.
                            if resp.response.status() >= StatusCode::BAD_REQUEST {
                                span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
                            } else {
                                span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
                            }
                            span.set_span_dyn_attributes(
                                conf.instrumentation
                                    .spans
                                    .subgraph
                                    .attributes
                                    .on_response(resp),
                            );
                            apollo_instruments.on_response(resp);
                            custom_cache_instruments.on_response(resp);
                            custom_instruments.on_response(resp);
                            custom_events.on_response(resp);
                        }
                        Err(err) => {
                            span.record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
                            span.set_span_dyn_attributes(
                                conf.instrumentation
                                    .spans
                                    .subgraph
                                    .attributes
                                    .on_error(err, &context),
                            );
                            apollo_instruments.on_error(err, &context);
                            custom_cache_instruments.on_error(err, &context);
                            custom_instruments.on_error(err, &context);
                            custom_events.on_error(err, &context);
                        }
                    }
                    // Subgraph error counting applies only to successful responses.
                    if let Ok(resp) = result {
                        Ok(count_subgraph_errors(resp, &conf.apollo.errors).await)
                    } else {
                        result
                    }
                }
            },
        )
        .service(service)
        .boxed()
}
/// Wraps a connector request service with connector-level telemetry.
///
/// Creates a connector span named after `source_name` and runs the configured connector
/// instruments, Apollo connector instruments, events and span attributes around each
/// request. Results are passed through unchanged.
fn connector_request_service(
    &self,
    service: connector::request_service::BoxService,
    source_name: String,
) -> connector::request_service::BoxService {
    let req_fn_config = self.config.clone();
    let res_fn_config = self.config.clone();
    let span_mode = self.config.instrumentation.spans.mode;
    let static_connector_instruments = self
        .builtin_instruments
        .read()
        .connector_custom_instruments
        .clone();
    let static_apollo_connector_instruments = self
        .builtin_instruments
        .read()
        .apollo_connector_instruments
        .clone();
    ServiceBuilder::new()
        .instrument(move |_req: &connector::request_service::Request| {
            span_mode.create_connector(source_name.as_str())
        })
        .map_future_with_request_data(
            // Request phase: start the per-request instruments and events and compute the
            // span attributes.
            move |request: &connector::request_service::Request| {
                let custom_instruments = req_fn_config
                    .instrumentation
                    .instruments
                    .new_connector_instruments(static_connector_instruments.clone());
                custom_instruments.on_request(request);
                let apollo_instruments = req_fn_config
                    .instrumentation
                    .instruments
                    .new_apollo_connector_instruments(
                        static_apollo_connector_instruments.clone(),
                        req_fn_config.apollo.clone(),
                    );
                apollo_instruments.on_request(request);
                let mut custom_events =
                    req_fn_config.instrumentation.events.new_connector_events();
                custom_events.on_request(request);
                let custom_span_attributes = req_fn_config
                    .instrumentation
                    .spans
                    .connector
                    .attributes
                    .on_request(request);
                (
                    request.context.clone(),
                    custom_instruments,
                    apollo_instruments,
                    custom_events,
                    custom_span_attributes,
                )
            },
            // Response phase: apply the request attributes, await the connector call, then
            // record response/error attributes, instruments and events. Unlike the subgraph
            // hook, no OTEL_STATUS_CODE is recorded here.
            move |(
                context,
                custom_instruments,
                apollo_connector_instruments,
                mut custom_events,
                custom_span_attributes,
            ): (
                Context,
                ConnectorInstruments,
                ApolloConnectorInstruments,
                ConnectorEvents,
                Vec<KeyValue>,
            ),
                  f: BoxFuture<
                'static,
                Result<connector::request_service::Response, BoxError>,
            >| {
                let conf = res_fn_config.clone();
                async move {
                    let span = Span::current();
                    span.set_span_dyn_attributes(custom_span_attributes);
                    let result = f.await;
                    match &result {
                        Ok(response) => {
                            span.set_span_dyn_attributes(
                                conf.instrumentation
                                    .spans
                                    .connector
                                    .attributes
                                    .on_response(response),
                            );
                            custom_instruments.on_response(response);
                            apollo_connector_instruments.on_response(response);
                            custom_events.on_response(response);
                        }
                        Err(err) => {
                            span.set_span_dyn_attributes(
                                conf.instrumentation
                                    .spans
                                    .connector
                                    .attributes
                                    .on_error(err, &context),
                            );
                            custom_instruments.on_error(err, &context);
                            apollo_connector_instruments.on_error(err, &context);
                            custom_events.on_error(err, &context);
                        }
                    }
                    result
                }
            },
        )
        .service(service)
        .boxed()
}
/// Wraps the subgraph HTTP client service with router-overhead tracking and a client span.
///
/// The span records the peer host, port, path and full URL of the outgoing request. When the
/// URI has no explicit port, the port falls back to 443 for `https`, 80 for `http`, and 0
/// otherwise. Configured `http_client` span attributes are applied on request and on
/// response/error.
fn http_client_service(
    &self,
    _subgraph_name: &str,
    service: crate::services::http::BoxService,
) -> crate::services::http::BoxService {
    let request_conf = self.config.clone();
    let response_conf = self.config.clone();
    ServiceBuilder::new()
        .layer(router_overhead::OverheadLayer::new())
        .instrument(move |request: &crate::services::http::HttpRequest| {
            let uri = request.http_request.uri();
            let host = uri.host().unwrap_or_default();
            let port = uri.port_u16().unwrap_or_else(|| match uri.scheme_str() {
                Some("https") => 443,
                Some("http") => 80,
                _ => 0,
            });
            let path = uri.path();
            ::tracing::info_span!(HTTP_REQUEST_SPAN_NAME,
                "otel.kind" = "CLIENT",
                "net.peer.name" = %host,
                "net.peer.port" = %port,
                "http.route" = %path,
                "http.url" = %uri,
                "net.transport" = "ip_tcp",
            )
        })
        .map_future_with_request_data(
            move |request: &crate::services::http::HttpRequest| {
                let span_attributes = request_conf
                    .instrumentation
                    .spans
                    .http_client
                    .attributes
                    .on_request(request);
                (request.context.clone(), span_attributes)
            },
            move |(context, span_attributes): (Context, Vec<KeyValue>),
                  response_future: BoxFuture<
                'static,
                Result<crate::services::http::HttpResponse, BoxError>,
            >| {
                let conf = response_conf.clone();
                async move {
                    let span = Span::current();
                    span.set_span_dyn_attributes(span_attributes);
                    let result = response_future.await;
                    let attributes = &conf.instrumentation.spans.http_client.attributes;
                    // Both selector hooks yield the attributes to add to the span.
                    let outcome_attributes = match &result {
                        Ok(response) => attributes.on_response(response),
                        Err(err) => attributes.on_error(err, &context),
                    };
                    span.set_span_dyn_attributes(outcome_attributes);
                    result
                }
            },
        )
        .service(service)
        .boxed()
}
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
self.custom_endpoints.clone()
}
/// Commits the pending activation, if any, then rebuilds the builtin instruments from the
/// current instruments configuration.
///
/// The activation is taken out of `self.activation`, so it is committed at most once. The
/// activation mutex stays locked until the method returns.
fn activate(&self) {
    let mut pending = self.activation.lock();
    if let Some(activation) = pending.take() {
        activation.commit();
        let rebuilt = create_builtin_instruments(&self.config.instrumentation.instruments);
        *self.builtin_instruments.write() = rebuilt;
    }
}
}
impl Telemetry {
/// Serializes operation variables into a JSON object string for traces.
///
/// Every variable name is kept. Its value is the JSON encoding of the variable when
/// `forward_rules` allows it (or `"<unknown>"` if that encoding fails), and an empty string
/// otherwise. Returns an empty string, with a warning, if the final serialization fails.
fn filter_variables_values(
    variables: &Map<ByteString, Value>,
    forward_rules: &ForwardValues,
) -> String {
    let should_forward = |name: &str| match forward_rules {
        ForwardValues::None => false,
        ForwardValues::All => true,
        ForwardValues::Only(only) => only.contains(&name.to_string()),
        ForwardValues::Except(except) => !except.contains(&name.to_string()),
    };

    #[allow(clippy::mutable_key_type)]
    let mut filtered = HashMap::with_capacity(variables.len());
    for (name, value) in variables.iter() {
        let rendered = if should_forward(name.as_str()) {
            serde_json::to_string(value).unwrap_or_else(|_| "<unknown>".to_string())
        } else {
            String::new()
        };
        filtered.insert(name, rendered);
    }

    serde_json::to_string(&filtered).unwrap_or_else(|_err| {
        ::tracing::warn!(
            "could not serialize variables, trace will not have variables information"
        );
        Default::default()
    })
}
async fn update_otel_metrics(
config: Arc<Conf>,
context: Context,
result: Result<SupergraphResponse, BoxError>,
custom_instruments: SupergraphInstruments,
custom_events: SupergraphEvents,
custom_graphql_instruments: GraphQLInstruments,
) -> Result<SupergraphResponse, BoxError> {
let response = result?;
let ctx = context.clone();
let (parts, stream) = response.response.into_parts();
let config_cloned = config.clone();
let stream = stream.inspect(move |resp| {
let span = Span::current();
span.set_span_dyn_attributes(
config_cloned
.instrumentation
.spans
.supergraph
.attributes
.on_response_event(resp, &ctx),
);
custom_instruments.on_response_event(resp, &ctx);
custom_events.on_response_event(resp, &ctx);
custom_graphql_instruments.on_response_event(resp, &ctx);
});
let (first_response, rest) = StreamExt::into_future(stream).await;
let response = http::Response::from_parts(
parts,
once(ready(first_response.unwrap_or_default()))
.chain(rest)
.boxed(),
);
Ok(SupergraphResponse { context, response })
}
/// Decides, per request, whether subgraph federated tracing (ftv1) is enabled.
///
/// With probability `field_level_instrumentation_ratio`, an `EnableSubgraphFtv1` marker is
/// inserted into the request context's extensions.
///
/// # Panics
///
/// `rand`'s `random_bool` panics if `field_level_instrumentation_ratio` is outside `[0, 1]`.
fn populate_context(field_level_instrumentation_ratio: f64, req: &SupergraphRequest) {
    // The previous version also built an `attributes` map holding the operation name.
    // Nothing ever read that map, so it has been removed.
    if rand::rng().random_bool(field_level_instrumentation_ratio) {
        req.context
            .extensions()
            .with_lock(|lock| lock.insert(EnableSubgraphFtv1));
    }
}
#[allow(clippy::too_many_arguments)]
fn update_metrics_on_response_events(
ctx: &Context,
config: Arc<Conf>,
field_level_instrumentation_ratio: f64,
sender: Sender,
start: Instant,
result: Result<supergraph::Response, BoxError>,
enabled_features: EnabledFeatures,
) -> Result<supergraph::Response, BoxError> {
let operation_kind: OperationKind =
ctx.get(OPERATION_KIND).ok().flatten().unwrap_or_default();
match result {
Err(e) => {
if !matches!(sender, Sender::Noop) {
let operation_subtype = (operation_kind == OperationKind::Subscription)
.then_some(OperationSubType::SubscriptionRequest);
Self::update_apollo_metrics(
ctx,
field_level_instrumentation_ratio,
sender,
true,
start.elapsed(),
operation_kind,
operation_subtype,
Default::default(),
enabled_features.clone(),
);
}
Err(e)
}
Ok(router_response) => {
let http_status_is_success = router_response.response.status().is_success();
if operation_kind == OperationKind::Subscription && !http_status_is_success {
Self::update_apollo_metrics(
ctx,
field_level_instrumentation_ratio,
sender.clone(),
true,
start.elapsed(),
operation_kind,
Some(OperationSubType::SubscriptionRequest),
Default::default(),
enabled_features.clone(),
);
}
Ok(router_response.map(move |response_stream| {
let sender = sender.clone();
let ctx = ctx.clone();
let mut local_stat_recorder = LocalTypeStatRecorder::new();
response_stream
.enumerate()
.map(move |(idx, response)| {
let has_errors = !response.errors.is_empty();
if !matches!(sender, Sender::Noop) {
if let (true, Some(query)) = (
config.apollo.experimental_local_field_metrics,
ctx.executable_document(),
) {
local_stat_recorder.visit(
&query,
&response,
&ctx.get_demand_control_context()
.map(|c| c.variables)
.unwrap_or_default(),
);
}
if operation_kind == OperationKind::Subscription {
if idx == 0 {
if http_status_is_success {
Self::update_apollo_metrics(
&ctx,
field_level_instrumentation_ratio,
sender.clone(),
has_errors,
start.elapsed(),
operation_kind,
Some(OperationSubType::SubscriptionRequest),
local_stat_recorder
.local_type_stats
.drain()
.collect(),
enabled_features.clone(),
);
}
} else {
Self::update_apollo_metrics(
&ctx,
field_level_instrumentation_ratio,
sender.clone(),
has_errors,
response
.created_at
.map(|c| c.elapsed())
.unwrap_or_else(|| start.elapsed()),
operation_kind,
Some(OperationSubType::SubscriptionEvent),
local_stat_recorder.local_type_stats.drain().collect(),
enabled_features.clone(),
);
}
} else {
if !response.has_next.unwrap_or(false) {
Self::update_apollo_metrics(
&ctx,
field_level_instrumentation_ratio,
sender.clone(),
has_errors,
start.elapsed(),
operation_kind,
None,
local_stat_recorder.local_type_stats.drain().collect(),
enabled_features.clone(),
);
}
}
}
response
})
.boxed()
}))
}
}
}
/// Builds a `SingleStatsReport` for one operation (or one subscription event) and sends it
/// through `sender`.
///
/// The report depends on what the request context holds:
/// * `Arc<UsageReporting>` present and `STUDIO_EXCLUDE` set: only the licensed-operation
///   count (when non-zero) and the enabled router features.
/// * `Arc<UsageReporting>` present otherwise: a full stats entry keyed by the usage
///   reporting's stats key. It includes subgraph FTV1-derived per-type and per-path error
///   stats, limits/cost stats, client identification, latency, and referenced fields.
/// * No usage reporting in the context: a report counting exactly one licensed operation.
///
/// Side effect: building the full stats entry removes `ReferencedEnums` from the context
/// extensions.
#[allow(clippy::too_many_arguments)]
fn update_apollo_metrics(
context: &Context,
field_level_instrumentation_ratio: f64,
sender: Sender,
has_errors: bool,
duration: Duration,
operation_kind: OperationKind,
operation_subtype: Option<OperationSubType>,
local_per_type_stat: HashMap<String, LocalTypeStat>,
enabled_features: EnabledFeatures,
) {
let metrics = if let Some(usage_reporting) = context
.extensions()
.with_lock(|lock| lock.get::<Arc<UsageReporting>>().cloned())
{
// 0 for `UsageReporting::Error`, 1 otherwise.
let licensed_operation_count = licensed_operation_count(&usage_reporting);
let persisted_query_hit = context
.get::<_, bool>(PERSISTED_QUERY_CACHE_HIT)
.unwrap_or_default();
// Operations excluded from Studio only contribute the licensed-operation count and the
// enabled feature list.
if context
.get(STUDIO_EXCLUDE)
.is_ok_and(|x| x.unwrap_or_default())
{
SingleStatsReport {
licensed_operation_count_by_type: (licensed_operation_count > 0).then_some(
LicensedOperationCountByType {
r#type: operation_kind,
subtype: operation_subtype,
licensed_operation_count,
},
),
router_features_enabled: enabled_features.list(),
..Default::default()
}
} else {
// Field-level stats come from the FTV1 traces collected from subgraphs.
let traces = Self::subgraph_ftv1_traces(context);
let per_type_stat = Self::per_type_stat(&traces, field_level_instrumentation_ratio);
let root_error_stats = Self::per_path_error_stats(&traces);
let strategy = context.get_demand_control_context().map(|c| c.strategy);
// Query-shape limits default to 0 when no `OperationLimits` were recorded.
let limits_stats = context.extensions().with_lock(|guard| {
let query_limits = guard.get::<OperationLimits<u32>>();
SingleLimitsStats {
strategy: strategy.and_then(|s| serde_json::to_string(&s.mode).ok()),
cost_estimated: context.get_estimated_cost().ok().flatten(),
cost_actual: context.get_actual_cost().ok().flatten(),
depth: query_limits.map_or(0, |ql| ql.depth as u64),
height: query_limits.map_or(0, |ql| ql.height as u64),
alias_count: query_limits.map_or(0, |ql| ql.aliases as u64),
root_field_count: query_limits.map_or(0, |ql| ql.root_fields as u64),
}
});
let extended_references = context
.extensions()
.with_lock(|lock| lock.get::<ExtendedReferenceStats>().cloned())
.unwrap_or_default();
// Taken (not cloned) out of the context extensions.
let enum_response_references = context
.extensions()
.with_lock(|lock| lock.remove::<ReferencedEnums>())
.unwrap_or_default();
// If the request used a persisted query, attach its id to the usage reporting
// before the stats key is derived from it.
let maybe_pq_id = context
.extensions()
.with_lock(|lock| lock.get::<RequestPersistedQueryId>().cloned())
.map(|u| u.pq_id);
let usage_reporting = if let Some(pq_id) = maybe_pq_id {
Arc::new(usage_reporting.with_pq_id(pq_id))
} else {
usage_reporting
};
SingleStatsReport {
// The current span's trace id, reinterpreted as a UUID.
request_id: uuid::Uuid::from_bytes(
Span::current()
.context()
.span()
.span_context()
.trace_id()
.to_bytes(),
),
licensed_operation_count_by_type: (licensed_operation_count > 0).then_some(
LicensedOperationCountByType {
r#type: operation_kind,
subtype: operation_subtype,
licensed_operation_count,
},
),
stats: HashMap::from([(
usage_reporting.get_stats_report_key(),
SingleStats {
stats_with_context: SingleContextualizedStats {
context: StatsContext {
result: "".to_string(),
// Missing or undecodable client identification becomes "".
client_name: context
.get(CLIENT_NAME)
.unwrap_or_default()
.unwrap_or_default(),
client_version: context
.get(CLIENT_VERSION)
.unwrap_or_default()
.unwrap_or_default(),
client_library_name: context
.get(CLIENT_LIBRARY_NAME)
.unwrap_or_default()
.unwrap_or_default(),
client_library_version: context
.get(CLIENT_LIBRARY_VERSION)
.unwrap_or_default()
.unwrap_or_default(),
operation_type: operation_kind
.as_apollo_operation_type()
.to_string(),
operation_subtype: operation_subtype
.map(|op| op.to_string())
.unwrap_or_default(),
},
limits_stats,
query_latency_stats: SingleQueryLatencyStats {
latency: duration,
has_errors,
persisted_query_hit,
root_error_stats,
..Default::default()
},
per_type_stat,
extended_references,
enum_response_references,
local_per_type_stat,
},
referenced_fields_by_type: usage_reporting
.get_referenced_fields()
.into_iter()
.map(|(k, v)| (k, convert(v)))
.collect(),
query_metadata: usage_reporting.get_query_metadata(),
},
)]),
router_features_enabled: enabled_features.list(),
}
}
} else {
// No usage reporting in the context: count the operation as one licensed operation.
SingleStatsReport {
licensed_operation_count_by_type: LicensedOperationCountByType {
r#type: operation_kind,
subtype: operation_subtype,
licensed_operation_count: 1,
}
.into(),
router_features_enabled: enabled_features.list(),
..Default::default()
}
};
sender.send(SingleReport::Stats(metrics));
}
fn subgraph_ftv1_traces(context: &Context) -> Vec<(ByteString, proto::reports::Trace)> {
if let Some(Value::Array(array)) = context.get_json_value(SUBGRAPH_FTV1) {
array
.iter()
.filter_map(|value| match value.as_array()?.as_slice() {
[Value::String(subgraph_name), trace] => {
Some((subgraph_name.clone(), decode_ftv1_trace(trace.as_str()?)?))
}
_ => None,
})
.collect()
} else {
Vec::new()
}
}
/// Aggregates per-type, per-field execution stats from subgraph FTV1 trace trees.
///
/// Only nodes that name a response field and carry a parent type, a return type, and
/// non-zero start/end times are counted. Each counted node adds one observed execution, its
/// error count, and a latency sample recorded with weight
/// `1 / field_level_instrumentation_ratio` (a ratio of 0 yields an infinite weight). The
/// first node seen for a field decides its `return_type`.
fn per_type_stat(
    traces: &[(ByteString, proto::reports::Trace)],
    field_level_instrumentation_ratio: f64,
) -> HashMap<String, SingleTypeStat> {
    /// Folds `node`'s subtree into `per_type`, children before the node itself.
    fn visit(
        per_type: &mut HashMap<String, SingleTypeStat>,
        weight: f64,
        node: &proto::reports::trace::Node,
    ) {
        for child in &node.child {
            visit(per_type, weight, child);
        }

        let Some(ResponseName(response_name)) = &node.id else {
            return;
        };
        // An alias is reported under the field's original name.
        let field_name = if node.original_field_name.is_empty() {
            response_name
        } else {
            &node.original_field_name
        };
        let incomplete = field_name.is_empty()
            || node.parent_type.is_empty()
            || node.r#type.is_empty()
            || node.start_time == 0
            || node.end_time == 0;
        if incomplete {
            return;
        }

        let stat = per_type
            .entry(node.parent_type.clone())
            .or_default()
            .per_field_stat
            .entry(field_name.clone())
            .or_insert_with(|| metrics::apollo::studio::SingleFieldStat {
                return_type: node.r#type.clone(),
                errors_count: 0,
                latency: Default::default(),
                observed_execution_count: 0,
                requests_with_errors_count: 0,
                length: ListLengthHistogram::new(None),
            });

        let elapsed = Duration::from_nanos(node.end_time.saturating_sub(node.start_time));
        stat.latency.record(Some(elapsed), weight);
        stat.observed_execution_count += 1;
        let errors = node.error.len() as u64;
        stat.errors_count += errors;
        if errors > 0 {
            stat.requests_with_errors_count += 1;
        }
    }

    let weight = 1.0 / field_level_instrumentation_ratio;
    let mut per_type = HashMap::new();
    for root in traces
        .iter()
        .filter_map(|(_subgraph_name, trace)| trace.root.as_ref())
    {
        visit(&mut per_type, weight, root);
    }
    per_type
}
fn per_path_error_stats(
traces: &[(ByteString, proto::reports::Trace)],
) -> SinglePathErrorStats {
fn recur<'node>(
stats_root: &mut SinglePathErrorStats,
path: &mut Vec<&'node String>,
node: &'node proto::reports::trace::Node,
) {
if let Some(ResponseName(name)) = &node.id {
path.push(name)
}
if !node.error.is_empty() {
let mut stats = &mut *stats_root;
for &name in &*path {
stats = stats.children.entry(name.clone()).or_default();
}
stats.errors_count += node.error.len() as u64;
stats.requests_with_errors_count += 1;
}
for child in &node.child {
recur(stats_root, path, child)
}
if let Some(ResponseName(_)) = &node.id {
path.pop();
}
}
let mut root = Default::default();
for (subgraph_name, trace) in traces {
if let Some(node) = &trace.root {
let path = format!("service:{}", subgraph_name.as_str());
recur(&mut root, &mut vec![&path], node)
}
}
root
}
fn plugin_metrics(config: &Arc<Conf>) {
let mut attributes = Vec::new();
if MetricsConfigurator::is_enabled(&config.exporters.metrics.otlp) {
attributes.push(KeyValue::new("telemetry.metrics.otlp", true));
}
if config.exporters.metrics.prometheus.enabled {
attributes.push(KeyValue::new("telemetry.metrics.prometheus", true));
}
if TracingConfigurator::is_enabled(&config.exporters.tracing.otlp) {
attributes.push(KeyValue::new("telemetry.tracing.otlp", true));
}
if config.exporters.tracing.datadog.is_enabled() {
attributes.push(KeyValue::new("telemetry.tracing.datadog", true));
}
if config.exporters.tracing.zipkin.is_enabled() {
attributes.push(KeyValue::new("telemetry.tracing.zipkin", true));
}
if !attributes.is_empty() {
u64_counter!(
"apollo.router.operations.telemetry",
"Telemetry exporters enabled",
1,
attributes
);
}
}
/// Derives the router features to advertise from the full router configuration.
///
/// * Distributed APQ cache: `apq.enabled` (default `true`) and an `apq.router.cache.redis`
///   object is present.
/// * Entity cache: `preview_entity_cache.enabled` (default `false`).
/// * Response cache: `response_cache.enabled` (default `false`).
fn extract_enabled_features(full_config: &serde_json::Value) -> EnabledFeatures {
    let enabled_flag = |section: &str, default: bool| {
        full_config[section]["enabled"]
            .as_bool()
            .unwrap_or(default)
    };
    let apq_redis_configured = full_config["apq"]["router"]["cache"]["redis"].is_object();
    EnabledFeatures {
        distributed_apq_cache: enabled_flag("apq", true) && apq_redis_configured,
        entity_cache: enabled_flag("preview_entity_cache", false),
        response_cache: enabled_flag("response_cache", false),
    }
}
}
/// Accepts 1 to 60 characters drawn from ASCII letters, digits, space, `.`, `@`, `/`, `_`
/// and `-`.
static VALID_CLIENT_LIBRARY_VALUE_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^[ a-zA-Z0-9.@/_\-]{1,60}$").unwrap());

/// Returns whether `value` matches `VALID_CLIENT_LIBRARY_VALUE_REGEX`.
pub(crate) fn is_valid_client_library_value(value: &str) -> bool {
    let pattern: &Regex = &VALID_CLIENT_LIBRARY_VALUE_REGEX;
    pattern.is_match(value)
}
/// Serializes the request headers allowed by `forward_rules` into a JSON object string,
/// shaped as `{"header-name": ["value", ...]}`, for inclusion in traces.
///
/// Credential-bearing headers (`authorization`, `proxy-authorization`, `cookie`,
/// `set-cookie`) are always dropped, whatever the rules say. Values that are not visible
/// ASCII are replaced with `"<unknown>"`. Keys come out sorted. Returns `"{}"` when
/// forwarding is disabled, and an empty string (with a warning) if serialization fails.
fn filter_headers(headers: &HeaderMap, forward_rules: &ForwardHeaders) -> String {
    if let ForwardHeaders::None = forward_rules {
        return String::from("{}");
    }
    let headers_map = headers
        .iter()
        .filter(|(name, _value)| {
            // `Proxy-Authorization` carries credentials just like `Authorization`, so it
            // must never leave the router either.
            name != &header::AUTHORIZATION
                && name != &header::PROXY_AUTHORIZATION
                && name != &header::COOKIE
                && name != &header::SET_COOKIE
        })
        .filter_map(|(name, value)| {
            let send_header = match &forward_rules {
                ForwardHeaders::None => false,
                ForwardHeaders::All => true,
                ForwardHeaders::Only(only) => only.contains(name),
                ForwardHeaders::Except(except) => !except.contains(name),
            };
            send_header.then(|| {
                (
                    name.to_string(),
                    value.to_str().unwrap_or("<unknown>").to_string(),
                )
            })
        })
        .fold(
            BTreeMap::new(),
            |mut acc: BTreeMap<String, Vec<String>>, (name, value)| {
                acc.entry(name).or_default().push(value);
                acc
            },
        );
    match serde_json::to_string(&headers_map) {
        Ok(result) => result,
        Err(_err) => {
            ::tracing::warn!("could not serialize header, trace will not have header information");
            Default::default()
        }
    }
}
fn licensed_operation_count(usage_reporting: &UsageReporting) -> u64 {
match usage_reporting {
UsageReporting::Error(_) => 0,
_ => 1,
}
}
/// Converts the router's referenced-fields record into its Apollo reports protobuf form.
fn convert(
    referenced_fields: crate::apollo_studio_interop::ReferencedFieldsForType,
) -> crate::plugins::telemetry::apollo_exporter::proto::reports::ReferencedFieldsForType {
    let crate::apollo_studio_interop::ReferencedFieldsForType {
        field_names,
        is_interface,
        ..
    } = referenced_fields;
    crate::plugins::telemetry::apollo_exporter::proto::reports::ReferencedFieldsForType {
        field_names,
        is_interface,
    }
}
// Registers `Telemetry` as the private "apollo" / "telemetry" plugin (looked up by the
// factory name "apollo.telemetry").
register_private_plugin!("apollo", "telemetry", Telemetry);
/// Asks the subgraph for a federated trace by adding the FTV1 request header.
///
/// The header is only added when the request context carries `EnableSubgraphFtv1` and the
/// current span is sampled.
fn request_ftv1(mut req: SubgraphRequest) -> SubgraphRequest {
    let ftv1_enabled = req
        .context
        .extensions()
        .with_lock(|lock| lock.contains_key::<EnableSubgraphFtv1>());
    if ftv1_enabled && Span::current().context().span().span_context().is_sampled() {
        let headers = req.subgraph_request.headers_mut();
        headers.insert(FTV1_HEADER_NAME.clone(), FTV1_HEADER_VALUE.clone());
    }
    req
}
fn store_ftv1(subgraph_name: &ByteString, resp: SubgraphResponse) -> SubgraphResponse {
if resp
.context
.extensions()
.with_lock(|lock| lock.contains_key::<EnableSubgraphFtv1>())
&& let Some(serde_json_bytes::Value::String(ftv1)) =
resp.response.body().extensions.get("ftv1")
{
Span::current().record("apollo_private.ftv1", ftv1.as_str());
resp.context
.upsert_json_value(SUBGRAPH_FTV1, move |value: Value| {
let mut vec = match value {
Value::Array(array) => array,
Value::Null => Vec::new(),
_ => panic!("unexpected JSON value kind"),
};
vec.push(json!([subgraph_name, ftv1]));
Value::Array(vec)
})
}
resp
}
/// Text-map propagator that carries only a trace id, in a single configurable header.
#[derive(Debug)]
struct CustomTraceIdPropagator {
/// Header read on extraction and written on injection.
header_name: String,
/// The same header name as a one-element array, so `fields()` can return it as a slice.
fields: [String; 1],
/// Rendering applied to the trace id when it is injected.
format: TraceIdFormat,
}
impl CustomTraceIdPropagator {
fn new(header_name: String, format: TraceIdFormat) -> Self {
Self {
fields: [header_name.clone()],
header_name,
format,
}
}
fn extract_span_context(&self, extractor: &dyn Extractor) -> Option<SpanContext> {
let trace_id = extractor.get(&self.header_name)?;
let trace_id = trace_id.replace('-', "");
let trace_id = match opentelemetry::trace::TraceId::from_hex(&trace_id) {
Ok(trace_id) => trace_id,
Err(err) => {
::tracing::error!(trace_id = %trace_id, error = %err, "cannot generate custom trace_id");
return None;
}
};
SpanContext::new(
trace_id,
SpanId::INVALID,
TraceFlags::default().with_sampled(true),
true,
TraceState::default(),
)
.into()
}
}
impl TextMapPropagator for CustomTraceIdPropagator {
    /// Writes the current trace id, rendered with the configured format, into the header.
    /// Nothing is written for an invalid trace id.
    fn inject_context(&self, cx: &opentelemetry::Context, injector: &mut dyn Injector) {
        let trace_id = cx.span().span_context().trace_id();
        if trace_id == TraceId::INVALID {
            return;
        }
        injector.set(&self.header_name, self.format.format(trace_id));
    }

    /// Attaches the extracted remote span context to `cx`, or an empty one when extraction
    /// fails.
    fn extract_with_context(
        &self,
        cx: &opentelemetry::Context,
        extractor: &dyn Extractor,
    ) -> opentelemetry::Context {
        let remote = match self.extract_span_context(extractor) {
            Some(span_context) => span_context,
            None => SpanContext::empty_context(),
        };
        cx.with_remote_span_context(remote)
    }

    /// The single header this propagator reads and writes.
    fn fields(&self) -> FieldIter<'_> {
        FieldIter::new(&self.fields)
    }
}
/// Appends the query's alias, depth, height and root-field counts as `i64` attributes,
/// when `OperationLimits<u32>` are present in the context extensions.
pub(crate) fn add_query_attributes(context: &Context, custom_attributes: &mut Vec<KeyValue>) {
    context.extensions().with_lock(|extensions| {
        let Some(limits) = extensions.get::<OperationLimits<u32>>() else {
            return;
        };
        custom_attributes.extend([
            KeyValue::new(
                APOLLO_PRIVATE_QUERY_ALIASES.clone(),
                AttributeValue::I64(limits.aliases.into()),
            ),
            KeyValue::new(
                APOLLO_PRIVATE_QUERY_DEPTH.clone(),
                AttributeValue::I64(limits.depth.into()),
            ),
            KeyValue::new(
                APOLLO_PRIVATE_QUERY_HEIGHT.clone(),
                AttributeValue::I64(limits.height.into()),
            ),
            KeyValue::new(
                APOLLO_PRIVATE_QUERY_ROOT_FIELDS.clone(),
                AttributeValue::I64(limits.root_fields.into()),
            ),
        ]);
    });
}
/// Marker placed in a request's context extensions by `Telemetry::populate_context` (with
/// probability `field_level_instrumentation_ratio`). While present, `request_ftv1` asks
/// subgraphs for a federated trace and `store_ftv1` keeps the traces they return.
struct EnableSubgraphFtv1;
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use http::StatusCode;
use http::header::CONTENT_TYPE;
use insta::assert_snapshot;
use itertools::Itertools;
use opentelemetry::propagation::Injector;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TraceState;
use serde_json::Value;
use serde_json_bytes::ByteString;
use serde_json_bytes::json;
use tower::Service;
use tower::ServiceExt;
use tower::util::BoxService;
use super::CustomTraceIdPropagator;
use super::EnabledFeatures;
use super::Telemetry;
use super::apollo::ForwardHeaders;
use crate::error::FetchError;
use crate::graphql;
use crate::graphql::Error;
use crate::graphql::IntoGraphQLErrors;
use crate::graphql::Request;
use crate::http_ext;
use crate::json_ext::Object;
use crate::metrics::FutureMetricsExt;
use crate::plugin::DynPlugin;
use crate::plugin::PluginInit;
use crate::plugin::test::MockRouterService;
use crate::plugin::test::MockSubgraphService;
use crate::plugin::test::MockSupergraphService;
use crate::plugins::demand_control::COST_ACTUAL_KEY;
use crate::plugins::demand_control::COST_ESTIMATED_KEY;
use crate::plugins::demand_control::COST_RESULT_KEY;
use crate::plugins::demand_control::COST_STRATEGY_KEY;
use crate::plugins::demand_control::DemandControlError;
use crate::plugins::telemetry::EnableSubgraphFtv1;
use crate::plugins::telemetry::config::TraceIdFormat;
use crate::services::RouterRequest;
use crate::services::RouterResponse;
use crate::services::SubgraphRequest;
use crate::services::SubgraphResponse;
use crate::services::SupergraphRequest;
use crate::services::SupergraphResponse;
use crate::services::router;
/// Snapshots the Prometheus lines containing "bucket" that `$plugin` exposes.
///
/// Process/service-name labels are stripped and the crate version is normalized to `X`, so
/// the snapshot is stable across builds.
macro_rules! assert_prometheus_metrics {
    ($plugin:expr) => {{
        let prometheus_metrics = get_prometheus_metrics($plugin.as_ref()).await;
        let regexp = regex::Regex::new(
            r#"process_executable_name="(?P<process>[^"]+)",?|service_name="(?P<service>[^"]+)",?"#,
        )
        .unwrap();
        // `replace_all` already returns a `Cow<str>` that derefs to `str`. Cloning it with
        // `.to_owned()` first (as before) was redundant.
        let prometheus_metrics = regexp.replace_all(&prometheus_metrics, "");
        assert_snapshot!(prometheus_metrics.replace(
            &format!(r#"service_version="{}""#, std::env!("CARGO_PKG_VERSION")),
            r#"service_version="X""#
        ));
    }};
}
/// Builds the telemetry plugin from a full router YAML config, using its `telemetry` section
/// as the plugin config. Panics on any failure (test helper).
async fn create_plugin_with_config(full_config: &str) -> Box<dyn DynPlugin> {
    let full_config: Value = serde_yaml::from_str(full_config).expect("yaml must be valid");
    let root = full_config.as_object().expect("must be an object");
    let telemetry_config = root
        .get("telemetry")
        .expect("telemetry must be a root key")
        .clone();
    let init = PluginInit::fake_builder()
        .config(telemetry_config)
        .full_config(full_config)
        .build()
        .with_deserialized_config()
        .expect("unable to deserialize telemetry config");
    crate::plugin::plugins()
        .find(|factory| factory.name == "apollo.telemetry")
        .expect("Plugin not found")
        .create_instance(init)
        .await
        .expect("unable to create telemetry plugin")
}
/// Scrapes the plugin's first web endpoint at `/metrics`, asserts a 200 response, and
/// returns the body lines containing "bucket", sorted and joined with newlines.
async fn get_prometheus_metrics(plugin: &dyn DynPlugin) -> String {
    let (_listen_addr, endpoints) = plugin.web_endpoints().into_iter().next().unwrap();
    let endpoint_router = endpoints.into_iter().next().unwrap().into_router();

    let scrape = http::Request::get("http://localhost:9090/metrics")
        .body(axum::body::Body::empty())
        .unwrap();
    let mut response = endpoint_router.oneshot(scrape).await.unwrap();
    assert_eq!(response.status(), StatusCode::OK);

    let bytes = router::body::into_bytes(response.body_mut()).await.unwrap();
    let text = String::from_utf8_lossy(&bytes);
    text.split('\n')
        .filter(|line| line.contains("bucket"))
        .sorted()
        .join("\n")
}
/// Sends one supergraph request (with header `test: my_value_set`) through the plugin. The
/// request is served by a mock that returns `x-custom: coming_from_header` and a small data
/// payload.
async fn make_supergraph_request(plugin: &dyn DynPlugin) {
    let mut mock = MockSupergraphService::new();
    mock.expect_call()
        .times(1)
        .returning(move |request: SupergraphRequest| {
            Ok(SupergraphResponse::fake_builder()
                .context(request.context)
                .header("x-custom", "coming_from_header")
                .data(json!({"data": {"my_value": 2usize}}))
                .build()
                .unwrap())
        });

    let mut service = plugin.supergraph_service(BoxService::new(mock));
    let request = SupergraphRequest::fake_builder()
        .header("test", "my_value_set")
        .build()
        .unwrap();
    let ready_service = service.ready().await.unwrap();
    let _first_response = ready_service
        .call(request)
        .await
        .unwrap()
        .next_response()
        .await
        .unwrap();
}
/// The telemetry plugin is registered and builds from a minimal configuration.
#[tokio::test(flavor = "multi_thread")]
async fn plugin_registered() {
    let full_config = serde_json::json!({
        "telemetry": {
            "apollo": {
                "schema_id": "abc"
            },
            "exporters": {
                "tracing": {},
            },
        },
    });
    let init = PluginInit::fake_builder()
        .config(full_config["telemetry"].clone())
        .full_config(full_config)
        .build();
    crate::plugin::plugins()
        .find(|factory| factory.name == "apollo.telemetry")
        .expect("Plugin not found")
        .create_instance(init)
        .with_metrics()
        .await
        .unwrap();
}
/// The reference test configuration deserializes into a working telemetry plugin.
#[tokio::test(flavor = "multi_thread")]
async fn config_serialization() {
    let router_yaml = include_str!("testdata/config.router.yaml");
    create_plugin_with_config(router_yaml).with_metrics().await;
}
/// Feature detection from the full router config, across explicit, disabled, defaulted and
/// partially defaulted configurations.
#[tokio::test(flavor = "multi_thread")]
async fn test_enabled_features() {
    // Everything switched on (entity cache variant).
    let all_enabled = create_plugin_with_config(include_str!(
        "testdata/full_config_all_features_enabled.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(all_enabled.as_ref());
    assert!(
        features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature enabled when explicitly enabled"
    );
    assert!(
        features.entity_cache,
        "Telemetry plugin should consider entity cache feature enabled when explicitly enabled"
    );

    // Everything switched on (response cache variant).
    let response_cache_enabled = create_plugin_with_config(include_str!(
        "testdata/full_config_all_features_enabled_response_cache.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(response_cache_enabled.as_ref());
    assert!(
        features.response_cache,
        "Telemetry plugin should consider response cache feature enabled when explicitly enabled"
    );
    assert!(
        features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature enabled when explicitly enabled"
    );

    // Everything switched off explicitly.
    let all_disabled = create_plugin_with_config(include_str!(
        "testdata/full_config_all_features_explicitly_disabled.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(all_disabled.as_ref());
    assert!(
        !features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature disabled when explicitly disabled"
    );
    assert!(
        !features.entity_cache,
        "Telemetry plugin should consider entity cache feature disabled when explicitly disabled"
    );
    assert!(
        !features.response_cache,
        "Telemetry plugin should consider response cache feature disabled when explicitly disabled"
    );

    // Nothing configured: defaults apply.
    let defaults = create_plugin_with_config(include_str!(
        "testdata/full_config_all_features_defaults.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(defaults.as_ref());
    assert!(
        !features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature disabled when all values are defaulted"
    );
    assert!(
        !features.entity_cache,
        "Telemetry plugin should consider entity cache feature disabled when all values are defaulted"
    );
    assert!(
        !features.response_cache,
        "Telemetry plugin should consider response cache feature disabled when all values are defaulted"
    );

    // APQ with a defaulted top-level flag but a redis cache configured.
    let apq_partial_enabled = create_plugin_with_config(include_str!(
        "testdata/full_config_apq_enabled_partial_defaults.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(apq_partial_enabled.as_ref());
    assert!(
        features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature enabled when top-level enabled flag is defaulted and redis config is defined"
    );

    // APQ without a redis cache.
    let apq_partial_disabled = create_plugin_with_config(include_str!(
        "testdata/full_config_apq_disabled_partial_defaults.router.yaml"
    ))
    .with_metrics()
    .await;
    let features = enabled_features(apq_partial_disabled.as_ref());
    assert!(
        !features.distributed_apq_cache,
        "Telemetry plugin should consider apq feature disabled when redis cache is not enabled"
    );
}
/// Borrows the `EnabledFeatures` of a plugin that must be the telemetry plugin.
fn enabled_features(plugin: &dyn DynPlugin) -> &EnabledFeatures {
    let telemetry = plugin
        .as_any()
        .downcast_ref::<Telemetry>()
        .expect("telemetry plugin");
    &telemetry.enabled_features
}
/// A successful supergraph request increments `http.request` with the configured custom
/// attributes.
#[tokio::test(flavor = "multi_thread")]
async fn test_supergraph_metrics_ok() {
    let scenario = async {
        let router_yaml = include_str!("testdata/custom_attributes.router.yaml");
        let plugin = create_plugin_with_config(router_yaml).await;
        make_supergraph_request(plugin.as_ref()).await;
        assert_counter!(
            "http.request",
            1.0,
            "another_test" = "my_default_value",
            "my_value" = 2,
            "myname" = "label_value",
            "renamed_value" = "my_value_set",
            "x-custom" = "coming_from_header"
        );
    };
    scenario.with_metrics().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_supergraph_metrics_bad_request() {
async {
let plugin =
create_plugin_with_config(include_str!("testdata/custom_attributes.router.yaml"))
.await;
let mut mock_bad_request_service = MockSupergraphService::new();
mock_bad_request_service.expect_call().times(1).returning(
move |req: SupergraphRequest| {
Ok(SupergraphResponse::fake_builder()
.context(req.context)
.status_code(StatusCode::BAD_REQUEST)
.errors(vec![
crate::graphql::Error::builder()
.message("nope")
.extension_code("NOPE")
.build(),
])
.build()
.unwrap())
},
);
let mut bad_request_supergraph_service =
plugin.supergraph_service(BoxService::new(mock_bad_request_service));
let router_req = SupergraphRequest::fake_builder().header("test", "my_value_set");
let _router_response = bad_request_supergraph_service
.ready()
.await
.unwrap()
.call(router_req.build().unwrap())
.await
.unwrap()
.next_response()
.await
.unwrap();
assert_counter!(
"http.request",
1.0,
"another_test" = "my_default_value",
"error" = "nope",
"myname" = "label_value",
"renamed_value" = "my_value_set"
);
}
.with_metrics()
.await;
}
/// Custom router instruments: the conditional counter and the request-length and body-size
/// histograms accumulate across two requests.
#[tokio::test(flavor = "multi_thread")]
async fn test_custom_router_instruments() {
    let scenario = async {
        let router_yaml = include_str!("testdata/custom_instruments.router.yaml");
        let plugin = create_plugin_with_config(router_yaml).await;

        let mut mock = MockRouterService::new();
        mock.expect_call()
            .times(2)
            .returning(move |request: RouterRequest| {
                Ok(RouterResponse::fake_builder()
                    .context(request.context)
                    .status_code(StatusCode::BAD_REQUEST)
                    .header("content-type", "application/json")
                    .data(json!({"errors": [{"message": "nope"}]}))
                    .build()
                    .unwrap())
            });
        let mut service = plugin.router_service(BoxService::new(mock));

        // First request carries `conditional-custom` and a 55-byte length.
        let first_request = RouterRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("conditional-custom", "X")
            .header("custom-length", "55")
            .header("content-length", "55")
            .header("content-type", "application/graphql");
        let _first_response = service
            .ready()
            .await
            .unwrap()
            .call(first_request.build().unwrap())
            .await
            .unwrap()
            .next_response()
            .await
            .unwrap();
        assert_counter!("acme.graphql.custom_req", 1.0);
        assert_histogram_sum!(
            "http.server.request.body.size",
            55.0,
            "http.response.status_code" = 400,
            "acme.my_attribute" = "application/json"
        );
        assert_histogram_sum!("acme.request.length", 55.0);

        // Second request omits `conditional-custom`. The counter is expected to stay at 1,
        // while the histograms add 5.
        let second_request = RouterRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("custom-length", "5")
            .header("content-length", "5")
            .header("content-type", "application/graphql");
        let _second_response = service
            .ready()
            .await
            .unwrap()
            .call(second_request.build().unwrap())
            .await
            .unwrap()
            .next_response()
            .await
            .unwrap();
        assert_counter!("acme.graphql.custom_req", 1.0);
        assert_histogram_sum!("acme.request.length", 60.0);
        assert_histogram_sum!(
            "http.server.request.body.size",
            60.0,
            "http.response.status_code" = 400,
            "acme.my_attribute" = "application/json"
        );
    };
    scenario.with_metrics().await;
}
/// Two router requests through a plugin built from
/// `testdata/custom_instruments_level.router.yaml`.
///
/// Besides the custom instruments, the assertions expect `error.type` and
/// `network.protocol.version` attributes on `http.server.request.body.size`, and an
/// `http.server.request.duration` histogram for the first (GET) request.
#[tokio::test(flavor = "multi_thread")]
async fn test_custom_router_instruments_with_requirement_level() {
    let scenario = async {
        let config = include_str!("testdata/custom_instruments_level.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        // The inner router answers every call with a 400 and a JSON error body.
        let mut upstream = MockRouterService::new();
        upstream
            .expect_call()
            .times(2)
            .returning(move |req: RouterRequest| {
                Ok(RouterResponse::fake_builder()
                    .context(req.context)
                    .status_code(StatusCode::BAD_REQUEST)
                    .header("content-type", "application/json")
                    .data(json!({"errors": [{"message": "nope"}]}))
                    .build()
                    .unwrap())
            });
        let mut service = plugin.router_service(BoxService::new(upstream));

        // Round 1: conditional header present, 55 bytes.
        let first = RouterRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("conditional-custom", "X")
            .header("custom-length", "55")
            .header("content-length", "55")
            .header("content-type", "application/graphql")
            .build()
            .unwrap();
        let _first_response = {
            let mut response = service.ready().await.unwrap().call(first).await.unwrap();
            response.next_response().await.unwrap()
        };
        assert_counter!("acme.graphql.custom_req", 1.0);
        assert_histogram_sum!(
            "http.server.request.body.size",
            55.0,
            "acme.my_attribute" = "application/json",
            "error.type" = "Bad Request",
            "http.response.status_code" = 400,
            "network.protocol.version" = "HTTP/1.1"
        );
        assert_histogram_exists!(
            "http.server.request.duration",
            f64,
            "error.type" = "Bad Request",
            "http.response.status_code" = 400,
            "network.protocol.version" = "HTTP/1.1",
            "http.request.method" = "GET"
        );
        assert_histogram_sum!("acme.request.length", 55.0);

        // Round 2: no conditional header, 5 bytes.
        let second = RouterRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("custom-length", "5")
            .header("content-length", "5")
            .header("content-type", "application/graphql")
            .build()
            .unwrap();
        let _second_response = {
            let mut response = service.ready().await.unwrap().call(second).await.unwrap();
            response.next_response().await.unwrap()
        };
        assert_counter!("acme.graphql.custom_req", 1.0);
        assert_histogram_sum!("acme.request.length", 60.0);
        assert_histogram_sum!(
            "http.server.request.body.size",
            60.0,
            "http.response.status_code" = 400,
            "acme.my_attribute" = "application/json",
            "error.type" = "Bad Request",
            "network.protocol.version" = "HTTP/1.1"
        );
    };
    scenario.with_metrics().await;
}
/// Three supergraph requests through a plugin built from
/// `testdata/custom_instruments.router.yaml`.
///
/// `acme.graphql.requests` is expected to reach 2 after the two requests that send
/// `x-custom: TEST`, and to still read 2 after the third, which omits that header.
#[tokio::test(flavor = "multi_thread")]
async fn test_custom_supergraph_instruments() {
    let scenario = async {
        let plugin =
            create_plugin_with_config(include_str!("testdata/custom_instruments.router.yaml"))
                .await;

        // The inner supergraph answers every call with a 400 and a JSON error body.
        let mut upstream = MockSupergraphService::new();
        upstream
            .expect_call()
            .times(3)
            .returning(move |req: SupergraphRequest| {
                Ok(SupergraphResponse::fake_builder()
                    .context(req.context)
                    .status_code(StatusCode::BAD_REQUEST)
                    .header("content-type", "application/json")
                    .data(json!({"errors": [{"message": "nope"}]}))
                    .build()
                    .unwrap())
            });
        let mut service = plugin.supergraph_service(BoxService::new(upstream));

        // Round 1: `x-custom` and `conditional-custom`, 55 bytes.
        let first = SupergraphRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("conditional-custom", "X")
            .header("custom-length", "55")
            .header("content-length", "55")
            .header("content-type", "application/graphql")
            .query("Query test { me {name} }")
            .operation_name("test".to_string())
            .build()
            .unwrap();
        let _first_response = {
            let mut response = service.ready().await.unwrap().call(first).await.unwrap();
            response.next_response().await.unwrap()
        };
        assert_counter!(
            "acme.graphql.requests",
            1.0,
            "acme.my_attribute" = "application/json",
            "graphql_query" = "Query test { me {name} }",
            "graphql.document" = "Query test { me {name} }"
        );

        // Round 2: `x-custom` only, 5 bytes.
        let second = SupergraphRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("custom-length", "5")
            .header("content-length", "5")
            .header("content-type", "application/graphql")
            .query("Query test { me {name} }")
            .operation_name("test".to_string())
            .build()
            .unwrap();
        let _second_response = {
            let mut response = service.ready().await.unwrap().call(second).await.unwrap();
            response.next_response().await.unwrap()
        };
        assert_counter!(
            "acme.graphql.requests",
            2.0,
            "acme.my_attribute" = "application/json",
            "graphql_query" = "Query test { me {name} }",
            "graphql.document" = "Query test { me {name} }"
        );

        // Round 3: no `x-custom`, 5 bytes.
        let third = SupergraphRequest::fake_builder()
            .header("custom-length", "5")
            .header("content-length", "5")
            .header("content-type", "application/graphql")
            .query("Query test { me {name} }")
            .operation_name("test".to_string())
            .build()
            .unwrap();
        let _third_response = {
            let mut response = service.ready().await.unwrap().call(third).await.unwrap();
            response.next_response().await.unwrap()
        };
        assert_counter!(
            "acme.graphql.requests",
            2.0,
            "acme.my_attribute" = "application/json",
            "graphql_query" = "Query test { me {name} }",
            "graphql.document" = "Query test { me {name} }"
        );
    };
    scenario.with_metrics().await;
}
/// Two identical subgraph requests through a plugin built from
/// `testdata/custom_instruments_level.router.yaml`.
///
/// The inner service answers 400 with two GraphQL errors. `acme.subgraph.error_reqs`
/// is expected to count 1 then 2, with both error messages as an array attribute.
/// No `http.client.request.duration` histogram is expected to exist afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn test_custom_subgraph_instruments_level() {
    /// Builds the POST request sent to subgraph `test` in each round.
    fn build_test_subgraph_request() -> SubgraphRequest {
        let http_request = http::Request::builder()
            .method("POST")
            .uri("http://test")
            .header("x-custom", "TEST")
            .header("conditional-custom", "X")
            .header("custom-length", "55")
            .header("content-length", "55")
            .header("content-type", "application/graphql")
            .body(graphql::Request::builder().query("{ me {name} }").build())
            .unwrap();
        SubgraphRequest::fake_builder()
            .subgraph_request(http_request)
            .subgraph_name("test".to_string())
            .build()
    }

    let scenario = async {
        let config = include_str!("testdata/custom_instruments_level.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        // The inner subgraph answers 400 with a JSON content type and two errors.
        let mut upstream = MockSubgraphService::new();
        upstream
            .expect_call()
            .times(2)
            .returning(move |req: SubgraphRequest| {
                let mut headers = HeaderMap::new();
                headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
                let errors: Vec<_> = [("nope", "NOPE"), ("nok", "NOK")]
                    .into_iter()
                    .map(|(message, code)| {
                        graphql::Error::builder()
                            .message(message.to_string())
                            .extension_code(code)
                            .build()
                    })
                    .collect();
                Ok(SubgraphResponse::fake_builder()
                    .context(req.context)
                    .status_code(StatusCode::BAD_REQUEST)
                    .headers(headers)
                    .errors(errors)
                    .build())
            });
        let mut service = plugin.subgraph_service("test", BoxService::new(upstream));

        let _first_response = service
            .ready()
            .await
            .unwrap()
            .call(build_test_subgraph_request())
            .await
            .unwrap();
        assert_counter!(
            "acme.subgraph.error_reqs",
            1.0,
            graphql_error = opentelemetry::Value::Array(opentelemetry::Array::String(vec![
                "nope".into(),
                "nok".into()
            ])),
            subgraph.name = "test"
        );

        let _second_response = service
            .ready()
            .await
            .unwrap()
            .call(build_test_subgraph_request())
            .await
            .unwrap();
        assert_counter!(
            "acme.subgraph.error_reqs",
            2.0,
            graphql_error = opentelemetry::Value::Array(opentelemetry::Array::String(vec![
                "nope".into(),
                "nok".into()
            ])),
            subgraph.name = "test"
        );
        assert_histogram_not_exists!("http.client.request.duration", f64);
    };
    scenario.with_metrics().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_custom_subgraph_instruments() {
async {
let plugin = Box::new(
create_plugin_with_config(include_str!("testdata/custom_instruments.router.yaml"))
.await,
);
let mut mock_bad_request_service = MockSubgraphService::new();
mock_bad_request_service.expect_call().times(2).returning(
move |req: SubgraphRequest| {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
let errors = vec![
graphql::Error::builder()
.message("nope".to_string())
.extension_code("NOPE")
.build(),
graphql::Error::builder()
.message("nok".to_string())
.extension_code("NOK")
.build(),
];
Ok(SubgraphResponse::fake_builder()
.context(req.context)
.status_code(StatusCode::BAD_REQUEST)
.headers(headers)
.errors(errors)
.build())
},
);
let mut bad_request_subgraph_service =
plugin.subgraph_service("test", BoxService::new(mock_bad_request_service));
let sub_req = http::Request::builder()
.method("POST")
.uri("http://test")
.header("x-custom", "TEST")
.header("conditional-custom", "X")
.header("custom-length", "55")
.header("content-length", "55")
.header("content-type", "application/graphql")
.body(graphql::Request::builder().query("{ me {name} }").build())
.unwrap();
let subgraph_req = SubgraphRequest::fake_builder()
.subgraph_request(sub_req)
.subgraph_name("test".to_string())
.build();
let _router_response = bad_request_subgraph_service
.ready()
.await
.unwrap()
.call(subgraph_req)
.await
.unwrap();
assert_counter!(
"acme.subgraph.error_reqs",
1.0,
graphql_error = opentelemetry::Value::Array(opentelemetry::Array::String(vec![
"nope".into(),
"nok".into()
])),
subgraph.name = "test"
);
let sub_req = http::Request::builder()
.method("POST")
.uri("http://test")
.header("x-custom", "TEST")
.header("conditional-custom", "X")
.header("custom-length", "55")
.header("content-length", "55")
.header("content-type", "application/graphql")
.body(graphql::Request::builder().query("{ me {name} }").build())
.unwrap();
let subgraph_req = SubgraphRequest::fake_builder()
.subgraph_request(sub_req)
.subgraph_name("test".to_string())
.build();
let _router_response = bad_request_subgraph_service
.ready()
.await
.unwrap()
.call(subgraph_req)
.await
.unwrap();
assert_counter!(
"acme.subgraph.error_reqs",
2.0,
graphql_error = opentelemetry::Value::Array(opentelemetry::Array::String(vec![
"nope".into(),
"nok".into()
])),
subgraph.name = "test"
);
}
.with_metrics()
.await;
}
/// Sends ten supergraph requests through a plugin built from
/// `testdata/config.field_instrumentation_sampler.router.yaml`. Every call reaching the
/// inner service is expected to find `EnableSubgraphFtv1` in its context extensions.
#[tokio::test(flavor = "multi_thread")]
async fn test_field_instrumentation_sampler_with_preview_datadog_agent_sampling() {
    const REQUESTS: usize = 10;

    let plugin = create_plugin_with_config(include_str!(
        "testdata/config.field_instrumentation_sampler.router.yaml"
    ))
    .with_metrics()
    .await;

    // Number of inner-service calls that observed the FTV1 marker.
    let ftv1_hits = Arc::new(AtomicUsize::new(0));
    let hits_in_mock = Arc::clone(&ftv1_hits);
    let mut inner = MockSupergraphService::new();
    inner
        .expect_call()
        .times(REQUESTS)
        .returning(move |req: SupergraphRequest| {
            let ftv1_enabled = req
                .context
                .extensions()
                .with_lock(|lock| lock.contains_key::<EnableSubgraphFtv1>());
            if ftv1_enabled {
                hits_in_mock.fetch_add(1, Ordering::Relaxed);
            }
            Ok(SupergraphResponse::fake_builder()
                .context(req.context)
                .status_code(StatusCode::OK)
                .header("content-type", "application/json")
                .data(json!({"errors": [{"message": "nope"}]}))
                .build()
                .unwrap())
        });
    let mut service = plugin.supergraph_service(BoxService::new(inner));

    for _round in 0..REQUESTS {
        let request = SupergraphRequest::fake_builder()
            .header("x-custom", "TEST")
            .header("conditional-custom", "X")
            .header("custom-length", "55")
            .header("content-length", "55")
            .header("content-type", "application/graphql")
            .query("Query test { me {name} }")
            .operation_name("test".to_string())
            .build()
            .unwrap();
        let _first_chunk = service
            .ready()
            .await
            .unwrap()
            .call(request)
            .await
            .unwrap()
            .next_response()
            .await
            .unwrap();
    }

    assert_eq!(ftv1_hits.load(Ordering::Relaxed), REQUESTS);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_metrics_ok() {
async {
let plugin =
create_plugin_with_config(include_str!("testdata/custom_attributes.router.yaml"))
.await;
let mut mock_subgraph_service = MockSubgraphService::new();
mock_subgraph_service
.expect_call()
.times(1)
.returning(move |req: SubgraphRequest| {
let mut extension = Object::new();
extension.insert(
serde_json_bytes::ByteString::from("status"),
serde_json_bytes::Value::String(ByteString::from(
"custom_error_for_propagation",
)),
);
let _ = req
.context
.insert("my_key", "my_custom_attribute_from_context".to_string())
.unwrap();
Ok(SubgraphResponse::fake_builder()
.context(req.context)
.error(
Error::builder()
.message(String::from("an error occured"))
.extensions(extension)
.extension_code("FETCH_ERROR")
.build(),
)
.build())
});
let mut subgraph_service =
plugin.subgraph_service("my_subgraph_name", BoxService::new(mock_subgraph_service));
let subgraph_req = SubgraphRequest::fake_builder()
.subgraph_request(
http_ext::Request::fake_builder()
.header("test", "my_value_set")
.body(
Request::fake_builder()
.query(String::from("query { test }"))
.build(),
)
.build()
.unwrap(),
)
.subgraph_name("my_subgraph_name")
.build();
let _subgraph_response = subgraph_service
.ready()
.await
.unwrap()
.call(subgraph_req)
.await
.unwrap();
assert_histogram_count!(
"http.client.request.duration",
1,
"error" = "custom_error_for_propagation",
"my_key" = "my_custom_attribute_from_context",
"query_from_request" = "query { test }",
"status" = 200,
"subgraph" = "my_subgraph_name",
"subgraph_error_extended_code" = "FETCH_ERROR"
);
}
.with_metrics()
.await;
}
/// The inner subgraph fails with `FetchError::SubrequestHttpError`. The call is
/// expected to return an error while `http.client.request.duration` still records one
/// sample whose `message` attribute is the rendered fetch error.
#[tokio::test(flavor = "multi_thread")]
async fn test_subgraph_metrics_http_error() {
    let scenario = async {
        let plugin =
            create_plugin_with_config(include_str!("testdata/custom_attributes.router.yaml"))
                .await;

        let mut failing_upstream = MockSubgraphService::new();
        failing_upstream
            .expect_call()
            .times(1)
            .returning(move |_req: SubgraphRequest| {
                Err(Box::new(FetchError::SubrequestHttpError {
                    status_code: None,
                    service: String::from("my_subgraph_name_error"),
                    reason: String::from("cannot contact the subgraph"),
                }))
            });
        let mut service = plugin.subgraph_service(
            "my_subgraph_name_error",
            BoxService::new(failing_upstream),
        );

        let graphql_body = Request::fake_builder()
            .query(String::from("query { test }"))
            .build();
        let http_request = http_ext::Request::fake_builder()
            .header("test", "my_value_set")
            .body(graphql_body)
            .build()
            .unwrap();
        let request = SubgraphRequest::fake_builder()
            .subgraph_request(http_request)
            .subgraph_name("my_subgraph_name_error")
            .build();
        let _subgraph_error = service
            .ready()
            .await
            .unwrap()
            .call(request)
            .await
            .expect_err("should be an error");

        assert_histogram_count!(
            "http.client.request.duration",
            1,
            "message" =
                "HTTP fetch failed from 'my_subgraph_name_error': cannot contact the subgraph",
            "subgraph" = "my_subgraph_name_error",
            "query_from_request" = "query { test }"
        );
    };
    scenario.with_metrics().await;
}
/// Requests an unknown path on the first web endpoint exposed by a plugin built from
/// `testdata/prometheus.router.yaml` and expects `404 Not Found`.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_wrong_endpoint() {
    let scenario = async {
        let plugin =
            create_plugin_with_config(include_str!("testdata/prometheus.router.yaml")).await;

        // First endpoint registered on the first listen address.
        let (_listen_addr, endpoints) = plugin.web_endpoints().into_iter().next().unwrap();
        let mut web_endpoint = endpoints.into_iter().next().unwrap().into_router();

        let bad_path = http::Request::get("http://localhost:9090/WRONG/URL/metrics")
            .body(crate::services::router::body::empty())
            .unwrap();
        // Fully qualified: `axum::Router` is a service for more than one request type.
        let ready_router =
            <axum::Router as tower::ServiceExt<http::Request<axum::body::Body>>>::ready(
                &mut web_endpoint,
            )
            .await
            .unwrap();
        let resp = ready_router.call(bad_path).await.unwrap();

        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    };
    scenario.with_metrics().await;
}
/// Records a test histogram and one supergraph request with the plugin from
/// `testdata/prometheus.router.yaml` activated, then checks the Prometheus output.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_metrics() {
    let scenario = async {
        let config = include_str!("testdata/prometheus.router.yaml");
        let plugin = create_plugin_with_config(config).await;
        plugin.activate();

        u64_histogram!("apollo.test.histo", "it's a test", 1u64);
        make_supergraph_request(plugin.as_ref()).await;

        assert_prometheus_metrics!(plugin);
    };
    scenario.with_metrics().await;
}
/// Same recording as the plain Prometheus test, using the histogram buckets from
/// `testdata/prometheus_custom_buckets.router.yaml`.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_metrics_custom_buckets() {
    let scenario = async {
        let config = include_str!("testdata/prometheus_custom_buckets.router.yaml");
        let plugin = create_plugin_with_config(config).await;
        plugin.activate();

        u64_histogram!("apollo.test.histo", "it's a test", 1u64);
        make_supergraph_request(plugin.as_ref()).await;

        assert_prometheus_metrics!(plugin);
    };
    scenario.with_metrics().await;
}
/// Prometheus output with per-metric buckets from
/// `testdata/prometheus_custom_buckets_specific_metrics.router.yaml`. Here the request
/// is made before the test histogram is recorded.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_metrics_custom_buckets_for_specific_metrics() {
    let scenario = async {
        let config = include_str!("testdata/prometheus_custom_buckets_specific_metrics.router.yaml");
        let plugin = create_plugin_with_config(config).await;
        plugin.activate();

        make_supergraph_request(plugin.as_ref()).await;
        u64_histogram!("apollo.test.histo", "it's a test", 1u64);

        assert_prometheus_metrics!(plugin);
    };
    scenario.with_metrics().await;
}
/// Prometheus output after one supergraph request with the views from
/// `testdata/prometheus_custom_view_drop.router.yaml`. Unlike the sibling tests, this
/// one does not call `activate()`.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_metrics_custom_view_drop() {
    let scenario = async {
        let config = include_str!("testdata/prometheus_custom_view_drop.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        make_supergraph_request(plugin.as_ref()).await;

        assert_prometheus_metrics!(plugin);
    };
    scenario.with_metrics().await;
}
/// Records two histograms declared with units (`{request}` and `s`) and checks the
/// resulting Prometheus output.
#[tokio::test(flavor = "multi_thread")]
async fn it_test_prometheus_metrics_units_are_included() {
    let scenario = async {
        let config = include_str!("testdata/prometheus.router.yaml");
        let plugin = create_plugin_with_config(config).await;
        plugin.activate();

        u64_histogram_with_unit!("apollo.test.histo1", "no unit", "{request}", 1u64);
        f64_histogram_with_unit!("apollo.test.histo2", "unit", "s", 1f64);
        make_supergraph_request(plugin.as_ref()).await;

        assert_prometheus_metrics!(plugin);
    };
    scenario.with_metrics().await;
}
/// `filter_headers` keeps only allow-listed headers (rendered as a JSON object of
/// value arrays) and yields `{}` for `ForwardHeaders::None`.
#[test]
fn it_test_send_headers_to_studio() {
    let allow_list =
        ForwardHeaders::Only(["test", "apollo-x-name"].into_iter().map(HeaderName::from_static).collect());

    let headers: HeaderMap = [
        ("authorization", "xxx"),
        ("test", "content"),
        ("referer", "test"),
        ("foo", "bar"),
        ("apollo-x-name", "polaris"),
    ]
    .into_iter()
    .map(|(name, value)| (HeaderName::from_static(name), HeaderValue::from_static(value)))
    .collect();

    let only_allowed = super::filter_headers(&headers, &allow_list);
    assert_eq!(
        only_allowed.as_str(),
        r#"{"apollo-x-name":["polaris"],"test":["content"]}"#
    );

    let nothing = super::filter_headers(&headers, &ForwardHeaders::None);
    assert_eq!(nothing.as_str(), "{}");
}
/// A `CustomTraceIdPropagator` in UUID mode must accept a dashed UUID on extraction and
/// expose the trace id as 32 plain hex digits.
///
/// Extraction is synchronous, so this is a plain `#[test]` (like the invalid-hex test)
/// rather than spinning up a Tokio runtime for an `async fn` that never awaits.
#[test]
fn test_custom_trace_id_propagator_strip_dashes_in_trace_id() {
    let header = String::from("x-trace-id");
    let trace_id = String::from("04f9e396-465c-4840-bc2b-f493b8b1a7fc");
    let expected_trace_id = "04f9e396465c4840bc2bf493b8b1a7fc";
    let propagator = CustomTraceIdPropagator::new(header.clone(), TraceIdFormat::Uuid);
    let mut headers: HashMap<String, String> = HashMap::new();
    headers.insert(header, trace_id);

    let span = propagator
        .extract_span_context(&headers)
        .expect("a dashed UUID trace id should be extracted");
    assert_eq!(span.trace_id().to_string(), expected_trace_id);
}
/// A non-hex trace id must not produce a span context. The failure is expected to be
/// logged with both a fixed message and the offending value.
#[test]
fn test_custom_trace_id_propagator_invalid_hex_characters() {
    use crate::test_harness::tracing_test;
    let _guard = tracing_test::dispatcher_guard();

    let header_name = String::from("x-trace-id");
    let bogus_id = String::from("invalidhexchars");
    let propagator = CustomTraceIdPropagator::new(header_name.clone(), TraceIdFormat::Uuid);
    let carrier: HashMap<String, String> = HashMap::from([(header_name, bogus_id.clone())]);

    assert!(propagator.extract_span_context(&carrier).is_none());
    assert!(tracing_test::logs_contain(
        "cannot generate custom trace_id"
    ));
    assert!(tracing_test::logs_contain(&bogus_id));
}
/// Injecting the current context through a UUID-mode `CustomTraceIdPropagator` writes
/// the trace id in dashed UUID form under the configured header name.
#[test]
fn test_header_propagation_format() {
    /// Minimal injector that records every header the propagator writes.
    struct Recorder(HashMap<String, String>);
    impl Injector for Recorder {
        fn set(&mut self, key: &str, value: String) {
            self.0.insert(key.to_owned(), value);
        }
    }

    let remote = SpanContext::new(
        TraceId::from(0x04f9e396465c4840bc2bf493b8b1a7fc),
        SpanId::INVALID,
        TraceFlags::default(),
        false,
        TraceState::default(),
    );
    // Keep the guard alive so the context stays current for the injection below.
    let _ctx = opentelemetry::Context::new()
        .with_remote_span_context(remote)
        .attach();

    let propagator = CustomTraceIdPropagator::new("my_header".to_string(), TraceIdFormat::Uuid);
    let mut recorder = Recorder(HashMap::new());
    propagator.inject_context(&opentelemetry::Context::current(), &mut recorder);

    assert_eq!(
        recorder.0.get("my_header").unwrap(),
        "04f9e396-465c-4840-bc2b-f493b8b1a7fc"
    );
}
/// Cost figures a mocked supergraph service pretends demand control produced.
///
/// `Debug` is derived so values can be printed when an assertion or mock
/// expectation involving them fails.
#[derive(Clone, Debug)]
struct CostContext {
    /// Estimated cost of the operation.
    pub(crate) estimated: f64,
    /// Actual cost of the operation.
    pub(crate) actual: f64,
    /// Demand-control outcome label, e.g. `COST_ESTIMATED_TOO_EXPENSIVE`.
    pub(crate) result: &'static str,
    /// Name of the cost strategy, e.g. `static_estimated`.
    pub(crate) strategy: &'static str,
}
/// Sends one supergraph request through `plugin`, whose mocked inner service pretends
/// demand control ran with `cost_details`.
///
/// The mock does three things:
/// - stores `cost_details` in the context extensions;
/// - writes its fields under the `COST_*_KEY` context entries;
/// - when `result` is `COST_ESTIMATED_TOO_EXPENSIVE` or `COST_ACTUAL_TOO_EXPENSIVE`,
///   returns the matching `DemandControlError` as GraphQL errors, with a `max_cost`
///   5 below the reported cost (floored at 0). Any other `result` yields no errors.
async fn make_failed_demand_control_request(plugin: &dyn DynPlugin, cost_details: CostContext) {
    let mut inner = MockSupergraphService::new();
    inner
        .expect_call()
        .times(1)
        .returning(move |req: SupergraphRequest| {
            let ctx = &req.context;
            ctx.extensions().with_lock(|lock| {
                lock.insert(cost_details.clone());
            });
            ctx.insert(COST_ESTIMATED_KEY, cost_details.estimated)
                .unwrap();
            ctx.insert(COST_ACTUAL_KEY, cost_details.actual).unwrap();
            ctx.insert(COST_RESULT_KEY, cost_details.result.to_string())
                .unwrap();
            ctx.insert(COST_STRATEGY_KEY, cost_details.strategy.to_string())
                .unwrap();

            let errors = match cost_details.result {
                "COST_ESTIMATED_TOO_EXPENSIVE" => DemandControlError::EstimatedCostTooExpensive {
                    estimated_cost: cost_details.estimated,
                    max_cost: (cost_details.estimated - 5.0).max(0.0),
                }
                .into_graphql_errors()
                .unwrap(),
                "COST_ACTUAL_TOO_EXPENSIVE" => DemandControlError::ActualCostTooExpensive {
                    actual_cost: cost_details.actual,
                    max_cost: (cost_details.actual - 5.0).max(0.0),
                }
                .into_graphql_errors()
                .unwrap(),
                _ => Vec::new(),
            };

            let body = graphql::Response::builder().errors(errors).build();
            SupergraphResponse::fake_builder()
                .context(req.context)
                .data(serde_json::to_value(body).unwrap())
                .build()
        });

    let mut service = plugin.supergraph_service(BoxService::new(inner));
    let request = SupergraphRequest::fake_builder().build().unwrap();
    let _router_response = service
        .ready()
        .await
        .unwrap()
        .call(request)
        .await
        .unwrap()
        .next_response()
        .await
        .unwrap();
}
/// An operation rejected on actual cost (estimated 10, actual 8), with instruments from
/// `testdata/demand_control_delta_filter.router.yaml`. The test expects a
/// `cost.rejected.operations` sum of 8.
#[tokio::test(flavor = "multi_thread")]
async fn test_demand_control_delta_filter() {
    let scenario = async {
        let config = include_str!("testdata/demand_control_delta_filter.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        let rejected_on_actual = CostContext {
            estimated: 10.0,
            actual: 8.0,
            result: "COST_ACTUAL_TOO_EXPENSIVE",
            strategy: "static_estimated",
        };
        make_failed_demand_control_request(plugin.as_ref(), rejected_on_actual).await;

        assert_histogram_sum!("cost.rejected.operations", 8.0);
    };
    scenario.with_metrics().await;
}
/// An operation rejected on estimated cost (10), with instruments from
/// `testdata/demand_control_result_filter.router.yaml`. The test expects a
/// `cost.rejected.operations` sum of 10.
#[tokio::test(flavor = "multi_thread")]
async fn test_demand_control_result_filter() {
    let scenario = async {
        let config = include_str!("testdata/demand_control_result_filter.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        let rejected_on_estimate = CostContext {
            estimated: 10.0,
            actual: 0.0,
            result: "COST_ESTIMATED_TOO_EXPENSIVE",
            strategy: "static_estimated",
        };
        make_failed_demand_control_request(plugin.as_ref(), rejected_on_estimate).await;

        assert_histogram_sum!("cost.rejected.operations", 10.0);
    };
    scenario.with_metrics().await;
}
/// An operation rejected on estimated cost (10), with instruments from
/// `testdata/demand_control_result_attribute.router.yaml`. `cost.estimated` is
/// expected to sum to 10 with a `cost.result` attribute naming the rejection.
#[tokio::test(flavor = "multi_thread")]
async fn test_demand_control_result_attributes() {
    let scenario = async {
        let config = include_str!("testdata/demand_control_result_attribute.router.yaml");
        let plugin = create_plugin_with_config(config).await;

        let rejected_on_estimate = CostContext {
            estimated: 10.0,
            actual: 0.0,
            result: "COST_ESTIMATED_TOO_EXPENSIVE",
            strategy: "static_estimated",
        };
        make_failed_demand_control_request(plugin.as_ref(), rejected_on_estimate).await;

        assert_histogram_sum!(
            "cost.estimated",
            10.0,
            "cost.result" = "COST_ESTIMATED_TOO_EXPENSIVE"
        );
    };
    scenario.with_metrics().await;
}
}