//! ts_opentelemetry_stdout 0.1.0-beta.1
//!
//! A fork of an OpenTelemetry exporter for stdout.
//! Documentation
use ts_opentelemetry_api::{global, metrics::MetricsError};
use ts_opentelemetry_sdk::metrics::data;
use serde::{Serialize, Serializer};
use std::{
    any::Any,
    borrow::Cow,
    time::{SystemTime, UNIX_EPOCH},
};

use crate::common::{AttributeSet, KeyValue, Resource, Scope};

/// Transformed metrics data that can be serialized
///
/// Top-level JSON envelope produced by this exporter: a single
/// `resourceMetrics` object, serialized with camelCase field names.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MetricsData {
    // The resource together with all scope metrics collected under it.
    resource_metrics: ResourceMetrics,
}

impl From<&mut data::ResourceMetrics> for MetricsData {
    fn from(value: &mut data::ResourceMetrics) -> Self {
        MetricsData {
            resource_metrics: value.into(),
        }
    }
}

/// Serializable view of the SDK's resource metrics: resource attributes
/// plus one entry per instrumentation scope.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ResourceMetrics {
    resource: Resource,
    scope_metrics: Vec<ScopeMetrics>,
    // Resource schema URL; the field is omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    schema_url: Option<String>,
}

impl From<&mut data::ResourceMetrics> for ResourceMetrics {
    fn from(value: &mut data::ResourceMetrics) -> Self {
        ResourceMetrics {
            resource: Resource::from(&value.resource),
            scope_metrics: value.scope_metrics.drain(..).map(Into::into).collect(),
            schema_url: value.resource.schema_url().map(Into::into),
        }
    }
}

/// Metrics gathered under one instrumentation scope.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct ScopeMetrics {
    scope: Scope,
    metrics: Vec<Metric>,
    // Scope schema URL; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    schema_url: Option<Cow<'static, str>>,
}

impl From<data::ScopeMetrics> for ScopeMetrics {
    fn from(value: data::ScopeMetrics) -> Self {
        let schema_url = value.scope.schema_url.clone();
        ScopeMetrics {
            scope: value.scope.into(),
            metrics: value.metrics.into_iter().map(Into::into).collect(),
            schema_url,
        }
    }
}

/// Newtype over a metric's unit string so an emptiness predicate can be
/// attached for serde's `skip_serializing_if`.
#[derive(Serialize, Debug, Clone)]
struct Unit(Cow<'static, str>);

impl Unit {
    /// True when the unit string is empty (field is then omitted).
    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}

impl From<ts_opentelemetry_api::metrics::Unit> for Unit {
    fn from(unit: ts_opentelemetry_api::metrics::Unit) -> Self {
        Unit(unit.as_str().to_string().into())
    }
}

/// A single metric: identifying info plus its aggregated data, flattened
/// into the same JSON object.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Metric {
    name: Cow<'static, str>,
    // Omitted when empty.
    #[serde(skip_serializing_if = "str::is_empty")]
    description: Cow<'static, str>,
    // Omitted when empty.
    #[serde(skip_serializing_if = "Unit::is_empty")]
    unit: Unit,
    // `None` when the aggregation was not recognized by `map_data`;
    // flattening an absent Option serializes nothing.
    #[serde(flatten)]
    data: Option<MetricData>,
}

impl From<data::Metric> for Metric {
    fn from(value: data::Metric) -> Self {
        Metric {
            name: value.name,
            description: value.description,
            unit: value.unit.into(),
            data: map_data(value.data.as_any()),
        }
    }
}

/// Downcast an erased aggregation to one of the supported concrete types
/// (`Histogram`/`Sum`/`Gauge` over `i64`/`u64`/`f64`) and convert it.
/// Unknown aggregations are reported through the global error handler and
/// yield `None`.
fn map_data(data: &dyn Any) -> Option<MetricData> {
    if let Some(hist) = data.downcast_ref::<data::Histogram<i64>>() {
        return Some(MetricData::Histogram(hist.into()));
    }
    if let Some(hist) = data.downcast_ref::<data::Histogram<u64>>() {
        return Some(MetricData::Histogram(hist.into()));
    }
    if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
        return Some(MetricData::Histogram(hist.into()));
    }
    if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
        return Some(MetricData::Sum(sum.into()));
    }
    if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
        return Some(MetricData::Sum(sum.into()));
    }
    if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
        return Some(MetricData::Sum(sum.into()));
    }
    if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
        return Some(MetricData::Gauge(gauge.into()));
    }
    if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
        return Some(MetricData::Gauge(gauge.into()));
    }
    if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
        return Some(MetricData::Gauge(gauge.into()));
    }
    global::handle_error(MetricsError::Other("unknown aggregator".into()));
    None
}

/// The supported aggregation payloads; the variant name becomes the JSON
/// key (`gauge`, `sum`, `histogram`) via the camelCase rename.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
enum MetricData {
    Gauge(Gauge),
    Sum(Sum),
    Histogram(Histogram),
}

/// A numeric sample value; `untagged` makes each variant serialize as the
/// bare number with no variant name.
#[derive(Serialize, Debug, Clone)]
#[serde(untagged)]
enum DataValue {
    F64(f64),
    I64(i64),
    U64(u64),
}

impl From<f64> for DataValue {
    fn from(value: f64) -> Self {
        DataValue::F64(value)
    }
}

impl From<i64> for DataValue {
    fn from(value: i64) -> Self {
        DataValue::I64(value)
    }
}

impl From<u64> for DataValue {
    fn from(value: u64) -> Self {
        DataValue::U64(value)
    }
}

/// Gauge aggregation: just its data points (gauges carry no temporality
/// or monotonicity).
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Gauge {
    data_points: Vec<DataPoint>,
}

impl<T: Into<DataValue> + Copy> From<&data::Gauge<T>> for Gauge {
    fn from(value: &data::Gauge<T>) -> Self {
        Gauge {
            data_points: value.data_points.iter().map(Into::into).collect(),
        }
    }
}

/// Aggregation temporality, numbered to match the OTLP protocol values.
#[derive(Debug, Clone, Copy)]
enum Temporality {
    #[allow(dead_code)]
    Unspecified = 0, // explicitly never used
    Delta = 1,
    Cumulative = 2,
}

impl Serialize for Temporality {
    /// Serialize the temporality as its numeric protocol value (0-2).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // All discriminants fit in a u8, so a direct cast is lossless;
        // the original's intermediate `as u32` was redundant.
        serializer.serialize_u8(*self as u8)
    }
}

impl From<data::Temporality> for Temporality {
    /// Map the SDK temporality onto the protocol numbering.
    fn from(value: data::Temporality) -> Self {
        match value {
            data::Temporality::Cumulative => Temporality::Cumulative,
            data::Temporality::Delta => Temporality::Delta,
            // NOTE(review): this panics on any other/future SDK variant;
            // consider mapping to `Unspecified` or reporting through
            // `global::handle_error` instead — confirm intended behavior.
            _ => panic!("unexpected temporality"),
        }
    }
}

/// Sum aggregation: data points plus temporality and monotonicity.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Sum {
    data_points: Vec<DataPoint>,
    aggregation_temporality: Temporality,
    is_monotonic: bool,
}

impl<T: Into<DataValue> + Copy> From<&data::Sum<T>> for Sum {
    fn from(value: &data::Sum<T>) -> Self {
        Sum {
            data_points: value.data_points.iter().map(Into::into).collect(),
            aggregation_temporality: value.temporality.into(),
            is_monotonic: value.is_monotonic,
        }
    }
}

/// One gauge/sum sample: attributes, timestamps, value, and exemplars.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct DataPoint {
    attributes: AttributeSet,
    // Serialized as integer nanoseconds since the Unix epoch, or null.
    #[serde(serialize_with = "as_opt_unix_nano")]
    start_time_unix_nano: Option<SystemTime>,
    #[serde(serialize_with = "as_opt_unix_nano")]
    time_unix_nano: Option<SystemTime>,
    value: DataValue,
    // Omitted entirely when there are no exemplars.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    exemplars: Vec<Exemplar>,
    // Always set to 0 by the From impl below; omitted when zero.
    #[serde(skip_serializing_if = "is_zero_u8")]
    flags: u8,
}

/// Serde skip helper: true when the flags byte is zero (the default).
fn is_zero_u8(v: &u8) -> bool {
    matches!(*v, 0)
}

impl<T: Into<DataValue> + Copy> From<&data::DataPoint<T>> for DataPoint {
    fn from(value: &data::DataPoint<T>) -> Self {
        DataPoint {
            attributes: AttributeSet::from(&value.attributes),
            start_time_unix_nano: value.start_time,
            time_unix_nano: value.time,
            value: value.value.into(),
            exemplars: value.exemplars.iter().map(Into::into).collect(),
            flags: 0,
        }
    }
}

/// Histogram aggregation: bucketed data points plus temporality.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Histogram {
    data_points: Vec<HistogramDataPoint>,
    aggregation_temporality: Temporality,
}

impl<T: Into<DataValue> + Copy> From<&data::Histogram<T>> for Histogram {
    fn from(value: &data::Histogram<T>) -> Self {
        Histogram {
            data_points: value.data_points.iter().map(Into::into).collect(),
            aggregation_temporality: value.temporality.into(),
        }
    }
}

/// One histogram sample: attributes, timestamps, bucket layout/counts,
/// and summary statistics. Unlike `DataPoint`, no fields are skipped —
/// `min`/`max` serialize as null when absent and `flags` always appears.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct HistogramDataPoint {
    attributes: AttributeSet,
    // Serialized as integer nanoseconds since the Unix epoch.
    #[serde(serialize_with = "as_unix_nano")]
    start_time_unix_nano: SystemTime,
    #[serde(serialize_with = "as_unix_nano")]
    time_unix_nano: SystemTime,
    count: u64,
    explicit_bounds: Vec<f64>,
    bucket_counts: Vec<u64>,
    min: Option<DataValue>,
    max: Option<DataValue>,
    sum: DataValue,
    exemplars: Vec<Exemplar>,
    flags: u8,
}

impl<T: Into<DataValue> + Copy> From<&data::HistogramDataPoint<T>> for HistogramDataPoint {
    fn from(value: &data::HistogramDataPoint<T>) -> Self {
        HistogramDataPoint {
            attributes: AttributeSet::from(&value.attributes),
            start_time_unix_nano: value.start_time,
            time_unix_nano: value.time,
            count: value.count,
            explicit_bounds: value.bounds.clone(),
            bucket_counts: value.bucket_counts.clone(),
            min: value.min.map(Into::into),
            max: value.max.map(Into::into),
            sum: value.sum.into(),
            exemplars: value.exemplars.iter().map(Into::into).collect(),
            flags: 0,
        }
    }
}

/// A sampled measurement linked to the trace context it was recorded in.
#[derive(Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Exemplar {
    filtered_attributes: Vec<KeyValue>,
    // Serialized as integer nanoseconds since the Unix epoch.
    #[serde(serialize_with = "as_unix_nano")]
    time_unix_nano: SystemTime,
    value: DataValue,
    // Lowercase hex: 16 chars (span) / 32 chars (trace).
    span_id: String,
    trace_id: String,
}

/// Serialize a timestamp as integer nanoseconds since the Unix epoch.
/// Pre-epoch times (which `duration_since` reports as an error) collapse
/// to 0 rather than failing serialization.
fn as_unix_nano<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let nanos = time
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    serializer.serialize_u128(nanos)
}

/// Serialize an optional timestamp: nanoseconds since the epoch when
/// present, JSON null otherwise.
fn as_opt_unix_nano<S>(time: &Option<SystemTime>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    if let Some(t) = time {
        as_unix_nano(t, serializer)
    } else {
        serializer.serialize_none()
    }
}

impl<T: Into<DataValue> + Copy> From<&data::Exemplar<T>> for Exemplar {
    /// Convert an SDK exemplar, rendering the span/trace ids as lowercase
    /// hex (byte-by-byte yields the same digits as formatting the
    /// big-endian integer with `{:016x}` / `{:032x}`).
    fn from(value: &data::Exemplar<T>) -> Self {
        let to_hex = |bytes: &[u8]| -> String {
            bytes.iter().map(|b| format!("{:02x}", b)).collect()
        };
        Exemplar {
            filtered_attributes: value.filtered_attributes.iter().map(Into::into).collect(),
            time_unix_nano: value.time,
            value: value.value.into(),
            span_id: to_hex(&value.span_id),
            trace_id: to_hex(&value.trace_id),
        }
    }
}