use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use opentelemetry::KeyValue;
use opentelemetry_otlp::MetricExporter;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::Aggregation;
use opentelemetry_sdk::metrics::Instrument;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::Stream;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader;
use opentelemetry_sdk::runtime;
use sys_info::hostname;
use tonic::metadata::MetadataMap;
use tonic::transport::ClientTlsConfig;
use tower::BoxError;
use url::Url;
use crate::metrics::aggregation::MeterProviderType;
use crate::plugins::telemetry::apollo::ApolloUsageReportsBatchProcessorConfiguration;
use crate::plugins::telemetry::apollo::Config;
use crate::plugins::telemetry::apollo::OtlpMetricsBatchProcessorConfiguration;
use crate::plugins::telemetry::apollo::router_id;
use crate::plugins::telemetry::apollo_exporter::ApolloExporter;
use crate::plugins::telemetry::apollo_exporter::get_uname;
use crate::plugins::telemetry::config::ApolloMetricsReferenceMode;
use crate::plugins::telemetry::config::Conf;
use crate::plugins::telemetry::metrics::NamedMetricExporter;
use crate::plugins::telemetry::metrics::OverflowMetricExporter;
use crate::plugins::telemetry::metrics::RetryMetricExporter;
use crate::plugins::telemetry::otlp::Protocol;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::plugins::telemetry::otlp::process_endpoint;
use crate::plugins::telemetry::reload::metrics::MetricsBuilder;
use crate::plugins::telemetry::reload::metrics::MetricsConfigurator;
pub(crate) mod histogram;
pub(crate) mod studio;
/// Default latency histogram bucket boundaries (in seconds) applied to
/// histogram instruments on the Apollo meter provider.
fn default_buckets() -> Vec<f64> {
    const BOUNDARIES: [f64; 12] = [
        0.001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 10.0,
    ];
    BOUNDARIES.to_vec()
}
/// Histogram bucket boundaries for the "realtime" Apollo meter provider:
/// 129 exponentially growing buckets (factor 1.1) starting near 1.4ms.
fn realtime_buckets() -> Vec<f64> {
    let buckets = prometheus::exponential_buckets(0.001399084909, 1.1, 129);
    // The arguments are compile-time constants, so generation cannot fail here.
    buckets.expect("failed to generate exponential buckets")
}
impl MetricsConfigurator for Config {
    fn config(conf: &Conf) -> &Self {
        &conf.apollo
    }

    fn is_enabled(&self) -> bool {
        // Apollo reporting requires both credentials to be configured.
        matches!(
            (&self.apollo_key, &self.apollo_graph_ref),
            (Some(_), Some(_))
        )
    }

    fn configure(&self, builder: &mut MetricsBuilder) -> Result<(), BoxError> {
        tracing::debug!("configuring Apollo metrics");
        // Ensures the "usage reporting is enabled" notice is logged at most
        // once per process, even across reloads.
        static ENABLED: AtomicBool = AtomicBool::new(false);

        if let Config {
            endpoint,
            experimental_otlp_endpoint: otlp_endpoint,
            experimental_otlp_metrics_protocol: otlp_metrics_protocol,
            apollo_key: Some(key),
            apollo_graph_ref: Some(reference),
            schema_id,
            metrics,
            metrics_reference_mode,
            ..
        } = self
        {
            if !ENABLED.swap(true, Ordering::Relaxed) {
                tracing::info!(
                    "Apollo Studio usage reporting is enabled. See https://go.apollo.dev/o/data for details"
                );
            }

            // Always set up the protobuf usage-report pipeline.
            Self::configure_apollo_metrics(
                builder,
                endpoint,
                key,
                reference,
                schema_id,
                &metrics.usage_reports.batch_processor,
                *metrics_reference_mode,
            )?;

            // The OTLP pipeline defaults to enabled and can be switched off
            // via the environment.
            let otlp_enabled = std::env::var("EXPERIMENTAL_APOLLO_OTLP_METRICS_ENABLED")
                .unwrap_or_else(|_| "true".to_string());
            if otlp_enabled == "true" {
                Self::configure_apollo_otlp_metrics(
                    builder,
                    otlp_endpoint,
                    otlp_metrics_protocol,
                    key,
                    reference,
                    schema_id,
                    &metrics.otlp.batch_processor,
                )?;
            }
        }
        Ok(())
    }
}
impl Config {
    /// Build and register the OTLP export pipeline for Apollo metrics.
    ///
    /// Two exporters with identical transport settings are created: one for
    /// the default Apollo meter provider and one for the "realtime" provider.
    /// Both providers get the same resource attributes (router/graph/schema
    /// identity) and an explicit-bucket histogram view.
    fn configure_apollo_otlp_metrics(
        builder: &mut MetricsBuilder,
        endpoint: &Url,
        otlp_protocol: &Protocol,
        key: &str,
        reference: &str,
        schema_id: &str,
        batch_config: &OtlpMetricsBatchProcessorConfiguration,
    ) -> Result<(), BoxError> {
        tracing::info!("configuring Apollo OTLP metrics: {}", batch_config);
        let (exporter, realtime_exporter) = match otlp_protocol {
            Protocol::Grpc => {
                // gRPC transport authenticates via the `apollo.api.key`
                // metadata entry on every request.
                let mut metadata = MetadataMap::new();
                metadata.insert("apollo.api.key", key.parse()?);
                let exporter = MetricExporter::builder()
                    .with_tonic()
                    .with_tls_config(ClientTlsConfig::new().with_native_roots())
                    .with_endpoint(endpoint.as_str())
                    .with_timeout(batch_config.max_export_timeout)
                    .with_metadata(metadata.clone())
                    .with_compression(opentelemetry_otlp::Compression::Gzip)
                    .with_temporality(Temporality::Delta)
                    .build()?;
                // Second exporter with identical settings for the realtime
                // reader below.
                let realtime_exporter = MetricExporter::builder()
                    .with_tonic()
                    .with_tls_config(ClientTlsConfig::new().with_native_roots())
                    .with_endpoint(endpoint.as_str())
                    .with_timeout(batch_config.max_export_timeout)
                    .with_metadata(metadata.clone())
                    .with_compression(opentelemetry_otlp::Compression::Gzip)
                    .with_temporality(Temporality::Delta)
                    .build()?;
                (exporter, realtime_exporter)
            }
            Protocol::Http => {
                // process_endpoint yields None when no usable HTTP endpoint
                // can be derived, which is an error for this transport.
                let endpoint_str = process_endpoint(
                    &Some(endpoint.to_string()),
                    &TelemetryDataKind::Metrics,
                    &Protocol::Http,
                )?
                .ok_or("A valid HTTP OTLP endpoint is required when using the HTTP protocol")?;
                // HTTP transport authenticates via the `x-api-key` header.
                let headers = HashMap::from([("x-api-key".to_string(), key.to_string())]);
                let exporter = MetricExporter::builder()
                    .with_http()
                    .with_timeout(batch_config.max_export_timeout)
                    .with_temporality(Temporality::Delta)
                    .with_compression(opentelemetry_otlp::Compression::Gzip)
                    .with_headers(headers.clone())
                    .with_endpoint(endpoint_str.clone())
                    .build()?;
                let realtime_exporter = MetricExporter::builder()
                    .with_http()
                    .with_timeout(batch_config.max_export_timeout)
                    .with_temporality(Temporality::Delta)
                    .with_compression(opentelemetry_otlp::Compression::Gzip)
                    .with_headers(headers)
                    .with_endpoint(endpoint_str)
                    .build()?;
                (exporter, realtime_exporter)
            }
        };
        // Wrap both exporters with overflow protection and retry, tagged
        // "apollo" for identification.
        let named_exporter = NamedMetricExporter::new(
            OverflowMetricExporter::new_push(RetryMetricExporter::new(exporter)),
            "apollo",
        );
        let named_realtime_exporter = NamedMetricExporter::new(
            OverflowMetricExporter::new_push(RetryMetricExporter::new(realtime_exporter)),
            "apollo",
        );
        // NOTE(review): the default reader exports on a fixed 60s interval
        // while the realtime reader honors batch_config.scheduled_delay —
        // confirm this asymmetry is intentional.
        let default_reader = PeriodicReader::builder(named_exporter, runtime::Tokio)
            .with_interval(Duration::from_secs(60))
            .build();
        let realtime_reader = PeriodicReader::builder(named_realtime_exporter, runtime::Tokio)
            .with_interval(batch_config.scheduled_delay)
            .build();
        // Resource attributes identifying this router instance; shared by
        // both meter providers.
        let resource = Resource::builder_empty()
            .with_attributes([
                KeyValue::new("apollo.router.id", router_id()),
                KeyValue::new("apollo.graph.ref", reference.to_string()),
                KeyValue::new("apollo.schema.id", schema_id.to_string()),
                KeyValue::new(
                    "apollo.user.agent",
                    format!(
                        "{}@{}",
                        std::env!("CARGO_PKG_NAME"),
                        std::env!("CARGO_PKG_VERSION")
                    ),
                ),
                KeyValue::new("apollo.client.host", hostname()?),
                KeyValue::new("apollo.client.uname", get_uname()?),
            ])
            .build();
        builder
            .with_reader(MeterProviderType::Apollo, default_reader)
            .with_resource(MeterProviderType::Apollo, resource.clone());
        // Every histogram instrument on the Apollo provider uses the
        // default bucket boundaries.
        let apollo_buckets = default_buckets();
        builder.with_view(MeterProviderType::Apollo, move |instrument: &Instrument| {
            if instrument.kind() == InstrumentKind::Histogram {
                Some(
                    Stream::builder()
                        .with_aggregation(Aggregation::ExplicitBucketHistogram {
                            boundaries: apollo_buckets.clone(),
                            record_min_max: true,
                        })
                        .build()
                        .expect("Failed to create stream for apollo metrics"),
                )
            } else {
                None
            }
        });
        builder
            .with_reader(MeterProviderType::ApolloRealtime, realtime_reader)
            .with_resource(MeterProviderType::ApolloRealtime, resource.clone());
        // The realtime provider uses finer-grained exponential buckets.
        let realtime_histogram_buckets = realtime_buckets();
        builder.with_view(
            MeterProviderType::ApolloRealtime,
            move |instrument: &Instrument| {
                if instrument.kind() == InstrumentKind::Histogram {
                    Some(
                        Stream::builder()
                            .with_aggregation(Aggregation::ExplicitBucketHistogram {
                                boundaries: realtime_histogram_buckets.clone(),
                                record_min_max: true,
                            })
                            .build()
                            .expect("Failed to create stream for apollo realtime metrics"),
                    )
                } else {
                    None
                }
            },
        );
        Ok(())
    }

    /// Start the Apollo usage-report exporter and hand its sender to the
    /// metrics builder so the telemetry plugin can submit stats reports.
    fn configure_apollo_metrics(
        builder: &mut MetricsBuilder,
        endpoint: &Url,
        key: &str,
        reference: &str,
        schema_id: &str,
        batch_config: &ApolloUsageReportsBatchProcessorConfiguration,
        metrics_reference_mode: ApolloMetricsReferenceMode,
    ) -> Result<(), BoxError> {
        tracing::info!("configuring Apollo usage report metrics: {}", batch_config);
        let exporter = ApolloExporter::new(
            endpoint,
            batch_config,
            key,
            reference,
            schema_id,
            router_id(),
            metrics_reference_mode,
        )?;
        builder.with_apollo_metrics_sender(exporter.start());
        Ok(())
    }
}
#[cfg(test)]
mod test {
use std::future::Future;
use std::time::Duration;
use serde_json::Value;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tower::ServiceExt;
use super::studio::SingleStatsReport;
use super::*;
use crate::Context;
use crate::TestHarness;
use crate::context::OPERATION_KIND;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::subscription;
use crate::plugins::telemetry::STUDIO_EXCLUDE;
use crate::plugins::telemetry::Telemetry;
use crate::plugins::telemetry::apollo;
use crate::plugins::telemetry::apollo::ENDPOINT_DEFAULT;
use crate::plugins::telemetry::apollo_exporter::Sender;
use crate::query_planner::OperationKind;
use crate::services::SupergraphRequest;
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_disabled() -> Result<(), BoxError> {
    // No apollo_key / apollo_graph_ref in the config: usage reporting
    // must stay disabled.
    let config = r#"
        telemetry:
          apollo:
            endpoint: "http://example.com"
            client_name_header: "name_header"
            client_version_header: "version_header"
            buffer_size: 10000
            schema_id: "schema_sha"
        "#;
    let plugin = create_telemetry_plugin(config).await?;
    // Without credentials the metrics sender is a no-op.
    assert!(matches!(plugin.apollo_metrics_sender, Sender::Noop));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_enabled() -> Result<(), BoxError> {
    // With an API key and graph ref configured, the sender must be the
    // real Apollo channel.
    let plugin = create_default_telemetry_plugin().await?;
    let is_apollo = matches!(plugin.apollo_metrics_sender, Sender::Apollo(_));
    assert!(is_apollo);
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_single_operation() -> Result<(), BoxError> {
    // A single anonymous query should produce a single stats report.
    let results =
        get_metrics_for_request("query {topProducts{name}}", None, None, false, None).await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_for_subscription() -> Result<(), BoxError> {
    // Mark the operation as a subscription on the request context.
    let context = Context::new();
    let _ = context
        .insert(OPERATION_KIND, OperationKind::Subscription)
        .unwrap();
    let results = get_metrics_for_request(
        "subscription {userWasCreated{name}}",
        None,
        Some(context),
        true,
        None,
    )
    .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_for_subscription_error() -> Result<(), BoxError> {
    // Subscription that is expected to surface an error in the report.
    let context = Context::new();
    let _ = context
        .insert(OPERATION_KIND, OperationKind::Subscription)
        .unwrap();
    let results = get_metrics_for_request(
        "subscription{reviewAdded{body}}",
        None,
        Some(context),
        true,
        None,
    )
    .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_multiple_operations() -> Result<(), BoxError> {
    // Two anonymous operations in one document.
    let results = get_metrics_for_request(
        "query {topProducts{name}} query {topProducts{name}}",
        None,
        None,
        false,
        None,
    )
    .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_parse_failure() -> Result<(), BoxError> {
    // An unparseable document should still be reflected in the report.
    let results = get_metrics_for_request("garbage", None, None, false, None).await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_unknown_operation() -> Result<(), BoxError> {
    // Requesting an operation name that does not exist in the document.
    let results =
        get_metrics_for_request("query {topProducts{name}}", Some("UNKNOWN"), None, false, None)
            .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_validation_failure() -> Result<(), BoxError> {
    // Query parses but fails validation (argument not in the schema).
    let results = get_metrics_for_request(
        "query {topProducts(minStarRating: 4.7){name}}",
        None,
        None,
        false,
        None,
    )
    .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_exclude() -> Result<(), BoxError> {
    // Requests flagged with STUDIO_EXCLUDE must be excluded from reports.
    let context = Context::new();
    context.insert(STUDIO_EXCLUDE, true)?;
    let results = get_metrics_for_request(
        "query {topProducts{name}}",
        None,
        Some(context),
        false,
        None,
    )
    .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_features_explicitly_enabled() -> Result<(), BoxError> {
    let query = "query {topProducts{name}}";
    // First pass: config with every feature explicitly enabled.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_all_features_enabled.router.yaml"
    ))
    .await?;
    let results = get_metrics_for_request(query, None, None, false, Some(plugin)).await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    // Second pass: same query against the response-cache variant.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_all_features_enabled_response_cache.router.yaml"
    ))
    .await?;
    let results = get_metrics_for_request(query, None, None, false, Some(plugin)).await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_features_explicitly_disabled() -> Result<(), BoxError> {
    // Config file turns every optional feature off explicitly.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_all_features_explicitly_disabled.router.yaml"
    ))
    .await?;
    let results =
        get_metrics_for_request("query {topProducts{name}}", None, None, false, Some(plugin))
            .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_features_disabled_when_defaulted() -> Result<(), BoxError> {
    // Config file leaves every optional feature at its default.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_all_features_defaults.router.yaml"
    ))
    .await?;
    let results =
        get_metrics_for_request("query {topProducts{name}}", None, None, false, Some(plugin))
            .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_distributed_apq_cache_feature_enabled_with_partial_defaults()
-> Result<(), BoxError> {
    // Distributed APQ cache enabled while other settings stay defaulted.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_apq_enabled_partial_defaults.router.yaml"
    ))
    .await?;
    let results =
        get_metrics_for_request("query {topProducts{name}}", None, None, false, Some(plugin))
            .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn apollo_metrics_distributed_apq_cache_feature_disabled_with_partial_defaults()
-> Result<(), BoxError> {
    // Distributed APQ cache disabled while other settings stay defaulted.
    let plugin = create_telemetry_plugin(include_str!(
        "../../testdata/full_config_apq_disabled_partial_defaults.router.yaml"
    ))
    .await?;
    let results =
        get_metrics_for_request("query {topProducts{name}}", None, None, false, Some(plugin))
            .await?;
    let mut settings = insta::Settings::clone_current();
    settings.set_sort_maps(true);
    // Request ids vary per run; redact them for stable snapshots.
    settings.add_redaction("[].request_id", "[REDACTED]");
    settings.bind(|| insta::assert_json_snapshot!(results));
    Ok(())
}
/// Run `query` through a full router pipeline with the telemetry plugin's
/// sender redirected to an in-memory channel, then collect the Apollo
/// stats reports emitted for that request (trace reports are dropped).
async fn get_metrics_for_request(
    query: &str,
    operation_name: Option<&str>,
    context: Option<Context>,
    is_subscription: bool,
    telemetry_plugin: Option<Telemetry>,
) -> Result<Vec<SingleStatsReport>, BoxError> {
    let _ = tracing_subscriber::fmt::try_init();
    // Use the caller-supplied plugin, or fall back to the default test
    // configuration (Apollo reporting enabled).
    let mut plugin = if let Some(p) = telemetry_plugin {
        p
    } else {
        create_default_telemetry_plugin().await?
    };
    // Swap in a channel-backed sender so reports land on `rx`.
    let (tx, rx) = tokio::sync::mpsc::channel(100);
    plugin.apollo_metrics_sender = Sender::Apollo(tx);
    let mut request_builder = SupergraphRequest::fake_builder()
        .header("name_header", "test_client")
        .header("version_header", "1.0-test")
        .query(query)
        .and_operation_name(operation_name)
        .and_context(context);
    if is_subscription {
        // The multipart accept header is required for subscription requests.
        request_builder =
            request_builder.header("accept", "multipart/mixed;subscriptionSpec=1.0");
    }
    // Drive one request end-to-end; the response itself is discarded —
    // only the side-effect reports on `rx` matter.
    TestHarness::builder()
        .extra_private_plugin(plugin)
        .extra_plugin(create_subscription_plugin().await?)
        .build_router()
        .await?
        .oneshot(request_builder.build()?.try_into().unwrap())
        .await
        .unwrap()
        .next_response()
        .await
        .unwrap()
        .unwrap();
    // Normalize the measured latency so snapshot output is deterministic.
    let default_latency = Duration::from_millis(100);
    let results = ReceiverStream::new(rx)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .filter_map(|m| match m {
            apollo::SingleReport::Stats(mut m) => {
                m.stats.iter_mut().for_each(|(_k, v)| {
                    v.stats_with_context.query_latency_stats.latency = default_latency
                });
                Some(m)
            }
            // Trace reports are not under test here.
            apollo::SingleReport::Traces(_) => None,
        })
        .collect();
    Ok(results)
}
/// Build a telemetry plugin from a minimal config that includes Apollo
/// credentials, so usage reporting (the Apollo sender) is enabled.
/// Returns a future rather than being `async` so the config string is
/// formatted eagerly and moved into the returned future.
fn create_default_telemetry_plugin() -> impl Future<Output = Result<Telemetry, BoxError>> {
    let config = format!(
        r#"
        telemetry:
          apollo:
            endpoint: "{ENDPOINT_DEFAULT}"
            apollo_key: "key"
            apollo_graph_ref: "ref"
            client_name_header: "name_header"
            client_version_header: "version_header"
            buffer_size: 10000
            schema_id: "schema_sha"
        "#
    );
    async move { create_telemetry_plugin(&config).await }
}
/// Parse a full router YAML config, pull out the `telemetry` section, and
/// instantiate the telemetry plugin from it.
async fn create_telemetry_plugin(full_config: &str) -> Result<Telemetry, BoxError> {
    let full_config: Value = serde_yaml::from_str(full_config).expect("yaml must be valid");
    let root = full_config.as_object().expect("must be an object");
    // Clone the telemetry section so `full_config` can be moved below.
    let telemetry_config = root
        .get("telemetry")
        .expect("telemetry must be a root key")
        .clone();
    let init = PluginInit::fake_builder()
        .config(telemetry_config)
        .full_config(full_config)
        .build()
        .with_deserialized_config()
        .expect("unable to deserialize telemetry config");
    Telemetry::new(init).await
}
/// Build a subscription plugin with default configuration for use in the
/// test harness pipeline.
async fn create_subscription_plugin() -> Result<subscription::Subscription, BoxError> {
    let init = PluginInit::fake_new(
        subscription::SubscriptionConfig::default(),
        Default::default(),
    );
    <subscription::Subscription as Plugin>::new(init).await
}
}