use crate::error::{OTelSdkError, OTelSdkResult};
use crate::metrics::data::{
ExponentialHistogram, Gauge, Histogram, MetricData, ResourceMetrics, Sum,
};
use crate::metrics::exporter::PushMetricExporter;
use crate::metrics::Temporality;
use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use super::data::{AggregatedMetrics, Metric, ScopeMetrics};
pub struct InMemoryMetricExporter {
metrics: Arc<Mutex<VecDeque<ResourceMetrics>>>,
temporality: Temporality,
}
impl Clone for InMemoryMetricExporter {
fn clone(&self) -> Self {
InMemoryMetricExporter {
metrics: self.metrics.clone(),
temporality: self.temporality,
}
}
}
impl fmt::Debug for InMemoryMetricExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InMemoryMetricExporter").finish()
}
}
impl Default for InMemoryMetricExporter {
fn default() -> Self {
InMemoryMetricExporterBuilder::new().build()
}
}
pub struct InMemoryMetricExporterBuilder {
temporality: Option<Temporality>,
}
impl fmt::Debug for InMemoryMetricExporterBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InMemoryMetricExporterBuilder").finish()
}
}
impl Default for InMemoryMetricExporterBuilder {
fn default() -> Self {
Self::new()
}
}
impl InMemoryMetricExporterBuilder {
pub fn new() -> Self {
Self { temporality: None }
}
pub fn with_temporality(mut self, temporality: Temporality) -> Self {
self.temporality = Some(temporality);
self
}
pub fn build(self) -> InMemoryMetricExporter {
InMemoryMetricExporter {
metrics: Arc::new(Mutex::new(VecDeque::new())),
temporality: self.temporality.unwrap_or_default(),
}
}
}
impl InMemoryMetricExporter {
pub fn get_finished_metrics(&self) -> Result<Vec<ResourceMetrics>, OTelSdkError> {
let metrics = self
.metrics
.lock()
.map(|metrics_guard| metrics_guard.iter().map(Self::clone_metrics).collect())
.map_err(OTelSdkError::from)?;
Ok(metrics)
}
pub fn reset(&self) {
let _ = self
.metrics
.lock()
.map(|mut metrics_guard| metrics_guard.clear());
}
fn clone_metrics(metric: &ResourceMetrics) -> ResourceMetrics {
ResourceMetrics {
resource: metric.resource.clone(),
scope_metrics: metric
.scope_metrics
.iter()
.map(|scope_metric| ScopeMetrics {
scope: scope_metric.scope.clone(),
metrics: scope_metric
.metrics
.iter()
.map(|metric| Metric {
name: metric.name.clone(),
description: metric.description.clone(),
unit: metric.unit.clone(),
data: Self::clone_data(&metric.data),
})
.collect(),
})
.collect(),
}
}
fn clone_data(data: &AggregatedMetrics) -> AggregatedMetrics {
fn clone_inner<T: Clone>(data: &MetricData<T>) -> MetricData<T> {
match data {
MetricData::Gauge(gauge) => Gauge {
data_points: gauge.data_points.clone(),
start_time: gauge.start_time,
time: gauge.time,
}
.into(),
MetricData::Sum(sum) => Sum {
data_points: sum.data_points.clone(),
start_time: sum.start_time,
time: sum.time,
temporality: sum.temporality,
is_monotonic: sum.is_monotonic,
}
.into(),
MetricData::Histogram(histogram) => Histogram {
data_points: histogram.data_points.clone(),
start_time: histogram.start_time,
time: histogram.time,
temporality: histogram.temporality,
}
.into(),
MetricData::ExponentialHistogram(exponential_histogram) => ExponentialHistogram {
data_points: exponential_histogram.data_points.clone(),
start_time: exponential_histogram.start_time,
time: exponential_histogram.time,
temporality: exponential_histogram.temporality,
}
.into(),
}
}
match data {
AggregatedMetrics::F64(metric_data) => AggregatedMetrics::F64(clone_inner(metric_data)),
AggregatedMetrics::U64(metric_data) => AggregatedMetrics::U64(clone_inner(metric_data)),
AggregatedMetrics::I64(metric_data) => AggregatedMetrics::I64(clone_inner(metric_data)),
}
}
}
impl PushMetricExporter for InMemoryMetricExporter {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
self.metrics
.lock()
.map(|mut metrics_guard| {
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
})
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
}
fn force_flush(&self) -> OTelSdkResult {
Ok(()) }
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
fn temporality(&self) -> Temporality {
self.temporality
}
}