metry 0.1.1

All-in-one telemetry framework, based on the tracing crate.
Documentation
//! [SpanExporter] for exporting OTEL traces to Xray backend.

use std::borrow::Cow;
use std::time::{SystemTime, UNIX_EPOCH};

use futures_core::future::BoxFuture;
use serde::Serialize;
use serde_json::{json, Value};

use aws_config::BehaviorVersion;
use aws_sdk_xray::Client;

use opentelemetry::trace::{SpanContext, Status, TraceError, TraceResult};
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};

/// Version component placed first in every X-Ray trace ID produced by
/// `extract_xray_trace_id`.
// Ref https://github.com/open-telemetry/opentelemetry-rust-contrib/blob/main/opentelemetry-aws/src/trace/xray_propagator.rs#L271
const AWS_XRAY_VERSION_KEY: &str = "1";

/// Builder for a span exporter that delivers traces through an AWS X-Ray client.
///
/// Create it with `with_default_config` and turn it into an exporter with
/// `build_span_exporter`.
#[derive(Debug)]
pub struct XrayExporterBuilder {
    /// The Xray client
    sdk_client: Client,
}

impl XrayExporterBuilder {
    /// Builds an Xray client from the default AWS configuration.
    ///
    /// The configuration is loaded with the latest SDK behavior version.
    pub async fn with_default_config() -> Self {
        let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
        Self {
            sdk_client: Client::new(&shared_config),
        }
    }

    /// Consumes the builder and wraps its Xray client in a span exporter.
    ///
    /// The current implementation always returns `Ok`.
    pub fn build_span_exporter(self) -> Result<opentelemetry_otlp::SpanExporter, TraceError> {
        let Self { sdk_client } = self;
        let exporter = opentelemetry_otlp::SpanExporter::new(XrayClient::new(sdk_client));
        Ok(exporter)
    }
}

/// Internal [SpanExporter] implementation that pushes span batches to Xray
/// through the wrapped SDK client.
#[derive(Debug)]
struct XrayClient {
    /// AWS SDK client used to submit trace segment documents.
    sdk_client: Client,
}

impl XrayClient {
    fn new(sdk_client: Client) -> Self {
        XrayClient { sdk_client }
    }
}

impl SpanExporter for XrayClient {
    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
        let body = match build_body(batch) {
            Ok(body) => body,
            Err(e) => return Box::pin(std::future::ready(Err(e))),
        };

        tracing::debug!("Send traces: {:#?}", body);
        let mut trace_segment_builder = self.sdk_client.put_trace_segments();
        for segment in body {
            trace_segment_builder =
                trace_segment_builder.trace_segment_documents(segment.to_string());
        }

        Box::pin(async move {
            // Send the tracing data
            let trace_result = trace_segment_builder.send().await;

            match trace_result {
                Ok(res) => tracing::debug!("Traces delivered: {:?}", res),
                Err(error) => {
                    tracing::error!("Problem delivering traces: {:?}", error);
                    return Err(TraceError::Other(error.into()));
                }
            };

            Ok(())
        })
    }

    fn shutdown(&mut self) {}
}

/// SDK metadata serialized under the `xray` key of a segment's `aws` section.
#[derive(Serialize)]
struct XraySdkAttributes {
    /// Name of the instrumentation library that produced the span.
    sdk: Cow<'static, str>,
    /// Version of the instrumentation library, when the span carries one.
    sdk_version: Option<Cow<'static, str>>,
    /// Always set to `false` by the span conversion in this module.
    auto_instrumentation: bool,
}

/// Contents of the `aws` field of a serialized trace segment.
#[derive(Serialize)]
struct XrayTraceSegmentAwsAttributes {
    /// Metadata about the SDK that recorded the segment.
    xray: XraySdkAttributes,
}

/// Serializable segment document built from a single OTEL span.
///
/// Field names are emitted as-is in the JSON document, so they must not be
/// renamed without checking the Xray document format.
#[derive(Serialize)]
struct XrayTraceSegment {
    /// Span name.
    name: String,
    /// Segment kind; the span conversion always writes `"subsegment"`.
    r#type: String,
    /// Span ID rendered as a string.
    id: String,
    /// Parent span ID rendered as a string.
    parent_id: String,
    /// Trace ID in the dash-separated form produced by `extract_xray_trace_id`.
    trace_id: String,
    /// Span start, in seconds since the Unix epoch.
    start_time: f64,
    /// Span end, in seconds since the Unix epoch.
    end_time: f64,
    // The following properties are not necessary. Copied them from OTEL traces.
    /// Set when the span status is an error.
    fault: bool,
    /// Set when the span status is an error.
    error: bool,
    /// Always `false` in the span conversion.
    throttle: bool,
    /// SDK metadata section.
    aws: XrayTraceSegmentAwsAttributes,
}

fn transform_otel_span_to_xray_segment(span: SpanData) -> Value {
    let is_error = matches!(span.status, Status::Error { description: _ });
    let segment = XrayTraceSegment {
        name: span.name.to_string(),
        id: span.span_context.span_id().to_string(),
        start_time: to_xray_timestamp(span.start_time),
        trace_id: extract_xray_trace_id(span.span_context),
        end_time: to_xray_timestamp(span.end_time),
        parent_id: span.parent_span_id.to_string(),
        r#type: "subsegment".to_string(),
        // The following properties are not necessary. Copied them from OTEL traces.
        fault: is_error,
        error: is_error,
        throttle: false,
        aws: XrayTraceSegmentAwsAttributes {
            xray: XraySdkAttributes {
                sdk: span.instrumentation_lib.name,
                sdk_version: span.instrumentation_lib.version,
                auto_instrumentation: false,
            },
        },
    };
    json!(segment)
}

/// Converts a system timestamp into seconds since the Unix epoch, truncated
/// to microsecond precision.
///
/// A timestamp earlier than the epoch (for example from a badly skewed clock)
/// is returned as a negative number of seconds rather than panicking, so one
/// bad span cannot take down the exporter.
fn to_xray_timestamp(time: SystemTime) -> f64 {
    const MICROS_PER_SECOND: f64 = 1_000_000.0;

    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as f64 / MICROS_PER_SECOND,
        // `duration()` is how far `time` lies before the epoch.
        Err(before_epoch) => -(before_epoch.duration().as_micros() as f64 / MICROS_PER_SECOND),
    }
}

/// Turns every span of the batch into its Xray segment document.
///
/// The conversion itself cannot fail, so the result is always `Ok`.
fn build_body(spans: Vec<SpanData>) -> TraceResult<Vec<Value>> {
    Ok(spans
        .into_iter()
        .map(transform_otel_span_to_xray_segment)
        .collect())
}

/// Renders the trace ID of a span context in Xray format.
///
/// The hex form of the trace ID is cut after its first eight characters and
/// the two parts are joined, behind the version prefix, with dashes.
fn extract_xray_trace_id(cx: SpanContext) -> String {
    let hex = cx.trace_id().to_string();
    let (epoch_part, unique_part) = hex.split_at(8);
    [AWS_XRAY_VERSION_KEY, epoch_part, unique_part].join("-")
}