use derivative::Derivative;
use opentelemetry::InstrumentationScope;
use opentelemetry::KeyValue;
use opentelemetry::trace::Event;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::Status;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceState;
use opentelemetry_otlp::SpanExporterBuilder;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::SpanEvents;
use opentelemetry_sdk::trace::SpanExporter;
use opentelemetry_sdk::trace::SpanLinks;
use sys_info::hostname;
use tonic::metadata::MetadataMap;
use tonic::metadata::MetadataValue;
use tonic::transport::ClientTlsConfig;
use tower::BoxError;
use url::Url;
use super::apollo::ErrorsConfiguration;
use super::config_new::subgraph::attributes::SUBGRAPH_NAME;
use super::otlp::Protocol;
use super::tracing::apollo_telemetry::APOLLO_PRIVATE_FTV1;
use super::tracing::apollo_telemetry::LightSpanData;
use super::tracing::apollo_telemetry::encode_ftv1_trace;
use super::tracing::apollo_telemetry::extract_ftv1_trace_with_error_count;
use super::tracing::apollo_telemetry::extract_string;
use crate::plugins::telemetry::GLOBAL_TRACER_NAME;
use crate::plugins::telemetry::apollo::router_id;
use crate::plugins::telemetry::apollo_exporter::ROUTER_REPORT_TYPE_TRACES;
use crate::plugins::telemetry::apollo_exporter::ROUTER_TRACING_PROTOCOL_OTLP;
use crate::plugins::telemetry::apollo_exporter::get_uname;
use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME;
use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_OPERATION_SIGNATURE;
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct ApolloOtlpExporter {
instrumentation_scope: InstrumentationScope,
#[derivative(Debug = "ignore")]
otlp_exporter: opentelemetry_otlp::SpanExporter,
errors_configuration: ErrorsConfiguration,
}
impl ApolloOtlpExporter {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
endpoint: &Url,
protocol: &Protocol,
batch_config: &BatchProcessorConfig,
apollo_key: &str,
apollo_graph_ref: &str,
schema_id: &str,
errors_configuration: &ErrorsConfiguration,
) -> Result<ApolloOtlpExporter, BoxError> {
tracing::debug!(endpoint = %endpoint, "creating Apollo OTLP traces exporter");
let mut metadata = MetadataMap::new();
metadata.insert("apollo.api.key", MetadataValue::try_from(apollo_key)?);
let mut otlp_exporter = match protocol {
Protocol::Grpc => SpanExporterBuilder::new()
.with_tonic()
.with_tls_config(ClientTlsConfig::new().with_native_roots())
.with_timeout(batch_config.max_export_timeout)
.with_endpoint(endpoint.to_string())
.with_metadata(metadata)
.with_compression(opentelemetry_otlp::Compression::Gzip)
.build()?,
Protocol::Http => SpanExporterBuilder::new()
.with_http()
.with_timeout(batch_config.max_export_timeout)
.with_endpoint(endpoint.to_string())
.build()?,
};
otlp_exporter.set_resource(
&Resource::builder_empty()
.with_attributes([
KeyValue::new("apollo.router.id", router_id()),
KeyValue::new("apollo.graph.ref", apollo_graph_ref.to_string()),
KeyValue::new("apollo.schema.id", schema_id.to_string()),
KeyValue::new(
"apollo.user.agent",
format!(
"{}@{}",
std::env!("CARGO_PKG_NAME"),
std::env!("CARGO_PKG_VERSION")
),
),
KeyValue::new("apollo.client.host", hostname()?),
KeyValue::new("apollo.client.uname", get_uname()?),
])
.build(),
);
Ok(Self {
instrumentation_scope: InstrumentationScope::builder(GLOBAL_TRACER_NAME)
.with_version(format!(
"{}@{}",
std::env!("CARGO_PKG_NAME"),
std::env!("CARGO_PKG_VERSION")
))
.build(),
otlp_exporter,
errors_configuration: errors_configuration.clone(),
})
}
pub(crate) fn prepare_for_export(
&self,
trace_spans: Vec<LightSpanData>,
) -> Option<Vec<SpanData>> {
let mut export_spans: Vec<SpanData> = Vec::new();
let mut send_trace: bool = false;
trace_spans.into_iter().for_each(|span| {
tracing::debug!("apollo otlp: preparing span '{}'", span.name);
match span.name.as_ref() {
SUPERGRAPH_SPAN_NAME => {
if span
.attributes
.contains_key(&APOLLO_PRIVATE_OPERATION_SIGNATURE)
{
export_spans.push(self.base_prepare_span(span));
send_trace = true
}
}
SUBGRAPH_SPAN_NAME => export_spans.push(self.prepare_subgraph_span(span)),
_ => export_spans.push(self.base_prepare_span(span)),
};
});
if send_trace {
tracing::debug!("apollo otlp: sending trace");
Some(export_spans)
} else {
tracing::debug!("apollo otlp: dropping trace");
None
}
}
fn extract_span_events(span: &LightSpanData) -> SpanEvents {
let mut span_events = SpanEvents::default();
for light_event in &span.events {
span_events.events.push(Event::new(
light_event.name.clone(),
light_event.timestamp,
light_event
.attributes
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect(),
0,
));
}
span_events
}
fn base_prepare_span(&self, span: LightSpanData) -> SpanData {
SpanData {
span_context: SpanContext::new(
span.trace_id,
span.span_id,
TraceFlags::default().with_sampled(true),
true,
TraceState::default(),
),
parent_span_id: span.parent_span_id,
parent_span_is_remote: false,
span_kind: span.span_kind.clone(),
name: span.name.clone(),
start_time: span.start_time,
end_time: span.end_time,
attributes: span
.attributes
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect(),
events: Self::extract_span_events(&span),
links: SpanLinks::default(),
status: span.status,
instrumentation_scope: self.instrumentation_scope.clone(),
dropped_attributes_count: span.droppped_attribute_count,
}
}
fn prepare_subgraph_span(&self, mut span: LightSpanData) -> SpanData {
let mut status = Status::Unset;
if let Some(ftv1) = span.attributes.get(&APOLLO_PRIVATE_FTV1) {
let subgraph_name = span
.attributes
.get(&SUBGRAPH_NAME)
.and_then(extract_string)
.unwrap_or_default();
let subgraph_error_config = self
.errors_configuration
.subgraph
.get_error_config(&subgraph_name);
if let Some(Ok((trace_result, error_count))) =
extract_ftv1_trace_with_error_count(ftv1, subgraph_error_config)
{
if error_count > 0 {
status = Status::error("ftv1")
}
let encoded = encode_ftv1_trace(&trace_result);
span.attributes.insert(APOLLO_PRIVATE_FTV1, encoded.into());
}
}
SpanData {
span_context: SpanContext::new(
span.trace_id,
span.span_id,
TraceFlags::default().with_sampled(true),
true,
TraceState::default(),
),
parent_span_id: span.parent_span_id,
parent_span_is_remote: false,
span_kind: span.span_kind.clone(),
name: span.name.clone(),
start_time: span.start_time,
end_time: span.end_time,
attributes: span
.attributes
.iter()
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
.collect(),
events: Self::extract_span_events(&span),
links: SpanLinks::default(),
status,
instrumentation_scope: self.instrumentation_scope.clone(),
dropped_attributes_count: span.droppped_attribute_count,
}
}
}
impl SpanExporter for ApolloOtlpExporter {
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
self.otlp_exporter.export(batch).await?;
u64_counter!(
"apollo.router.telemetry.studio.reports",
"The number of reports submitted to Studio by the Router",
1,
report.type = ROUTER_REPORT_TYPE_TRACES,
report.protocol = ROUTER_TRACING_PROTOCOL_OTLP
);
Ok(())
}
fn shutdown_with_timeout(&mut self, timeout: std::time::Duration) -> OTelSdkResult {
self.otlp_exporter.shutdown_with_timeout(timeout)
}
}