use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use opentelemetry::metrics::{Counter, UpDownCounter};
use opentelemetry::{KeyValue, global};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::trace::{SpanData, SpanExporter};
use url::Url;
use crate::error::ExporterKind;
/// Process-wide counter from which every instrumented span or log exporter draws its
/// `instance_id` (each constructor performs a `fetch_add(1)` on it).
static EXPORTER_INSTANCE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Metric instruments and pre-built attribute sets used to report exporter activity.
struct ExporterMetrics {
/// Counter `otel.sdk.exporter.{signal}.exported`: items whose export has finished.
exported: Counter<u64>,
/// Up/down counter `otel.sdk.exporter.{signal}.inflight`: items currently being exported.
inflight: UpDownCounter<i64>,
/// Attributes attached to in-flight measurements and to successfully exported items.
base_attributes: Vec<KeyValue>,
/// `base_attributes` plus `error.type = "export_failed"`; attached to failed exports.
failure_attributes: Vec<KeyValue>,
}
impl ExporterMetrics {
    /// Builds the instruments and attribute sets for one exporter instance.
    ///
    /// * `signal` - name spliced into the instrument names, descriptions and units
    ///   (the constructors in this file pass `"span"` or `"log"`).
    /// * `exporter` - exporter kind; its `as_str()` value prefixes both
    ///   `otel.component.type` and `otel.component.name`.
    /// * `endpoint` - optional export endpoint. `server.address` is added when the URL has a
    ///   host and `server.port` when `port_or_known_default()` yields a port.
    /// * `instance_id` - suffix appended to `otel.component.name`.
    fn new_with_instance(
        signal: &'static str,
        exporter: ExporterKind,
        endpoint: Option<&Url>,
        instance_id: u64,
    ) -> Self {
        let meter = global::meter("apollo-opentelemetry");
        let exported = meter
            .u64_counter(format!("otel.sdk.exporter.{signal}.exported"))
            .with_description(format!(
                "The number of {signal}s for which the export has finished"
            ))
            .with_unit(format!("{{{signal}}}"))
            .build();
        let inflight = meter
            .i64_up_down_counter(format!("otel.sdk.exporter.{signal}.inflight"))
            .with_description(format!("Number of {signal}s currently being exported"))
            .with_unit(format!("{{{signal}}}"))
            .build();

        let kind = exporter.as_str();
        let component_type = format!("{kind}_exporter");
        let component_name = format!("{kind}_exporter/{instance_id}");
        // Both strings are only needed here, so they are moved into the attributes
        // instead of being cloned.
        let mut base_attributes = vec![
            KeyValue::new("otel.component.type", component_type),
            KeyValue::new("otel.component.name", component_name),
        ];
        if let Some(endpoint) = endpoint {
            if let Some(host) = endpoint.host_str() {
                base_attributes.push(KeyValue::new("server.address", host.to_string()));
            }
            if let Some(port) = endpoint.port_or_known_default() {
                base_attributes.push(KeyValue::new("server.port", i64::from(port)));
            }
        }

        // Failures carry every base attribute plus a fixed `error.type`.
        let mut failure_attributes = base_attributes.clone();
        failure_attributes.push(KeyValue::new("error.type", "export_failed"));

        Self {
            exported,
            inflight,
            base_attributes,
            failure_attributes,
        }
    }

    /// Adds `count` to the in-flight counter and returns a guard for that export.
    ///
    /// The guard starts in the "not successful" state. When it is dropped it subtracts
    /// `count` from the in-flight counter and adds it to the `exported` counter.
    fn track_export(&self, count: u64) -> InflightGuard<'_> {
        self.inflight.add(count as i64, &self.base_attributes);
        InflightGuard {
            metrics: self,
            count,
            success: false,
        }
    }
}
/// Drop guard covering one export call.
///
/// Created with `success: false`; dropping it removes `count` from the in-flight counter and
/// adds it to the `exported` counter, using the failure attributes unless `success` was set.
#[must_use = "dropping the guard immediately records the export as finished and failed"]
struct InflightGuard<'a> {
    /// Instruments and attribute sets the guard reports to.
    metrics: &'a ExporterMetrics,
    /// Number of items covered by the export being tracked.
    count: u64,
    /// Outcome reported on drop; `false` until set otherwise.
    success: bool,
}
impl InflightGuard<'_> {
/// Stores the export outcome that the `Drop` impl uses to choose between the base and the
/// failure attribute set.
fn set_success(&mut self, success: bool) {
self.success = success;
}
}
impl Drop for InflightGuard<'_> {
    /// Settles the tracked export: removes its items from the in-flight counter, then adds
    /// them to the `exported` counter with the attribute set matching the recorded outcome.
    fn drop(&mut self) {
        let metrics = self.metrics;

        // Undo the increment made when tracking started.
        let released = -(self.count as i64);
        metrics.inflight.add(released, &metrics.base_attributes);

        // Failed exports are reported with the extra `error.type` attribute.
        let outcome_attributes = if self.success {
            &metrics.base_attributes
        } else {
            &metrics.failure_attributes
        };
        metrics.exported.add(self.count, outcome_attributes);
    }
}
/// Span exporter wrapper that records export metrics around an inner exporter `E`.
pub struct InstrumentedSpanExporter<E> {
/// Wrapped exporter that performs the actual export.
inner: E,
/// Instruments and attribute sets used to report in-flight and exported spans.
metrics: Arc<ExporterMetrics>,
/// Identifier taken from the shared instance counter when this wrapper was created.
instance_id: u64,
/// Running total of spans whose export returned `Ok`.
total_exported: AtomicU64,
/// Running total of spans whose export returned an error.
total_failed: AtomicU64,
}
impl<E: Debug> Debug for InstrumentedSpanExporter<E> {
    /// Formats the wrapper for diagnostics.
    ///
    /// Shows the inner exporter, the instance id and a snapshot of both running totals. The
    /// `metrics` field is left out, which `finish_non_exhaustive` signals with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InstrumentedSpanExporter")
            .field("inner", &self.inner)
            .field("instance_id", &self.instance_id)
            .field(
                "total_exported",
                &self.total_exported.load(Ordering::Relaxed),
            )
            .field("total_failed", &self.total_failed.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}
impl<E> InstrumentedSpanExporter<E> {
    /// Wraps `inner` so that its span exports are reported through metrics.
    ///
    /// A fresh instance id is drawn from the shared counter and used, together with
    /// `exporter` and `endpoint`, to build the metric attributes. Both totals start at zero.
    pub fn new(inner: E, exporter: ExporterKind, endpoint: Option<&Url>) -> Self {
        let instance_id = EXPORTER_INSTANCE_COUNTER.fetch_add(1, Ordering::Relaxed);
        let metrics = ExporterMetrics::new_with_instance("span", exporter, endpoint, instance_id);
        Self {
            inner,
            metrics: Arc::new(metrics),
            instance_id,
            total_exported: AtomicU64::default(),
            total_failed: AtomicU64::default(),
        }
    }

    /// Returns the identifier assigned to this wrapper at construction time.
    pub fn instance_id(&self) -> u64 {
        self.instance_id
    }
}
impl<E: SpanExporter> SpanExporter for InstrumentedSpanExporter<E> {
    /// Exports `batch` through the wrapped exporter while recording metrics.
    ///
    /// The batch size is added to the in-flight counter before the inner export starts. When
    /// the guard is dropped the same amount is removed again and added to the `exported`
    /// counter, with the failure attributes unless the inner export returned `Ok`. The
    /// `total_exported` / `total_failed` tallies are updated to match the result.
    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
        let count = batch.len() as u64;
        // The guard starts out as "failed", so it still settles the in-flight count if this
        // future is dropped before the inner export completes.
        let mut guard = self.metrics.track_export(count);
        let result = self.inner.export(batch).await;
        let success = result.is_ok();
        guard.set_success(success);
        if success {
            self.total_exported.fetch_add(count, Ordering::Relaxed);
        } else {
            self.total_failed.fetch_add(count, Ordering::Relaxed);
        }
        result
    }

    /// Forwards shutdown, including the caller's timeout, to the wrapped exporter.
    fn shutdown_with_timeout(&mut self, timeout: std::time::Duration) -> OTelSdkResult {
        self.inner.shutdown_with_timeout(timeout)
    }

    /// Forwards the flush request to the wrapped exporter.
    ///
    /// Without this override the trait's default implementation would answer on behalf of
    /// the wrapper and the inner exporter would never be asked to flush.
    fn force_flush(&mut self) -> OTelSdkResult {
        self.inner.force_flush()
    }

    /// Forwards the resource to the wrapped exporter.
    fn set_resource(&mut self, resource: &Resource) {
        self.inner.set_resource(resource);
    }
}
/// Log exporter wrapper that records export metrics around an inner exporter `E`.
pub struct InstrumentedLogExporter<E> {
/// Wrapped exporter that performs the actual export.
inner: E,
/// Instruments and attribute sets used to report in-flight and exported log records.
metrics: Arc<ExporterMetrics>,
/// Identifier taken from the shared instance counter when this wrapper was created.
instance_id: u64,
/// Running total of log records whose export returned `Ok`.
total_exported: AtomicU64,
/// Running total of log records whose export returned an error.
total_failed: AtomicU64,
}
impl<E: Debug> Debug for InstrumentedLogExporter<E> {
    /// Formats the wrapper for diagnostics.
    ///
    /// Shows the inner exporter, the instance id and a snapshot of both running totals. The
    /// `metrics` field is left out, which `finish_non_exhaustive` signals with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InstrumentedLogExporter")
            .field("inner", &self.inner)
            .field("instance_id", &self.instance_id)
            .field(
                "total_exported",
                &self.total_exported.load(Ordering::Relaxed),
            )
            .field("total_failed", &self.total_failed.load(Ordering::Relaxed))
            .finish_non_exhaustive()
    }
}
impl<E> InstrumentedLogExporter<E> {
    /// Wraps `inner` so that its log exports are reported through metrics.
    ///
    /// A fresh instance id is drawn from the shared counter and used, together with
    /// `exporter` and `endpoint`, to build the metric attributes. Both totals start at zero.
    pub fn new(inner: E, exporter: ExporterKind, endpoint: Option<&Url>) -> Self {
        let instance_id = EXPORTER_INSTANCE_COUNTER.fetch_add(1, Ordering::Relaxed);
        let metrics = ExporterMetrics::new_with_instance("log", exporter, endpoint, instance_id);
        Self {
            inner,
            metrics: Arc::new(metrics),
            instance_id,
            total_exported: AtomicU64::default(),
            total_failed: AtomicU64::default(),
        }
    }

    /// Returns the identifier assigned to this wrapper at construction time.
    pub fn instance_id(&self) -> u64 {
        self.instance_id
    }
}
impl<E: LogExporter> LogExporter for InstrumentedLogExporter<E> {
    /// Exports `batch` through the wrapped exporter while recording metrics.
    ///
    /// The number of records in the batch is tracked as in-flight for the duration of the
    /// inner export, then reported as exported (with the failure attributes on error) when
    /// the guard is dropped. The matching running total is bumped by the same amount.
    async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
        let record_count = batch.iter().count() as u64;
        let mut inflight = self.metrics.track_export(record_count);

        let outcome = self.inner.export(batch).await;

        // Tell the guard how the export went and pick the tally to bump.
        let tally = match &outcome {
            Ok(()) => {
                inflight.set_success(true);
                &self.total_exported
            }
            Err(_) => {
                inflight.set_success(false);
                &self.total_failed
            }
        };
        tally.fetch_add(record_count, Ordering::Relaxed);

        outcome
    }

    /// Forwards shutdown, including the caller's timeout, to the wrapped exporter.
    fn shutdown_with_timeout(&self, timeout: std::time::Duration) -> OTelSdkResult {
        self.inner.shutdown_with_timeout(timeout)
    }

    /// Forwards the resource to the wrapped exporter.
    fn set_resource(&mut self, resource: &Resource) {
        self.inner.set_resource(resource);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;

    use apollo_opentelemetry_test::{TelemetryContext, assert_metric};
    use opentelemetry::InstrumentationScope;

    use super::*;

    /// Span exporter double: counts the spans and calls it receives and can be told to fail.
    #[derive(Debug, Default)]
    struct MockSpanExporter {
        /// Total number of spans passed to `export`.
        export_count: AtomicUsize,
        /// Number of `shutdown_with_timeout` calls received.
        shutdown_calls: AtomicUsize,
        /// Number of `set_resource` calls received.
        set_resource_calls: AtomicUsize,
        /// When `true`, `export` returns an error.
        should_fail: bool,
    }

    impl SpanExporter for MockSpanExporter {
        async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
            self.export_count.fetch_add(batch.len(), Ordering::SeqCst);
            if self.should_fail {
                Err(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
                    "mock failure".to_string(),
                ))
            } else {
                Ok(())
            }
        }

        fn shutdown_with_timeout(&mut self, _timeout: std::time::Duration) -> OTelSdkResult {
            self.shutdown_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {
            self.set_resource_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Log exporter double: counts the records and calls it receives and can be told to fail.
    #[derive(Debug, Default)]
    struct MockLogExporter {
        /// Total number of log records passed to `export`.
        export_count: AtomicUsize,
        /// Number of `shutdown_with_timeout` calls received.
        shutdown_calls: AtomicUsize,
        /// Number of `set_resource` calls received.
        set_resource_calls: AtomicUsize,
        /// When `true`, `export` returns an error.
        should_fail: bool,
    }

    impl LogExporter for MockLogExporter {
        async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
            self.export_count
                .fetch_add(batch.iter().count(), Ordering::SeqCst);
            if self.should_fail {
                Err(opentelemetry_sdk::error::OTelSdkError::InternalFailure(
                    "mock failure".to_string(),
                ))
            } else {
                Ok(())
            }
        }

        fn shutdown_with_timeout(&self, _timeout: std::time::Duration) -> OTelSdkResult {
            self.shutdown_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn set_resource(&mut self, _resource: &Resource) {
            self.set_resource_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a minimal sampled span with fixed ids for use as export input.
    fn create_test_span_data() -> SpanData {
        use opentelemetry::trace::{
            SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
        };
        use std::borrow::Cow;
        use std::time::SystemTime;

        let span_context = SpanContext::new(
            TraceId::from_hex("0102030405060708090a0b0c0d0e0f10").unwrap(),
            SpanId::from_hex("0102030405060708").unwrap(),
            TraceFlags::SAMPLED,
            false,
            TraceState::default(),
        );
        SpanData {
            span_context,
            parent_span_id: SpanId::INVALID,
            parent_span_is_remote: false,
            name: Cow::Borrowed("test-span"),
            span_kind: SpanKind::Internal,
            start_time: SystemTime::now(),
            end_time: SystemTime::now(),
            attributes: Vec::new(),
            dropped_attributes_count: 0,
            events: opentelemetry_sdk::trace::SpanEvents::default(),
            links: opentelemetry_sdk::trace::SpanLinks::default(),
            status: Status::Unset,
            instrumentation_scope: InstrumentationScope::builder("test").build(),
        }
    }

    /// Endpoint whose host and port the metric assertions below expect.
    fn test_endpoint() -> Url {
        Url::parse("http://localhost:4318").unwrap()
    }

    #[tokio::test]
    async fn instrumented_span_exporter_tracks_successful_exports() {
        let mock = MockSpanExporter::default();
        let exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let batch = vec![create_test_span_data(), create_test_span_data()];
        let result = exporter.export(batch).await;
        assert!(result.is_ok());
        assert_eq!(exporter.total_exported.load(Ordering::Relaxed), 2);
        assert_eq!(exporter.total_failed.load(Ordering::Relaxed), 0);
        // The whole batch must have reached the wrapped exporter.
        assert_eq!(exporter.inner.export_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn instrumented_span_exporter_tracks_failed_exports() {
        let mock = MockSpanExporter {
            should_fail: true,
            ..Default::default()
        };
        let exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let batch = vec![create_test_span_data()];
        let result = exporter.export(batch).await;
        assert!(result.is_err());
        assert_eq!(exporter.total_exported.load(Ordering::Relaxed), 0);
        assert_eq!(exporter.total_failed.load(Ordering::Relaxed), 1);
        // The failure must come from the wrapped exporter, not from the wrapper itself.
        assert_eq!(exporter.inner.export_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn instrumented_span_exporter_emits_exported_metric_on_success() {
        let ctx = TelemetryContext::new();
        let mock = MockSpanExporter::default();
        let exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let batch = vec![create_test_span_data(), create_test_span_data()];
        let result = exporter.export(batch).await;
        assert!(result.is_ok());
        assert_metric!(
            ctx,
            "otel.sdk.exporter.span.exported",
            "otel.component.type" = "otlp_http_exporter",
            "server.address" = "localhost",
            "server.port" = 4318
        );
    }

    #[tokio::test]
    async fn instrumented_span_exporter_emits_exported_metric_with_error_type_on_failure() {
        let ctx = TelemetryContext::new();
        let mock = MockSpanExporter {
            should_fail: true,
            ..Default::default()
        };
        let exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let batch = vec![create_test_span_data()];
        let result = exporter.export(batch).await;
        assert!(result.is_err());
        assert_metric!(
            ctx,
            "otel.sdk.exporter.span.exported",
            "otel.component.type" = "otlp_http_exporter",
            "server.address" = "localhost",
            "server.port" = 4318,
            "error.type" = "export_failed"
        );
    }

    #[tokio::test]
    async fn instrumented_log_exporter_emits_exported_metric_on_success() {
        use opentelemetry::logs::{Logger, LoggerProvider};
        use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider};

        let ctx = TelemetryContext::new();
        let mock = MockLogExporter::default();
        let exporter =
            InstrumentedLogExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let log_record = logger.create_log_record();
        let scope = InstrumentationScope::builder("test").build();
        let logs: Vec<(&SdkLogRecord, &InstrumentationScope)> = vec![(&log_record, &scope)];
        let batch = LogBatch::new(&logs);
        let result = exporter.export(batch).await;
        assert!(result.is_ok());
        assert_metric!(
            ctx,
            "otel.sdk.exporter.log.exported",
            "otel.component.type" = "otlp_http_exporter",
            "server.address" = "localhost",
            "server.port" = 4318
        );
    }

    #[tokio::test]
    async fn instrumented_log_exporter_emits_exported_metric_with_error_type_on_failure() {
        use opentelemetry::logs::{Logger, LoggerProvider};
        use opentelemetry_sdk::logs::{SdkLogRecord, SdkLoggerProvider};

        let ctx = TelemetryContext::new();
        let mock = MockLogExporter {
            should_fail: true,
            ..Default::default()
        };
        let exporter =
            InstrumentedLogExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let provider = SdkLoggerProvider::builder().build();
        let logger = provider.logger("test");
        let log_record = logger.create_log_record();
        let scope = InstrumentationScope::builder("test").build();
        let logs: Vec<(&SdkLogRecord, &InstrumentationScope)> = vec![(&log_record, &scope)];
        let batch = LogBatch::new(&logs);
        let result = exporter.export(batch).await;
        assert!(result.is_err());
        assert_metric!(
            ctx,
            "otel.sdk.exporter.log.exported",
            "otel.component.type" = "otlp_http_exporter",
            "server.address" = "localhost",
            "server.port" = 4318,
            "error.type" = "export_failed"
        );
    }

    #[test]
    fn unix_socket_endpoint_has_no_server_attributes() {
        let unix = Url::parse("unix:///var/run/otel.sock").unwrap();
        assert_eq!(unix.scheme(), "unix");
        assert_eq!(unix.host_str(), None);
        assert_eq!(unix.port(), None);
        // `port_or_known_default` is the accessor the attribute builder actually calls.
        assert_eq!(unix.port_or_known_default(), None);
        assert_eq!(unix.path(), "/var/run/otel.sock");
    }

    #[test]
    fn instrumented_span_exporter_instance_id_is_unique() {
        let mock1 = MockSpanExporter::default();
        let mock2 = MockSpanExporter::default();
        let exporter1 =
            InstrumentedSpanExporter::new(mock1, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let exporter2 =
            InstrumentedSpanExporter::new(mock2, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        assert_ne!(exporter1.instance_id(), exporter2.instance_id());
    }

    #[test]
    fn instrumented_log_exporter_instance_id_is_unique() {
        let mock1 = MockLogExporter::default();
        let mock2 = MockLogExporter::default();
        let exporter1 =
            InstrumentedLogExporter::new(mock1, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let exporter2 =
            InstrumentedLogExporter::new(mock2, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        assert_ne!(exporter1.instance_id(), exporter2.instance_id());
    }

    #[test]
    fn instrumented_span_exporter_debug_format() {
        let mock = MockSpanExporter::default();
        let exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let debug_str = format!("{:?}", exporter);
        assert!(debug_str.contains("InstrumentedSpanExporter"));
        assert!(debug_str.contains("total_exported"));
        assert!(debug_str.contains("total_failed"));
    }

    #[test]
    fn instrumented_log_exporter_debug_format() {
        let mock = MockLogExporter::default();
        let exporter =
            InstrumentedLogExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let debug_str = format!("{:?}", exporter);
        assert!(debug_str.contains("InstrumentedLogExporter"));
        assert!(debug_str.contains("total_exported"));
        assert!(debug_str.contains("total_failed"));
    }

    #[test]
    fn instrumented_span_exporter_shutdown_delegates_to_inner() {
        let mock = MockSpanExporter::default();
        let mut exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let result = exporter.shutdown_with_timeout(std::time::Duration::from_secs(5));
        assert!(result.is_ok());
        assert_eq!(exporter.inner.shutdown_calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn instrumented_log_exporter_shutdown_delegates_to_inner() {
        let mock = MockLogExporter::default();
        let exporter =
            InstrumentedLogExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let result = exporter.shutdown_with_timeout(std::time::Duration::from_secs(5));
        assert!(result.is_ok());
        assert_eq!(exporter.inner.shutdown_calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn instrumented_span_exporter_set_resource_delegates_to_inner() {
        let mock = MockSpanExporter::default();
        let mut exporter =
            InstrumentedSpanExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let resource = Resource::builder().build();
        exporter.set_resource(&resource);
        assert_eq!(exporter.inner.set_resource_calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn instrumented_log_exporter_set_resource_delegates_to_inner() {
        let mock = MockLogExporter::default();
        let mut exporter =
            InstrumentedLogExporter::new(mock, ExporterKind::OtlpHttp, Some(&test_endpoint()));
        let resource = Resource::builder().build();
        exporter.set_resource(&resource);
        assert_eq!(exporter.inner.set_resource_calls.load(Ordering::SeqCst), 1);
    }
}