use chrono::{DateTime, Utc};
use core::{f64, fmt};
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::{
error::OTelSdkResult,
metrics::{
data::{
ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, GaugeDataPoint, Histogram,
HistogramDataPoint, ResourceMetrics, ScopeMetrics, Sum, SumDataPoint,
},
exporter::PushMetricExporter,
},
};
use std::fmt::Debug;
use std::sync::atomic;
use std::time::Duration;
/// Metric exporter that writes every exported batch to standard output.
///
/// Instances are created through [`MetricExporter::builder`] or
/// [`Default::default`].
pub struct MetricExporter {
    /// Temporality handed back by the exporter's `temporality()` method.
    temporality: Temporality,
    /// Raised when shutdown is requested; `export` refuses to run afterwards.
    is_shutdown: atomic::AtomicBool,
}
impl MetricExporter {
pub fn builder() -> MetricExporterBuilder {
MetricExporterBuilder::default()
}
}
impl Default for MetricExporter {
fn default() -> Self {
MetricExporterBuilder::default().build()
}
}
impl fmt::Debug for MetricExporter {
    /// Writes only the type name; no field values are included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "MetricExporter")
    }
}
impl PushMetricExporter for MetricExporter {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
Err(opentelemetry_sdk::error::OTelSdkError::AlreadyShutdown)
} else {
println!("Metrics");
println!("Resource");
if let Some(schema_url) = metrics.resource().schema_url() {
println!("\tResource SchemaUrl: {schema_url:?}");
}
metrics.resource().iter().for_each(|(k, v)| {
println!("\t -> {k}={v:?}");
});
print_metrics(metrics.scope_metrics());
Ok(())
}
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown(&self) -> OTelSdkResult {
self.shutdown_with_timeout(Duration::from_secs(5))
}
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}
fn temporality(&self) -> Temporality {
self.temporality
}
}
/// Prints every instrumentation scope in `metrics`, followed by each metric
/// recorded under that scope (name, description, unit, type and data points).
fn print_metrics<'a>(metrics: impl Iterator<Item = &'a ScopeMetrics>) {
    // Prints the aggregation type label and hands the data over to the
    // printer matching that aggregation.
    fn print_info<T>(data: &MetricData<T>)
    where
        T: Debug + Copy,
    {
        match data {
            MetricData::Gauge(gauge) => {
                println!("\t\tType         : Gauge");
                print_gauge(gauge);
            }
            MetricData::Sum(sum) => {
                println!("\t\tType         : Sum");
                print_sum(sum);
            }
            MetricData::Histogram(hist) => {
                println!("\t\tType         : Histogram");
                print_histogram(hist);
            }
            MetricData::ExponentialHistogram(hist) => {
                println!("\t\tType         : Exponential Histogram");
                print_exponential_histogram(hist);
            }
        }
    }

    for (i, scope_metrics) in metrics.enumerate() {
        println!("\tInstrumentation Scope #{i}");

        let scope = scope_metrics.scope();
        println!("\t\tName         : {}", scope.name());
        if let Some(version) = scope.version() {
            println!("\t\tVersion  : {version:?}");
        }
        if let Some(schema_url) = scope.schema_url() {
            println!("\t\tSchemaUrl: {schema_url:?}");
        }

        // The header is emitted only when the scope has at least one attribute.
        let mut attributes = scope.attributes().peekable();
        if attributes.peek().is_some() {
            println!("\t\tScope Attributes:");
        }
        for kv in attributes {
            println!("\t\t\t ->  {}: {}", kv.key, kv.value);
        }

        for (i, metric) in scope_metrics.metrics().enumerate() {
            println!("Metric #{i}");
            println!("\t\tName         : {}", metric.name());
            println!("\t\tDescription  : {}", metric.description());
            println!("\t\tUnit         : {}", metric.unit());

            // Each numeric flavour is forwarded to the same generic printer.
            match metric.data() {
                AggregatedMetrics::F64(data) => print_info(data),
                AggregatedMetrics::U64(data) => print_info(data),
                AggregatedMetrics::I64(data) => print_info(data),
            }
        }
    }
}
fn print_sum<T: Debug + Copy>(sum: &Sum<T>) {
println!("\t\tSum DataPoints");
println!("\t\tMonotonic : {}", sum.is_monotonic());
if sum.temporality() == Temporality::Cumulative {
println!("\t\tTemporality : Cumulative");
} else {
println!("\t\tTemporality : Delta");
}
let datetime: DateTime<Utc> = sum.start_time().into();
println!(
"\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
let datetime: DateTime<Utc> = sum.time().into();
println!(
"\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
print_sum_data_points(sum.data_points());
}
fn print_gauge<T: Debug + Copy>(gauge: &Gauge<T>) {
println!("\t\tGauge DataPoints");
if let Some(start_time) = gauge.start_time() {
let datetime: DateTime<Utc> = start_time.into();
println!(
"\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
}
let datetime: DateTime<Utc> = gauge.time().into();
println!(
"\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
print_gauge_data_points(gauge.data_points());
}
fn print_histogram<T: Debug + Copy>(histogram: &Histogram<T>) {
if histogram.temporality() == Temporality::Cumulative {
println!("\t\tTemporality : Cumulative");
} else {
println!("\t\tTemporality : Delta");
}
let datetime: DateTime<Utc> = histogram.start_time().into();
println!(
"\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
let datetime: DateTime<Utc> = histogram.time().into();
println!(
"\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
println!("\t\tHistogram DataPoints");
print_hist_data_points(histogram.data_points());
}
fn print_exponential_histogram<T: Debug + Copy>(histogram: &ExponentialHistogram<T>) {
if histogram.temporality() == Temporality::Cumulative {
println!("\t\tTemporality : Cumulative");
} else {
println!("\t\tTemporality : Delta");
}
let datetime: DateTime<Utc> = histogram.start_time().into();
println!(
"\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
let datetime: DateTime<Utc> = histogram.time().into();
println!(
"\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
println!("\t\tExponential Histogram DataPoints");
print_exponential_hist_data_points(histogram.data_points());
}
/// Prints each sum data point: its index, its value (pretty `Debug` form)
/// and its attributes as `key: value` lines.
fn print_sum_data_points<'a, T: Debug + Copy + 'a>(
    data_points: impl Iterator<Item = &'a SumDataPoint<T>>,
) {
    data_points.enumerate().for_each(|(i, point)| {
        println!("\t\tDataPoint #{i}");
        println!("\t\t\tValue        : {:#?}", point.value());
        println!("\t\t\tAttributes   :");
        point.attributes().for_each(|kv| {
            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
        });
    });
}
/// Prints each gauge data point: its index, its value (pretty `Debug` form)
/// and its attributes as `key: value` lines.
fn print_gauge_data_points<'a, T: Debug + Copy + 'a>(
    data_points: impl Iterator<Item = &'a GaugeDataPoint<T>>,
) {
    data_points.enumerate().for_each(|(i, point)| {
        println!("\t\tDataPoint #{i}");
        println!("\t\t\tValue        : {:#?}", point.value());
        println!("\t\t\tAttributes   :");
        point.attributes().for_each(|kv| {
            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
        });
    });
}
/// Prints each explicit-bucket histogram data point: count, sum, optional
/// min/max, attributes, and the bucket table when at least one bound exists.
fn print_hist_data_points<'a, T: Debug + Copy + 'a>(
    data_points: impl Iterator<Item = &'a HistogramDataPoint<T>>,
) {
    data_points.enumerate().for_each(|(i, point)| {
        println!("\t\tDataPoint #{i}");
        println!("\t\t\tCount        : {}", point.count());
        println!("\t\t\tSum          : {:?}", point.sum());
        if let Some(min) = point.min() {
            println!("\t\t\tMin          : {min:?}");
        }
        if let Some(max) = point.max() {
            println!("\t\t\tMax          : {max:?}");
        }

        println!("\t\t\tAttributes   :");
        point.attributes().for_each(|kv| {
            println!("\t\t\t\t ->  {}: {}", kv.key, kv.value.as_str());
        });

        let mut bucket_counts_iter = point.bucket_counts();
        let mut bounds = point.bounds().peekable();

        // Without any bound there is no bucket table to print at all.
        if bounds.peek().is_some() {
            println!("\t\t\tBuckets");

            // Each bound closes the bucket opened by the previous one; the
            // first bucket starts at negative infinity.
            let mut lower_bound = f64::NEG_INFINITY;
            for upper_bound in bounds {
                // A missing count is reported as zero.
                let count = bucket_counts_iter.next().unwrap_or(0);
                println!("\t\t\t\t {lower_bound} to {upper_bound} : {count}");
                lower_bound = upper_bound;
            }

            // The overflow bucket above the last bound.
            let last_count = bucket_counts_iter.next().unwrap_or(0);
            println!("\t\t\t\t{lower_bound} to +Infinity : {last_count}");
        }
    });
}
/// Prints each exponential-histogram data point: count, sum, optional
/// min/max, scale, derived base, zero bucket, attributes, and the negative
/// and positive bucket tables with their computed boundaries.
///
/// The bucket base is `2^(2^-scale)`. Bucket `i` of a range with offset `o`
/// spans absolute values in `(base^(i + o), base^(i + o + 1)]`.
fn print_exponential_hist_data_points<'a, T: Debug + Copy + 'a>(
    data_points: impl Iterator<Item = &'a ExponentialHistogramDataPoint<T>>,
) {
    for (i, data_point) in data_points.enumerate() {
        println!("\t\tDataPoint #{i}");
        println!("\t\t\tCount        : {}", data_point.count());
        println!("\t\t\tSum          : {:?}", data_point.sum());
        if let Some(min) = &data_point.min() {
            println!("\t\t\tMin          : {min:?}");
        }
        if let Some(max) = &data_point.max() {
            println!("\t\t\tMax          : {max:?}");
        }

        let scale = data_point.scale();
        // Convert to f64 *before* negating. `-scale as f64` parses as
        // `(-scale) as f64`, so the negation would run in scale's own type
        // (presumably a signed integer) and overflow at that type's minimum.
        let base = 2.0f64.powf(2.0f64.powf(-(scale as f64)));

        println!("\t\t\tScale        : {:?}", scale);
        println!("\t\t\tBase         : {:?}", base);
        println!("\t\t\tZeroCount    : {}", data_point.zero_count());
        println!("\t\t\tZeroThreshold : {}", data_point.zero_threshold());

        println!("\t\t\tAttributes   :");
        for kv in data_point.attributes() {
            println!("\t\t\t\t ->  {} : {}", kv.key, kv.value.as_str());
        }

        let negative_bucket = data_point.negative_bucket();
        let negative_offset = negative_bucket.offset();
        println!("\t\t\tNegativeOffset : {}", negative_offset);
        // Walk the negative buckets from the highest index down so that the
        // printed boundaries ascend towards zero. The counts are collected
        // first because the reversed enumeration needs a double-ended,
        // exact-size iterator.
        for (i, count) in negative_bucket
            .counts()
            .collect::<Vec<_>>()
            .into_iter()
            .enumerate()
            .rev()
        {
            let lower = -base.powf(i as f64 + negative_offset as f64 + 1.0f64);
            let upper = -base.powf(i as f64 + negative_offset as f64);
            // Mirroring `(b^k, b^(k+1)]` onto the negative axis flips which
            // end is inclusive: the interval becomes `[-b^(k+1), -b^k)`.
            println!(
                "\t\t\t\tBucket {} [{:?}, {:?}) : {}",
                i, lower, upper, count
            );
        }

        let positive_bucket = data_point.positive_bucket();
        let positive_offset = positive_bucket.offset();
        println!("\t\t\tPositiveOffset : {}", positive_offset);
        for (i, count) in positive_bucket.counts().enumerate() {
            let lower = base.powf(i as f64 + positive_offset as f64);
            let upper = base.powf(i as f64 + positive_offset as f64 + 1.0f64);
            println!(
                "\t\t\t\tBucket {} ({:?}, {:?}] : {}",
                i, lower, upper, count
            );
        }
    }
}
/// Builder used to configure and create a [`MetricExporter`].
///
/// The derived default leaves the temporality unset.
#[derive(Default)]
pub struct MetricExporterBuilder {
    /// Temporality chosen through `with_temporality`; `None` until it is set,
    /// in which case `build` falls back to the temporality's default.
    temporality: Option<Temporality>,
}
impl MetricExporterBuilder {
    /// Sets the temporality the built exporter will report, replacing any
    /// previously chosen value.
    pub fn with_temporality(self, temporality: Temporality) -> Self {
        Self {
            temporality: Some(temporality),
        }
    }

    /// Consumes the builder and creates a [`MetricExporter`] that is not yet
    /// shut down. An unset temporality falls back to its `Default` value.
    pub fn build(self) -> MetricExporter {
        let Self { temporality } = self;
        MetricExporter {
            is_shutdown: atomic::AtomicBool::new(false),
            temporality: temporality.unwrap_or_default(),
        }
    }
}
impl fmt::Debug for MetricExporterBuilder {
    /// Writes only the type name; no field values are included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "MetricExporterBuilder")
    }
}