use std::time::{SystemTime, UNIX_EPOCH};
use crate::metrics::{LatencySnapshot, MetricsExporter};
fn encode_varint(mut value: u64, buf: &mut Vec<u8>) {
loop {
let byte = (value & 0x7F) as u8;
value >>= 7;
if value == 0 {
buf.push(byte);
return;
}
buf.push(byte | 0x80);
}
}
fn encode_tag(field: u32, wire_type: u8, buf: &mut Vec<u8>) {
encode_varint(((field as u64) << 3) | wire_type as u64, buf);
}
fn encode_len_field(field: u32, data: &[u8], buf: &mut Vec<u8>) {
encode_tag(field, 2, buf);
encode_varint(data.len() as u64, buf);
buf.extend_from_slice(data);
}
fn encode_string_field(field: u32, s: &str, buf: &mut Vec<u8>) {
if !s.is_empty() {
encode_len_field(field, s.as_bytes(), buf);
}
}
fn encode_varint_field(field: u32, value: u64, buf: &mut Vec<u8>) {
if value != 0 {
encode_tag(field, 0, buf);
encode_varint(value, buf);
}
}
fn encode_fixed64_field(field: u32, value: u64, buf: &mut Vec<u8>) {
encode_tag(field, 1, buf);
buf.extend_from_slice(&value.to_le_bytes());
}
fn encode_bool_field(field: u32, value: bool, buf: &mut Vec<u8>) {
if value {
encode_varint_field(field, 1, buf);
}
}
fn now_nanos() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
}
fn encode_any_value_string(s: &str, buf: &mut Vec<u8>) {
encode_string_field(1, s, buf);
}
fn encode_key_value_string(key: &str, value: &str, buf: &mut Vec<u8>) {
encode_string_field(1, key, buf);
let mut av = Vec::new();
encode_any_value_string(value, &mut av);
encode_len_field(2, &av, buf);
}
fn encode_resource(attrs: &[(&str, &str)], buf: &mut Vec<u8>) {
for &(k, v) in attrs {
let mut kv_buf = Vec::new();
encode_key_value_string(k, v, &mut kv_buf);
encode_len_field(1, &kv_buf, buf);
}
}
fn encode_instrumentation_scope(name: &str, version: &str, buf: &mut Vec<u8>) {
encode_string_field(1, name, buf);
encode_string_field(2, version, buf);
}
fn encode_number_data_point_int(
value: i64,
time_nanos: u64,
start_time_nanos: u64,
buf: &mut Vec<u8>,
) {
encode_fixed64_field(2, start_time_nanos, buf);
encode_fixed64_field(3, time_nanos, buf);
encode_fixed64_field(6, value as u64, buf);
}
fn encode_number_data_point_double(
value: f64,
time_nanos: u64,
start_time_nanos: u64,
buf: &mut Vec<u8>,
) {
encode_fixed64_field(2, start_time_nanos, buf);
encode_fixed64_field(3, time_nanos, buf);
encode_fixed64_field(4, value.to_bits(), buf);
}
const AGGREGATION_TEMPORALITY_DELTA: u64 = 1;
const AGGREGATION_TEMPORALITY_CUMULATIVE: u64 = 2;
fn encode_metric_counter(
name: &str,
description: &str,
value: i64,
delta: bool,
time_nanos: u64,
start_time_nanos: u64,
buf: &mut Vec<u8>,
) {
encode_string_field(1, name, buf); encode_string_field(2, description, buf);
let mut sum = Vec::new();
let mut dp = Vec::new();
encode_number_data_point_int(value, time_nanos, start_time_nanos, &mut dp);
encode_len_field(1, &dp, &mut sum);
let temporality = if delta {
AGGREGATION_TEMPORALITY_DELTA
} else {
AGGREGATION_TEMPORALITY_CUMULATIVE
};
encode_varint_field(2, temporality, &mut sum);
encode_bool_field(3, true, &mut sum);
encode_len_field(7, &sum, buf);
}
fn encode_metric_gauge_int(
name: &str,
description: &str,
value: i64,
time_nanos: u64,
start_time_nanos: u64,
buf: &mut Vec<u8>,
) {
encode_string_field(1, name, buf);
encode_string_field(2, description, buf);
let mut gauge = Vec::new();
let mut dp = Vec::new();
encode_number_data_point_int(value, time_nanos, start_time_nanos, &mut dp);
encode_len_field(1, &dp, &mut gauge);
encode_len_field(5, &gauge, buf);
}
fn encode_metric_gauge_double(
name: &str,
description: &str,
value: f64,
time_nanos: u64,
start_time_nanos: u64,
buf: &mut Vec<u8>,
) {
encode_string_field(1, name, buf);
encode_string_field(2, description, buf);
let mut gauge = Vec::new();
let mut dp = Vec::new();
encode_number_data_point_double(value, time_nanos, start_time_nanos, &mut dp);
encode_len_field(1, &dp, &mut gauge);
encode_len_field(5, &gauge, buf);
}
pub struct OtlpExporter {
delta: bool,
time_nanos: u64,
start_time_nanos: u64,
metrics: Vec<Vec<u8>>,
resource_attrs: Vec<(String, String)>,
}
impl OtlpExporter {
pub fn new(delta: bool, start_time_nanos: u64) -> Self {
Self {
delta,
time_nanos: now_nanos(),
start_time_nanos,
metrics: Vec::with_capacity(32),
resource_attrs: Vec::new(),
}
}
pub(crate) fn with_timestamps(delta: bool, start_time_nanos: u64, time_nanos: u64) -> Self {
Self {
delta,
time_nanos,
start_time_nanos,
metrics: Vec::with_capacity(32),
resource_attrs: Vec::new(),
}
}
pub fn add_resource_attribute(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.resource_attrs.push((key.into(), value.into()));
}
pub(crate) fn push_metric_bytes(&mut self, metric: Vec<u8>) {
self.metrics.push(metric);
}
pub(crate) fn into_metric_bytes(self) -> Vec<Vec<u8>> {
self.metrics
}
#[cfg(test)]
pub(crate) fn finish_metric_count(&self) -> usize {
self.metrics.len()
}
pub fn finish(self) -> Vec<u8> {
let mut scope_metrics = Vec::new();
let mut scope = Vec::new();
encode_instrumentation_scope("krafka", env!("CARGO_PKG_VERSION"), &mut scope);
encode_len_field(1, &scope, &mut scope_metrics);
for m in &self.metrics {
encode_len_field(2, m, &mut scope_metrics);
}
let mut resource_metrics = Vec::new();
if !self.resource_attrs.is_empty() {
let attrs: Vec<(&str, &str)> = self
.resource_attrs
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let mut res = Vec::new();
encode_resource(&attrs, &mut res);
encode_len_field(1, &res, &mut resource_metrics);
}
encode_len_field(2, &scope_metrics, &mut resource_metrics);
let mut metrics_data = Vec::new();
encode_len_field(1, &resource_metrics, &mut metrics_data);
metrics_data
}
}
impl MetricsExporter for OtlpExporter {
fn export_counter(&mut self, name: &str, help: &str, value: u64) {
let mut buf = Vec::new();
encode_metric_counter(
name,
help,
value as i64,
self.delta,
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
fn export_gauge(&mut self, name: &str, help: &str, value: u64) {
let mut buf = Vec::new();
encode_metric_gauge_int(
name,
help,
value as i64,
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
fn export_latency(&mut self, name: &str, help: &str, snapshot: &LatencySnapshot) {
let name_count = format!("{name}_count");
let name_sum = format!("{name}_sum_seconds");
let mut buf = Vec::new();
encode_metric_gauge_int(
&name_count,
help,
snapshot.count as i64,
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
let mut buf = Vec::new();
encode_metric_gauge_double(
&name_sum,
help,
snapshot.sum.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
if let Some(min) = snapshot.min {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_min_seconds"),
help,
min.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
if let Some(max) = snapshot.max {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_max_seconds"),
help,
max.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
if let Some(avg) = snapshot.avg {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_avg_seconds"),
help,
avg.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
if let Some(p50) = snapshot.p50 {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_p50_seconds"),
help,
p50.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
if let Some(p95) = snapshot.p95 {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_p95_seconds"),
help,
p95.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
if let Some(p99) = snapshot.p99 {
let mut buf = Vec::new();
encode_metric_gauge_double(
&format!("{name}_p99_seconds"),
help,
p99.as_secs_f64(),
self.time_nanos,
self.start_time_nanos,
&mut buf,
);
self.metrics.push(buf);
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_encode_varint_single_byte() {
let mut buf = Vec::new();
encode_varint(0, &mut buf);
assert_eq!(buf, vec![0]);
buf.clear();
encode_varint(127, &mut buf);
assert_eq!(buf, vec![127]);
}
#[test]
fn test_encode_varint_multi_byte() {
let mut buf = Vec::new();
encode_varint(300, &mut buf);
assert_eq!(buf, vec![0xAC, 0x02]);
}
#[test]
fn test_encode_string_field() {
let mut buf = Vec::new();
encode_string_field(1, "hello", &mut buf);
assert_eq!(&buf[0..2], &[0x0A, 5]);
assert_eq!(&buf[2..], b"hello");
}
#[test]
fn test_otlp_exporter_counter() {
let start = now_nanos().saturating_sub(1_000_000_000);
let mut exporter = OtlpExporter::new(false, start);
exporter.export_counter("test_counter", "A test counter", 42);
let data = exporter.finish();
assert!(!data.is_empty());
assert_eq!(data[0], 0x0A);
}
#[test]
fn test_otlp_exporter_gauge() {
let start = now_nanos().saturating_sub(1_000_000_000);
let mut exporter = OtlpExporter::new(false, start);
exporter.export_gauge("test_gauge", "A test gauge", 7);
let data = exporter.finish();
assert!(!data.is_empty());
}
#[test]
fn test_otlp_exporter_latency() {
let start = now_nanos().saturating_sub(1_000_000_000);
let mut exporter = OtlpExporter::new(true, start);
let snapshot = LatencySnapshot {
count: 10,
sum: Duration::from_millis(500),
min: Some(Duration::from_millis(10)),
max: Some(Duration::from_millis(100)),
avg: Some(Duration::from_millis(50)),
p50: Some(Duration::from_millis(45)),
p95: Some(Duration::from_millis(90)),
p99: Some(Duration::from_millis(99)),
};
exporter.export_latency("test_latency", "A test latency", &snapshot);
let data = exporter.finish();
assert!(!data.is_empty());
}
#[test]
fn test_otlp_exporter_with_resource_attrs() {
let start = now_nanos();
let mut exporter = OtlpExporter::new(false, start);
exporter.add_resource_attribute("service.name", "krafka");
exporter.add_resource_attribute("client_rack", "us-east-1a");
exporter.export_counter("c", "counter", 1);
let data = exporter.finish();
assert!(!data.is_empty());
}
#[test]
fn test_otlp_exporter_empty() {
let exporter = OtlpExporter::new(false, now_nanos());
let data = exporter.finish();
assert!(!data.is_empty());
}
#[test]
fn test_delta_vs_cumulative() {
let start = now_nanos();
let mut delta_exporter = OtlpExporter::new(true, start);
delta_exporter.export_counter("c", "counter", 5);
let delta_data = delta_exporter.finish();
let mut cumul_exporter = OtlpExporter::new(false, start);
cumul_exporter.export_counter("c", "counter", 5);
let cumul_data = cumul_exporter.finish();
assert_ne!(delta_data, cumul_data);
}
#[test]
fn test_otlp_exporter_latency_sparse() {
let start = now_nanos().saturating_sub(1_000_000_000);
let mut exporter = OtlpExporter::new(false, start);
let snapshot = LatencySnapshot {
count: 0,
sum: Duration::ZERO,
min: None,
max: None,
avg: None,
p50: None,
p95: None,
p99: None,
};
exporter.export_latency("test_latency", "A sparse latency", &snapshot);
assert_eq!(exporter.finish_metric_count(), 2);
}
}