metry 0.1.1

All-in-one telemetry framework, based on tracing crate.
Documentation
//! CloudWatch metric exporter
//!
//! Defines a [MetricsExporter] to send metric data to backend via CloudWatch protocol.

use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_sdk_cloudwatch::types::{Dimension, MetricDatum, StandardUnit};
use aws_smithy_types::DateTime;
use opentelemetry::{metrics::Result, Key};
use opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE;

use opentelemetry_sdk::metrics::{
    data::{DataPoint, Gauge, Metric, ResourceMetrics, Sum},
    reader::{
        AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
        TemporalitySelector,
    },
};

use super::{MetricsClient, MetricsExporter};

#[derive(Debug)]
struct CloudWatchMetricsClient {
    cw_client: aws_sdk_cloudwatch::Client,
    namespace: String,
}

/// Retrieves namespace for given set of metrics from user config.
fn get_namespace(metrics: &ResourceMetrics, namespace: String) -> String {
    if namespace.is_empty() {
        metrics
            .resource
            .get(Key::from_static_str(SERVICE_NAMESPACE))
            .unwrap_or("default".into())
            .to_string()
    } else {
        namespace
    }
}

#[async_trait]
impl MetricsClient for CloudWatchMetricsClient {
    async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
        let cw_namespace = get_namespace(metrics, self.namespace.clone());
        tracing::debug!("Prepare metric: {:?}", metrics);

        // translate OTel to EMF metric
        let emf_metrics = metrics
            .scope_metrics
            .iter()
            .flat_map(|scoped_metrics| scoped_metrics.metrics.iter())
            .filter_map(|metric| {
                if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<i64>>() {
                    // NOTE: Gauge is not supported until tracing-opentelemetry = { version = "0.23.0", features = ["metrics_gauge_unstable"] }.
                    // However this requires update to opentelemetry@0.23.0, which is yet uncompatible with opentelemetry-aws<=0.11.0 (which is not published so far).
                    build_datums(metric, &gauge.data_points)
                } else if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<u64>>() {
                    build_datums(metric, &gauge.data_points)
                } else if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<f64>>() {
                    build_datums(metric, &gauge.data_points)
                } else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<i64>>() {
                    build_datums(metric, &sum.data_points)
                } else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<u64>>() {
                    build_datums(metric, &sum.data_points)
                } else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<f64>>() {
                    build_datums(metric, &sum.data_points)
                } else {
                    // TODO Histagrom and ExponentialHistogram
                    None
                }
            })
            .flatten()
            .collect::<Vec<MetricDatum>>();

        if emf_metrics.is_empty() {
            return Ok(());
        }

        tracing::debug!("Send custom metrics data.");
        let metric_res = self
            .cw_client
            .put_metric_data()
            .namespace(cw_namespace)
            .set_metric_data(Some(emf_metrics))
            .send()
            .await;

        let _ = match metric_res {
            Ok(result) => {
                tracing::debug!(result = format!("{:?}", result), "Metrics delivered to CW.");
            }
            Err(error) => {
                tracing::error!({ %error }, "Problem delivering metrics.");
            }
        };
        Ok(())
    }
}

/// Build MetricDatum for CloudWatch client from OTEL Metric
fn build_datums<T: num_traits::ToPrimitive + Copy>(
    metric: &Metric,
    data_points: &[DataPoint<T>],
) -> Option<Vec<MetricDatum>> {
    let name = metric.name.to_string();
    let unit = match StandardUnit::from(metric.unit.as_ref()) {
        // NOTE: Only Unknown should be discarded. All other types are put into the datum.
        #[allow(deprecated)]
        StandardUnit::Unknown(_) => None,
        other => Some(other),
    };
    let datums = data_points
        .iter()
        .map(|data_point: &DataPoint<T>| {
            MetricDatum::builder()
                .metric_name(name.as_str())
                .timestamp(DateTime::from(data_point.time.unwrap()))
                .set_unit(unit.clone())
                .value(data_point.value.to_f64().unwrap())
                .set_dimensions(Some(
                    data_point
                        .attributes
                        .iter()
                        .map(|(name, value)| {
                            Dimension::builder()
                                .name(name.to_string())
                                .value(value.to_string())
                                .build()
                        })
                        .collect(),
                ))
                .build()
        })
        .collect::<Vec<MetricDatum>>();
    if datums.is_empty() {
        None
    } else {
        Some(datums)
    }
}

/// Configuration for the CloudWatch metrics exporter
pub struct CloudWatchMetricsExporterBuilder {
    client: Option<Box<dyn MetricsClient>>,
    temporality_selector: Option<Box<dyn TemporalitySelector>>,
    aggregation_selector: Option<Box<dyn AggregationSelector>>,
}

impl CloudWatchMetricsExporterBuilder {
    pub async fn new(namespace: &str) -> CloudWatchMetricsExporterBuilder {
        let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
        let cw_client = aws_sdk_cloudwatch::Client::new(&config);
        let client = CloudWatchMetricsClient {
            cw_client,
            namespace: namespace.to_string(),
        };
        CloudWatchMetricsExporterBuilder {
            client: Some(Box::new(client)),
            temporality_selector: None,
            aggregation_selector: None,
        }
    }

    /// Set the temporality exporter for the exporter
    pub fn with_temporality_selector<S>(mut self, selector: S) -> Self
    where
        S: TemporalitySelector + 'static,
    {
        self.temporality_selector = Some(Box::new(selector));
        self
    }

    /// Set the aggregation exporter for the exporter
    pub fn with_aggregation_selector<S>(mut self, selector: S) -> Self
    where
        S: AggregationSelector + 'static,
    {
        self.aggregation_selector = Some(Box::new(selector));
        self
    }

    /// Create a metrics exporter with the current configuration
    pub fn build(self) -> MetricsExporter {
        MetricsExporter {
            client: self.client.expect(
                "CloudWatchMetricsExporterBuilder build was attemted before providing client.",
            ),
            temporality_selector: self
                .temporality_selector
                .unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
            aggregation_selector: self
                .aggregation_selector
                .unwrap_or_else(|| Box::new(DefaultAggregationSelector::new())),
        }
    }
}