use std::borrow::Cow;
use std::time::{SystemTime, UNIX_EPOCH};
use futures_core::future::BoxFuture;
use serde::Serialize;
use serde_json::{json, Value};
use aws_config::BehaviorVersion;
use aws_sdk_xray::Client;
use opentelemetry::trace::{SpanContext, Status, TraceError, TraceResult};
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
/// Version field placed at the front of every trace id string assembled by
/// `extract_xray_trace_id` (`<version>-<first 8 chars>-<remaining chars>`).
const AWS_XRAY_VERSION_KEY: &str = "1";
/// Builder that owns an `aws_sdk_xray::Client` and converts it into a span
/// exporter through `build_span_exporter`.
#[derive(Debug)]
pub struct XrayExporterBuilder {
/// SDK client that the built exporter will use to send trace segments.
sdk_client: Client,
}
impl XrayExporterBuilder {
    /// Creates a builder whose X-Ray client is configured from
    /// `aws_config::load_defaults` using `BehaviorVersion::latest()`.
    pub async fn with_default_config() -> Self {
        let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
        Self {
            sdk_client: Client::new(&shared_config),
        }
    }

    /// Consumes the builder and wraps its client in an
    /// `opentelemetry_otlp::SpanExporter`.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` is part of
    /// the signature only.
    pub fn build_span_exporter(self) -> Result<opentelemetry_otlp::SpanExporter, TraceError> {
        let Self { sdk_client } = self;
        let exporter = opentelemetry_otlp::SpanExporter::new(XrayClient::new(sdk_client));
        Ok(exporter)
    }
}
/// Private `SpanExporter` implementation that forwards span batches to X-Ray
/// using the wrapped SDK client.
#[derive(Debug)]
struct XrayClient {
/// Client used to issue `put_trace_segments` requests.
sdk_client: Client,
}
impl XrayClient {
fn new(sdk_client: Client) -> Self {
XrayClient { sdk_client }
}
}
impl SpanExporter for XrayClient {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
let body = match build_body(batch) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
tracing::debug!("Send traces: {:#?}", body);
let mut trace_segment_builder = self.sdk_client.put_trace_segments();
for segment in body {
trace_segment_builder =
trace_segment_builder.trace_segment_documents(segment.to_string());
}
Box::pin(async move {
let trace_result = trace_segment_builder.send().await;
match trace_result {
Ok(res) => tracing::debug!("Traces delivered: {:?}", res),
Err(error) => {
tracing::error!("Problem delivering traces: {:?}", error);
return Err(TraceError::Other(error.into()));
}
};
Ok(())
})
}
fn shutdown(&mut self) {}
}
/// Serializable description of the instrumentation that produced a span;
/// stored in the `xray` field of `XrayTraceSegmentAwsAttributes`.
#[derive(Serialize)]
struct XraySdkAttributes {
/// Filled from the span's instrumentation library name.
sdk: Cow<'static, str>,
/// Filled from the span's instrumentation library version, when present.
sdk_version: Option<Cow<'static, str>>,
/// Always set to `false` by `transform_otel_span_to_xray_segment`.
auto_instrumentation: bool,
}
/// Value of the `aws` field of `XrayTraceSegment`; currently only carries the
/// SDK attributes.
#[derive(Serialize)]
struct XrayTraceSegmentAwsAttributes {
/// Instrumentation details for the span.
xray: XraySdkAttributes,
}
/// Serializable document built from one exported span by
/// `transform_otel_span_to_xray_segment`.
#[derive(Serialize)]
struct XrayTraceSegment {
/// Span name.
name: String,
/// Document type; `r#` is required because `type` is a Rust keyword.
/// The transformer always writes `"subsegment"` here.
r#type: String,
/// String form of the span id.
id: String,
/// String form of the parent span id.
parent_id: String,
/// Trace id in the dash-separated form produced by `extract_xray_trace_id`.
trace_id: String,
/// Span start as fractional seconds since the Unix epoch.
start_time: f64,
/// Span end as fractional seconds since the Unix epoch.
end_time: f64,
/// Set when the span status is `Status::Error`.
fault: bool,
/// Set from the same condition as `fault`.
error: bool,
/// Always `false` in the current transformer.
throttle: bool,
/// AWS-specific attributes attached to the document.
aws: XrayTraceSegmentAwsAttributes,
}
fn transform_otel_span_to_xray_segment(span: SpanData) -> Value {
let is_error = matches!(span.status, Status::Error { description: _ });
let segment = XrayTraceSegment {
name: span.name.to_string(),
id: span.span_context.span_id().to_string(),
start_time: to_xray_timestamp(span.start_time),
trace_id: extract_xray_trace_id(span.span_context),
end_time: to_xray_timestamp(span.end_time),
parent_id: span.parent_span_id.to_string(),
r#type: "subsegment".to_string(),
fault: is_error,
error: is_error,
throttle: false,
aws: XrayTraceSegmentAwsAttributes {
xray: XraySdkAttributes {
sdk: span.instrumentation_lib.name,
sdk_version: span.instrumentation_lib.version,
auto_instrumentation: false,
},
},
};
json!(segment)
}
/// Converts a `SystemTime` into fractional seconds since the Unix epoch,
/// truncated to microsecond resolution.
///
/// Times before the epoch yield a negative number of seconds instead of
/// panicking, so one malformed span timestamp cannot abort an export.
fn to_xray_timestamp(time: SystemTime) -> f64 {
    const MICROS_PER_SECOND: f64 = 1_000_000.0;
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as f64 / MICROS_PER_SECOND,
        // `duration()` reports how far `time` lies before the epoch.
        Err(before_epoch) => {
            -(before_epoch.duration().as_micros() as f64 / MICROS_PER_SECOND)
        }
    }
}
/// Turns every span of a batch into its segment JSON document, preserving the
/// batch order.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
fn build_body(spans: Vec<SpanData>) -> TraceResult<Vec<Value>> {
    Ok(spans
        .into_iter()
        .map(transform_otel_span_to_xray_segment)
        .collect())
}
/// Builds the dash-separated trace id string for a span context:
/// `AWS_XRAY_VERSION_KEY`, then the first 8 characters of the trace id's
/// string form, then the remaining characters.
///
/// # Panics
///
/// Panics if the trace id's string form is shorter than 8 bytes.
fn extract_xray_trace_id(cx: SpanContext) -> String {
    let rendered_id = cx.trace_id().to_string();
    let (leading, trailing) = rendered_id.split_at(8);
    [AWS_XRAY_VERSION_KEY, leading, trailing].join("-")
}