use crate::error::{OtlpError, OtlpExportError};
use crate::otlp::metrics_data::*;
use arrow::record_batch::RecordBatch;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue as ProtoKeyValue};
use opentelemetry_proto::tonic::metrics::v1::{
Gauge, Histogram, HistogramDataPoint as ProtoHistogramDataPoint, Metric,
NumberDataPoint as ProtoNumberDataPoint, Sum,
};
use opentelemetry_proto::tonic::resource::v1::Resource as ProtoResource;
use std::sync::Arc;
impl InternalResourceMetrics {
pub fn to_protobuf(&self) -> Result<ExportMetricsServiceRequest, OtlpError> {
use opentelemetry_proto::tonic::common::v1::InstrumentationScope as ProtoInstrumentationScope;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics as ProtoResourceMetrics;
use opentelemetry_proto::tonic::metrics::v1::ScopeMetrics as ProtoScopeMetrics;
let proto_resource = Some(ProtoResource {
attributes: self
.resource
.attributes
.iter()
.map(sdk_key_value_to_proto)
.collect(),
dropped_attributes_count: self.resource.dropped_attributes_count,
entity_refs: vec![], });
let scope_metrics = self
.scope_metrics
.iter()
.map(|sm| {
let scope = Some(ProtoInstrumentationScope {
name: sm.scope.name.clone(),
version: sm.scope.version.clone().unwrap_or_default(),
attributes: sm
.scope
.attributes
.iter()
.map(sdk_key_value_to_proto)
.collect(),
dropped_attributes_count: sm.scope.dropped_attributes_count,
});
let metrics = sm
.metrics
.iter()
.filter_map(internal_metric_to_proto)
.collect();
ProtoScopeMetrics {
scope,
metrics,
schema_url: sm.schema_url.clone(),
}
})
.collect();
let resource_metrics = ProtoResourceMetrics {
resource: proto_resource,
scope_metrics,
schema_url: self.schema_url.clone(),
};
Ok(ExportMetricsServiceRequest {
resource_metrics: vec![resource_metrics],
})
}
pub fn to_arrow_batch(&self) -> Result<RecordBatch, OtlpError> {
use arrow::array::*;
use arrow::datatypes::*;
if self.scope_metrics.is_empty() {
let schema = Schema::new(vec![
Field::new("metric_name", DataType::Utf8, false),
Field::new("value", DataType::Float64, false),
Field::new("timestamp_unix_nano", DataType::UInt64, false),
Field::new("metric_type", DataType::Utf8, false),
Field::new("attributes", DataType::Utf8, true),
]);
return RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(StringArray::from(Vec::<String>::new())),
Arc::new(Float64Array::from(Vec::<f64>::new())),
Arc::new(UInt64Array::from(Vec::<u64>::new())),
Arc::new(StringArray::from(Vec::<String>::new())),
Arc::new(StringArray::from(Vec::<Option<String>>::new())),
],
)
.map_err(|e| OtlpError::Export(OtlpExportError::ArrowConversionError(e.to_string())));
}
let mut metric_names = Vec::new();
let mut values = Vec::new();
let mut timestamps = Vec::new();
let mut metric_types = Vec::new();
let mut attributes = Vec::new();
for scope_metric in &self.scope_metrics {
for metric in &scope_metric.metrics {
let metric_type_str = match &metric.data {
InternalMetricData::Gauge(_) => "gauge",
InternalMetricData::Sum(_) => "sum",
InternalMetricData::Histogram(_) => "histogram",
};
match &metric.data {
InternalMetricData::Gauge(gauge) => {
for dp in &gauge.data_points {
metric_names.push(Some(metric.name.clone()));
values.push(match &dp.value {
InternalNumberValue::AsInt(i) => *i as f64,
InternalNumberValue::AsDouble(d) => *d,
});
timestamps.push(dp.time_unix_nano);
metric_types.push(Some(metric_type_str.to_string()));
let mut attrs_map = serde_json::Map::new();
for kv in &dp.attributes {
let json_value = match &kv.value {
opentelemetry::Value::String(s) => {
serde_json::Value::String(s.to_string())
}
opentelemetry::Value::I64(i) => {
serde_json::Value::Number((*i).into())
}
opentelemetry::Value::F64(f) => serde_json::Value::Number(
serde_json::Number::from_f64(*f)
.unwrap_or(serde_json::Number::from(0)),
),
opentelemetry::Value::Bool(b) => serde_json::Value::Bool(*b),
_ => serde_json::Value::String(format!("{:?}", kv.value)),
};
attrs_map.insert(kv.key.as_str().to_string(), json_value);
}
let attrs_json = serde_json::to_string(&attrs_map)
.unwrap_or_else(|_| "{}".to_string());
attributes.push(Some(attrs_json));
}
}
InternalMetricData::Sum(sum) => {
for dp in &sum.data_points {
metric_names.push(Some(metric.name.clone()));
values.push(match &dp.value {
InternalNumberValue::AsInt(i) => *i as f64,
InternalNumberValue::AsDouble(d) => *d,
});
timestamps.push(dp.time_unix_nano);
metric_types.push(Some(metric_type_str.to_string()));
let mut attrs_map = serde_json::Map::new();
for kv in &dp.attributes {
let json_value = match &kv.value {
opentelemetry::Value::String(s) => {
serde_json::Value::String(s.to_string())
}
opentelemetry::Value::I64(i) => {
serde_json::Value::Number((*i).into())
}
opentelemetry::Value::F64(f) => serde_json::Value::Number(
serde_json::Number::from_f64(*f)
.unwrap_or(serde_json::Number::from(0)),
),
opentelemetry::Value::Bool(b) => serde_json::Value::Bool(*b),
_ => serde_json::Value::String(format!("{:?}", kv.value)),
};
attrs_map.insert(kv.key.as_str().to_string(), json_value);
}
let attrs_json = serde_json::to_string(&attrs_map)
.unwrap_or_else(|_| "{}".to_string());
attributes.push(Some(attrs_json));
}
}
InternalMetricData::Histogram(hist) => {
for dp in &hist.data_points {
let value = dp.sum.unwrap_or(dp.count as f64);
metric_names.push(Some(metric.name.clone()));
values.push(value);
timestamps.push(dp.time_unix_nano);
metric_types.push(Some(metric_type_str.to_string()));
let mut attrs_map = serde_json::Map::new();
for kv in &dp.attributes {
let json_value = match &kv.value {
opentelemetry::Value::String(s) => {
serde_json::Value::String(s.to_string())
}
opentelemetry::Value::I64(i) => {
serde_json::Value::Number((*i).into())
}
opentelemetry::Value::F64(f) => serde_json::Value::Number(
serde_json::Number::from_f64(*f)
.unwrap_or(serde_json::Number::from(0)),
),
opentelemetry::Value::Bool(b) => serde_json::Value::Bool(*b),
_ => serde_json::Value::String(format!("{:?}", kv.value)),
};
attrs_map.insert(kv.key.as_str().to_string(), json_value);
}
let attrs_json = serde_json::to_string(&attrs_map)
.unwrap_or_else(|_| "{}".to_string());
attributes.push(Some(attrs_json));
}
}
}
}
}
if metric_names.is_empty() {
return Err(OtlpError::Export(OtlpExportError::ArrowConversionError(
"No metric data points to convert".to_string(),
)));
}
let schema = Schema::new(vec![
Field::new("metric_name", DataType::Utf8, false),
Field::new("value", DataType::Float64, false),
Field::new("timestamp_unix_nano", DataType::UInt64, false),
Field::new("metric_type", DataType::Utf8, false),
Field::new("attributes", DataType::Utf8, true),
]);
let name_refs: Vec<Option<&str>> = metric_names
.iter()
.map(|opt| opt.as_ref().map(|s| s.as_ref()))
.collect();
let type_refs: Vec<Option<&str>> = metric_types
.iter()
.map(|opt| opt.as_ref().map(|s| s.as_ref()))
.collect();
let attr_refs: Vec<Option<&str>> = attributes
.iter()
.map(|opt| opt.as_ref().map(|s| s.as_ref()))
.collect();
let batch = RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(StringArray::from(name_refs)),
Arc::new(Float64Array::from(values)),
Arc::new(UInt64Array::from(timestamps)),
Arc::new(StringArray::from(type_refs)),
Arc::new(StringArray::from(attr_refs)),
],
)
.map_err(|e| OtlpError::Export(OtlpExportError::ArrowConversionError(e.to_string())))?;
Ok(batch)
}
}
fn sdk_key_value_to_proto(kv: &opentelemetry::KeyValue) -> ProtoKeyValue {
use opentelemetry_proto::tonic::common::v1::any_value::Value;
let value = match &kv.value {
opentelemetry::Value::String(s) => Some(AnyValue {
value: Some(Value::StringValue(s.to_string())),
}),
opentelemetry::Value::I64(i) => Some(AnyValue {
value: Some(Value::IntValue(*i)),
}),
opentelemetry::Value::F64(f) => Some(AnyValue {
value: Some(Value::DoubleValue(*f)),
}),
opentelemetry::Value::Bool(b) => Some(AnyValue {
value: Some(Value::BoolValue(*b)),
}),
_ => None, };
ProtoKeyValue {
key: kv.key.as_str().to_string(),
value,
}
}
fn internal_metric_to_proto(metric: &InternalMetric) -> Option<Metric> {
let data = match &metric.data {
InternalMetricData::Gauge(gauge) => {
let data_points = gauge
.data_points
.iter()
.filter_map(internal_number_data_point_to_proto)
.collect();
Some(
opentelemetry_proto::tonic::metrics::v1::metric::Data::Gauge(Gauge { data_points }),
)
}
InternalMetricData::Sum(sum) => {
let data_points = sum
.data_points
.iter()
.filter_map(internal_number_data_point_to_proto)
.collect();
Some(opentelemetry_proto::tonic::metrics::v1::metric::Data::Sum(
Sum {
data_points,
aggregation_temporality: sum.aggregation_temporality,
is_monotonic: sum.is_monotonic,
},
))
}
InternalMetricData::Histogram(hist) => {
let data_points = hist
.data_points
.iter()
.filter_map(internal_histogram_data_point_to_proto)
.collect();
Some(
opentelemetry_proto::tonic::metrics::v1::metric::Data::Histogram(Histogram {
data_points,
aggregation_temporality: hist.aggregation_temporality,
}),
)
}
}?;
Some(Metric {
name: metric.name.clone(),
description: metric.description.clone().unwrap_or_default(),
unit: metric.unit.clone().unwrap_or_default(),
data: Some(data),
metadata: vec![], })
}
fn internal_number_data_point_to_proto(
dp: &InternalNumberDataPoint,
) -> Option<ProtoNumberDataPoint> {
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberValue;
let value = match &dp.value {
InternalNumberValue::AsInt(i) => Some(NumberValue::AsInt(*i)),
InternalNumberValue::AsDouble(d) => Some(NumberValue::AsDouble(*d)),
}?;
Some(ProtoNumberDataPoint {
attributes: dp.attributes.iter().map(sdk_key_value_to_proto).collect(),
start_time_unix_nano: dp.start_time_unix_nano.unwrap_or(0),
time_unix_nano: dp.time_unix_nano,
value: Some(value),
exemplars: vec![], flags: 0,
})
}
fn internal_histogram_data_point_to_proto(
dp: &InternalHistogramDataPoint,
) -> Option<ProtoHistogramDataPoint> {
Some(ProtoHistogramDataPoint {
attributes: dp.attributes.iter().map(sdk_key_value_to_proto).collect(),
start_time_unix_nano: dp.start_time_unix_nano.unwrap_or(0),
time_unix_nano: dp.time_unix_nano,
count: dp.count,
sum: dp.sum, bucket_counts: dp.bucket_counts.clone(),
explicit_bounds: dp.explicit_bounds.clone(),
exemplars: vec![], flags: 0,
min: dp.min, max: dp.max, })
}