use std::collections::BTreeMap;
use obs_core::{MetricEmitter, SchemaRegistry};
use serde::{Deserialize, Serialize};
use crate::{
env_config::{OtlpEndpoint, OtlpResourceAttrs},
logs::ResourceMessage,
mapping::MetricPoint,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtlpMetricPayload {
pub resource: ResourceMessage,
pub endpoint: String,
pub points: Vec<MetricPoint>,
}
impl OtlpMetricPayload {
#[must_use]
pub fn from_envelopes(
envs: &[obs_proto::obs::v1::ObsEnvelope],
resource: &OtlpResourceAttrs,
endpoint: &OtlpEndpoint,
registry: &SchemaRegistry,
) -> Self {
let mut points: Vec<MetricPoint> = Vec::with_capacity(envs.len());
for env in envs {
let mut attrs: BTreeMap<String, String> = BTreeMap::new();
for (k, v) in env.labels.iter() {
attrs.insert(k.clone(), v.clone());
}
attrs.insert("event.name".to_string(), env.full_name.clone());
let mut emitter = CollectingEmitter::new(env.full_name.clone(), attrs.clone());
let mut emitted_any = false;
if let Some(schema) = registry.lookup(env)
&& schema.project_metrics(&env.payload, &mut emitter).is_ok()
{
emitted_any = !emitter.points.is_empty();
}
if emitted_any {
points.extend(emitter.points);
} else {
points.push(MetricPoint {
instrument: format!("{}.count", env.full_name),
unit: "1".to_string(),
kind: "counter".to_string(),
attributes: attrs,
value_u64: Some(1),
value_f64: None,
bounds: Vec::new(),
});
}
}
Self {
resource: ResourceMessage::from_attrs(resource),
endpoint: endpoint.url.clone(),
points,
}
}
}
struct CollectingEmitter {
full_name: String,
attributes: BTreeMap<String, String>,
points: Vec<MetricPoint>,
}
impl CollectingEmitter {
fn new(full_name: String, attributes: BTreeMap<String, String>) -> Self {
Self {
full_name,
attributes,
points: Vec::new(),
}
}
fn instrument(&self, field: &str) -> String {
compose_instrument_name(&self.full_name, field)
}
}
fn compose_instrument_name(full_name: &str, instrument: &str) -> String {
let qualified_prefix = format!("{full_name}.");
if instrument.starts_with(&qualified_prefix) {
instrument.to_string()
} else {
format!("{full_name}.{instrument}")
}
}
impl MetricEmitter for CollectingEmitter {
fn record_counter(&mut self, instrument: &'static str, value: u64, unit: Option<&'static str>) {
self.points.push(MetricPoint {
instrument: self.instrument(instrument),
unit: unit.unwrap_or("1").to_string(),
kind: "counter".to_string(),
attributes: self.attributes.clone(),
value_u64: Some(value),
value_f64: None,
bounds: Vec::new(),
});
}
fn record_gauge_u64(
&mut self,
instrument: &'static str,
value: u64,
unit: Option<&'static str>,
) {
self.points.push(MetricPoint {
instrument: self.instrument(instrument),
unit: unit.unwrap_or("1").to_string(),
kind: "gauge".to_string(),
attributes: self.attributes.clone(),
value_u64: Some(value),
value_f64: None,
bounds: Vec::new(),
});
}
fn record_gauge_f64(
&mut self,
instrument: &'static str,
value: f64,
unit: Option<&'static str>,
) {
self.points.push(MetricPoint {
instrument: self.instrument(instrument),
unit: unit.unwrap_or("1").to_string(),
kind: "gauge".to_string(),
attributes: self.attributes.clone(),
value_u64: None,
value_f64: Some(value),
bounds: Vec::new(),
});
}
fn record_histogram(
&mut self,
instrument: &'static str,
value: f64,
unit: Option<&'static str>,
bounds: &'static [f64],
) {
self.points.push(MetricPoint {
instrument: self.instrument(instrument),
unit: unit.unwrap_or("1").to_string(),
kind: "histogram".to_string(),
attributes: self.attributes.clone(),
value_u64: None,
value_f64: Some(value),
bounds: bounds.to_vec(),
});
}
fn with_attributes(&mut self, attrs: &[(&'static str, &str)]) {
for (k, v) in attrs {
self.attributes.insert((*k).to_string(), (*v).to_string());
}
}
}
#[cfg(test)]
mod tests {
use super::compose_instrument_name;
#[test]
fn compose_instrument_name_prefixes_relative_fields_once() {
assert_eq!(
compose_instrument_name("obs.v1.ObsHttpClientCompleted", "latency_ms"),
"obs.v1.ObsHttpClientCompleted.latency_ms"
);
}
#[test]
fn compose_instrument_name_keeps_qualified_fields_once() {
assert_eq!(
compose_instrument_name(
"obs.v1.ObsHttpClientCompleted",
"obs.v1.ObsHttpClientCompleted.latency_ms"
),
"obs.v1.ObsHttpClientCompleted.latency_ms"
);
}
}