use async_trait::async_trait;
use aws_config::BehaviorVersion;
use aws_sdk_cloudwatch::types::{Dimension, MetricDatum, StandardUnit};
use aws_smithy_types::DateTime;
use opentelemetry::{metrics::Result, Key};
use opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE;
use opentelemetry_sdk::metrics::{
data::{DataPoint, Gauge, Metric, ResourceMetrics, Sum},
reader::{
AggregationSelector, DefaultAggregationSelector, DefaultTemporalitySelector,
TemporalitySelector,
},
};
use super::{MetricsClient, MetricsExporter};
#[derive(Debug)]
struct CloudWatchMetricsClient {
cw_client: aws_sdk_cloudwatch::Client,
namespace: String,
}
fn get_namespace(metrics: &ResourceMetrics, namespace: String) -> String {
if namespace.is_empty() {
metrics
.resource
.get(Key::from_static_str(SERVICE_NAMESPACE))
.unwrap_or("default".into())
.to_string()
} else {
namespace
}
}
#[async_trait]
impl MetricsClient for CloudWatchMetricsClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
let cw_namespace = get_namespace(metrics, self.namespace.clone());
tracing::debug!("Prepare metric: {:?}", metrics);
let emf_metrics = metrics
.scope_metrics
.iter()
.flat_map(|scoped_metrics| scoped_metrics.metrics.iter())
.filter_map(|metric| {
if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<i64>>() {
build_datums(metric, &gauge.data_points)
} else if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<u64>>() {
build_datums(metric, &gauge.data_points)
} else if let Some(gauge) = metric.data.as_any().downcast_ref::<Gauge<f64>>() {
build_datums(metric, &gauge.data_points)
} else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<i64>>() {
build_datums(metric, &sum.data_points)
} else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<u64>>() {
build_datums(metric, &sum.data_points)
} else if let Some(sum) = metric.data.as_any().downcast_ref::<Sum<f64>>() {
build_datums(metric, &sum.data_points)
} else {
None
}
})
.flatten()
.collect::<Vec<MetricDatum>>();
if emf_metrics.is_empty() {
return Ok(());
}
tracing::debug!("Send custom metrics data.");
let metric_res = self
.cw_client
.put_metric_data()
.namespace(cw_namespace)
.set_metric_data(Some(emf_metrics))
.send()
.await;
let _ = match metric_res {
Ok(result) => {
tracing::debug!(result = format!("{:?}", result), "Metrics delivered to CW.");
}
Err(error) => {
tracing::error!({ %error }, "Problem delivering metrics.");
}
};
Ok(())
}
}
fn build_datums<T: num_traits::ToPrimitive + Copy>(
metric: &Metric,
data_points: &[DataPoint<T>],
) -> Option<Vec<MetricDatum>> {
let name = metric.name.to_string();
let unit = match StandardUnit::from(metric.unit.as_ref()) {
#[allow(deprecated)]
StandardUnit::Unknown(_) => None,
other => Some(other),
};
let datums = data_points
.iter()
.map(|data_point: &DataPoint<T>| {
MetricDatum::builder()
.metric_name(name.as_str())
.timestamp(DateTime::from(data_point.time.unwrap()))
.set_unit(unit.clone())
.value(data_point.value.to_f64().unwrap())
.set_dimensions(Some(
data_point
.attributes
.iter()
.map(|(name, value)| {
Dimension::builder()
.name(name.to_string())
.value(value.to_string())
.build()
})
.collect(),
))
.build()
})
.collect::<Vec<MetricDatum>>();
if datums.is_empty() {
None
} else {
Some(datums)
}
}
pub struct CloudWatchMetricsExporterBuilder {
client: Option<Box<dyn MetricsClient>>,
temporality_selector: Option<Box<dyn TemporalitySelector>>,
aggregation_selector: Option<Box<dyn AggregationSelector>>,
}
impl CloudWatchMetricsExporterBuilder {
pub async fn new(namespace: &str) -> CloudWatchMetricsExporterBuilder {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let cw_client = aws_sdk_cloudwatch::Client::new(&config);
let client = CloudWatchMetricsClient {
cw_client,
namespace: namespace.to_string(),
};
CloudWatchMetricsExporterBuilder {
client: Some(Box::new(client)),
temporality_selector: None,
aggregation_selector: None,
}
}
pub fn with_temporality_selector<S>(mut self, selector: S) -> Self
where
S: TemporalitySelector + 'static,
{
self.temporality_selector = Some(Box::new(selector));
self
}
pub fn with_aggregation_selector<S>(mut self, selector: S) -> Self
where
S: AggregationSelector + 'static,
{
self.aggregation_selector = Some(Box::new(selector));
self
}
pub fn build(self) -> MetricsExporter {
MetricsExporter {
client: self.client.expect(
"CloudWatchMetricsExporterBuilder build was attemted before providing client.",
),
temporality_selector: self
.temporality_selector
.unwrap_or_else(|| Box::new(DefaultTemporalitySelector::new())),
aggregation_selector: self
.aggregation_selector
.unwrap_or_else(|| Box::new(DefaultAggregationSelector::new())),
}
}
}