use std::time::UNIX_EPOCH;
use prost::Message;
use crate::model::metric::MetricEvent;
use crate::{EncoderError, SondaError};
use super::Encoder;
/// NaN value with the exact bit pattern `0x7ff0000000000002`.
///
/// As the name indicates, this is meant to be used as the Prometheus
/// staleness-marker sample value. Because it is a NaN it never compares equal
/// to anything (itself included); identify it by comparing `to_bits()` against
/// the bit pattern rather than with `==`.
pub const PROMETHEUS_STALE_NAN: f64 = f64::from_bits(0x7ff0000000000002);
/// Protobuf message wrapping a batch of [`TimeSeries`].
///
/// The encoder in this file does not build this message itself; it emits
/// individual length-prefixed `TimeSeries` frames instead.
// NOTE(review): presumably mirrors the Prometheus remote-write `WriteRequest`
// message — confirm the field numbering against the upstream .proto.
#[derive(Clone, PartialEq, prost::Message)]
pub struct WriteRequest {
/// Repeated message field 1: the series carried by this request.
#[prost(message, repeated, tag = "1")]
pub timeseries: Vec<TimeSeries>,
}
/// Protobuf message for one series: a set of labels plus its samples.
///
/// `RemoteWriteEncoder::encode_metric` fills `labels` with the event labels
/// and a `__name__` label (sorted by label name) and `samples` with exactly
/// one entry.
#[derive(Clone, PartialEq, prost::Message)]
pub struct TimeSeries {
/// Repeated message field 1: the labels identifying the series.
#[prost(message, repeated, tag = "1")]
pub labels: Vec<Label>,
/// Repeated message field 2: the samples belonging to the series.
#[prost(message, repeated, tag = "2")]
pub samples: Vec<Sample>,
}
/// Protobuf message for a single name/value label pair.
#[derive(Clone, PartialEq, prost::Message)]
pub struct Label {
/// String field 1: the label name (the encoder uses `__name__` for the metric name).
#[prost(string, tag = "1")]
pub name: String,
/// String field 2: the label value.
#[prost(string, tag = "2")]
pub value: String,
}
/// Protobuf message for one observed value at one point in time.
#[derive(Clone, PartialEq, prost::Message)]
pub struct Sample {
/// Double field 1: the sample value.
#[prost(double, tag = "1")]
pub value: f64,
/// Int64 field 2: the sample time. `RemoteWriteEncoder::encode_metric`
/// stores milliseconds since the Unix epoch here.
#[prost(int64, tag = "2")]
pub timestamp: i64,
}
/// Stateless encoder that turns metric events into length-prefixed protobuf
/// `TimeSeries` frames.
///
/// The type carries no data, so it is free to copy and every instance behaves
/// identically. The common traits are derived so the encoder can be logged
/// with `{:?}`, duplicated, and built through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RemoteWriteEncoder;

impl RemoteWriteEncoder {
    /// Creates a new encoder.
    ///
    /// Equivalent to [`Default::default`]; declared `const` so the encoder can
    /// also be used in constant and static contexts.
    pub const fn new() -> Self {
        Self
    }
}
impl Encoder for RemoteWriteEncoder {
    /// Appends one metric event to `buf` as a length-prefixed protobuf `TimeSeries`.
    ///
    /// Frame layout: a 4-byte little-endian `u32` byte count followed by that
    /// many bytes of prost-encoded `TimeSeries`. The series contains the
    /// event's labels plus a `__name__` label holding the metric name, ordered
    /// by label name, and a single sample whose timestamp is the event time in
    /// milliseconds since the Unix epoch.
    ///
    /// # Errors
    ///
    /// * `EncoderError::TimestampBeforeEpoch` when the event timestamp precedes
    ///   the Unix epoch.
    /// * `EncoderError::Other` when the millisecond timestamp does not fit in
    ///   an `i64`, when the encoded message is too large to describe with the
    ///   `u32` length prefix, or when prost reports an encode failure.
    ///
    /// Nothing is written to `buf` when an error is returned.
    fn encode_metric(&self, event: &MetricEvent, buf: &mut Vec<u8>) -> Result<(), SondaError> {
        // One extra slot for the synthetic `__name__` label.
        let mut labels = Vec::with_capacity(event.labels.len() + 1);
        labels.push(Label {
            name: "__name__".to_string(),
            value: event.name.to_string(),
        });
        for (key, value) in event.labels.iter() {
            labels.push(Label {
                name: key.to_string(),
                value: value.to_string(),
            });
        }
        labels.sort_by(|a, b| a.name.cmp(&b.name));

        let millis = event
            .timestamp
            .duration_since(UNIX_EPOCH)
            .map_err(|e| SondaError::Encoder(EncoderError::TimestampBeforeEpoch(e)))?
            .as_millis();
        // `as_millis` yields a u128; reject values the int64 protobuf field
        // cannot hold instead of silently wrapping them.
        if millis > i64::MAX as u128 {
            return Err(SondaError::Encoder(EncoderError::Other(format!(
                "timestamp of {millis} ms since the Unix epoch does not fit in i64"
            ))));
        }
        let timestamp_ms = millis as i64;

        let timeseries = TimeSeries {
            labels,
            samples: vec![Sample {
                value: event.value,
                timestamp: timestamp_ms,
            }],
        };

        let encoded_len = timeseries.encoded_len();
        // The frame header is a u32; a larger message would get a truncated
        // length and make the whole buffer unparseable.
        if encoded_len > u32::MAX as usize {
            return Err(SondaError::Encoder(EncoderError::Other(format!(
                "encoded TimeSeries is {encoded_len} bytes, which exceeds the u32 length prefix"
            ))));
        }
        let prefix = encoded_len as u32;

        // Encode into a scratch buffer first so `buf` is only touched once the
        // whole frame is known to be good.
        let mut proto_bytes = Vec::with_capacity(encoded_len);
        timeseries.encode(&mut proto_bytes).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!("protobuf encode error: {e}")))
        })?;
        debug_assert_eq!(proto_bytes.len(), encoded_len);

        buf.reserve(4 + proto_bytes.len());
        buf.extend_from_slice(&prefix.to_le_bytes());
        buf.extend_from_slice(&proto_bytes);
        Ok(())
    }
}
/// Splits a buffer of length-prefixed frames into decoded `TimeSeries` messages.
///
/// `data` must be a concatenation of zero or more frames, each consisting of a
/// 4-byte little-endian `u32` byte count followed by that many bytes of
/// protobuf-encoded `TimeSeries` — the framing written by
/// `RemoteWriteEncoder::encode_metric`. An empty buffer yields an empty vector.
///
/// # Errors
///
/// Returns `EncoderError::Other` when the buffer ends inside a length prefix,
/// when a prefix announces more bytes than remain in the buffer, or when a
/// frame payload is not a valid `TimeSeries` protobuf.
pub fn parse_length_prefixed_timeseries(data: &[u8]) -> Result<Vec<TimeSeries>, SondaError> {
    const PREFIX_LEN: usize = 4;

    let mut result = Vec::new();
    let mut remaining = data;
    while !remaining.is_empty() {
        if remaining.len() < PREFIX_LEN {
            return Err(SondaError::Encoder(EncoderError::Other(
                "truncated length prefix in TimeSeries buffer".into(),
            )));
        }
        let (prefix, rest) = remaining.split_at(PREFIX_LEN);
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;

        // Compare against what is left rather than computing `offset + len`:
        // with an untrusted prefix that sum can overflow `usize` on 32-bit
        // targets and turn a malformed buffer into a panic.
        if len > rest.len() {
            return Err(SondaError::Encoder(EncoderError::Other(format!(
                "truncated TimeSeries protobuf: expected {} bytes, got {}",
                len,
                rest.len()
            ))));
        }
        let (payload, tail) = rest.split_at(len);
        let ts = TimeSeries::decode(payload).map_err(|e| {
            SondaError::Encoder(EncoderError::Other(format!("protobuf decode error: {e}")))
        })?;
        result.push(ts);
        remaining = tail;
    }
    Ok(result)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::metric::{Labels, MetricEvent};
    use std::time::{Duration, UNIX_EPOCH};

    /// Builds a `MetricEvent` stamped `timestamp_ms` milliseconds after the
    /// Unix epoch. Panics if the labels or the metric name are rejected.
    fn make_event(
        name: &str,
        value: f64,
        label_pairs: &[(&str, &str)],
        timestamp_ms: u64,
    ) -> MetricEvent {
        let labels = Labels::from_pairs(label_pairs).expect("valid labels");
        let ts = UNIX_EPOCH + Duration::from_millis(timestamp_ms);
        MetricEvent::with_timestamp(name.to_string(), value, labels, ts).expect("valid metric name")
    }

    /// Decodes the first length-prefixed frame in `buf`; panics when the
    /// buffer is too short or the payload is not a valid `TimeSeries`.
    fn decode_timeseries(buf: &[u8]) -> TimeSeries {
        assert!(buf.len() >= 4, "buffer must contain at least length prefix");
        let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        let proto_bytes = &buf[4..4 + len];
        TimeSeries::decode(proto_bytes).expect("protobuf decode")
    }

    /// Decodes every frame in `buf` in order; panics on a truncated or
    /// undecodable payload. Stops once fewer than 4 bytes remain.
    fn decode_all_timeseries(buf: &[u8]) -> Vec<TimeSeries> {
        let mut result = Vec::new();
        let mut offset = 0;
        while offset + 4 <= buf.len() {
            let len = u32::from_le_bytes([
                buf[offset],
                buf[offset + 1],
                buf[offset + 2],
                buf[offset + 3],
            ]) as usize;
            offset += 4;
            let proto_bytes = &buf[offset..offset + len];
            result.push(TimeSeries::decode(proto_bytes).expect("protobuf decode"));
            offset += len;
        }
        result
    }

    #[test]
    fn encode_metric_produces_nonempty_bytes() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("cpu_usage", 42.5, &[("host", "server1")], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        assert!(
            buf.len() > 4,
            "encoded output must contain length prefix + protobuf"
        );
    }

    #[test]
    fn length_prefix_matches_protobuf_length() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("test_metric", 99.9, &[("env", "prod")], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
        assert_eq!(
            buf.len(),
            4 + len,
            "total buffer length must equal 4 (prefix) + declared protobuf length"
        );
        let ts = TimeSeries::decode(&buf[4..]).expect("protobuf decode should succeed");
        assert_eq!(ts.samples.len(), 1, "TimeSeries should contain one sample");
    }

    #[test]
    fn name_label_is_set_to_metric_name() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("http_requests_total", 100.0, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        let name_label = ts
            .labels
            .iter()
            .find(|l| l.name == "__name__")
            .expect("__name__ label must be present");
        assert_eq!(
            name_label.value, "http_requests_total",
            "__name__ label value must match the metric name"
        );
    }

    #[test]
    fn labels_are_sorted_alphabetically() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event(
            "my_metric",
            1.0,
            &[("zone", "eu1"), ("env", "prod"), ("host", "server1")],
            1_700_000_000_000,
        );
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        let label_names: Vec<&str> = ts.labels.iter().map(|l| l.name.as_str()).collect();
        assert_eq!(
            label_names,
            vec!["__name__", "env", "host", "zone"],
            "labels must be sorted alphabetically with __name__ first"
        );
    }

    #[test]
    fn sample_has_correct_value_and_timestamp() {
        let encoder = RemoteWriteEncoder::new();
        // 2.5 is exactly representable and, unlike 3.14, is not mistaken by
        // clippy's `approx_constant` lint for a hand-typed PI.
        let event = make_event("gauge_metric", 2.5, &[], 1_700_000_000_500);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        assert_eq!(ts.samples.len(), 1, "must contain exactly one sample");
        let sample = &ts.samples[0];
        assert!(
            (sample.value - 2.5).abs() < f64::EPSILON,
            "sample value must be 2.5, got {}",
            sample.value
        );
        assert_eq!(
            sample.timestamp, 1_700_000_000_500i64,
            "timestamp must be in milliseconds since epoch"
        );
    }

    #[test]
    fn multiple_labels_are_included_in_output() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event(
            "up",
            1.0,
            &[
                ("instance", "server-01"),
                ("job", "sonda"),
                ("env", "staging"),
            ],
            1_700_000_000_000,
        );
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        assert_eq!(
            ts.labels.len(),
            4,
            "must have 3 user labels + 1 __name__ label"
        );
        let label_map: std::collections::HashMap<&str, &str> = ts
            .labels
            .iter()
            .map(|l| (l.name.as_str(), l.value.as_str()))
            .collect();
        assert_eq!(label_map.get("instance"), Some(&"server-01"));
        assert_eq!(label_map.get("job"), Some(&"sonda"));
        assert_eq!(label_map.get("env"), Some(&"staging"));
        assert_eq!(label_map.get("__name__"), Some(&"up"));
    }

    #[test]
    fn empty_labels_produces_only_name_label() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("bare_metric", 0.0, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        assert_eq!(
            ts.labels.len(),
            1,
            "with no user labels, only __name__ should be present"
        );
        assert_eq!(ts.labels[0].name, "__name__");
        assert_eq!(ts.labels[0].value, "bare_metric");
    }

    #[test]
    fn remote_write_encoder_is_send_and_sync() {
        // Compile-time check only: instantiating the generic enforces the bounds.
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<RemoteWriteEncoder>();
    }

    #[test]
    fn default_creates_valid_encoder() {
        let encoder = RemoteWriteEncoder::default();
        let event = make_event("test", 1.0, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        assert!(!buf.is_empty());
    }

    #[test]
    fn write_request_roundtrips_through_protobuf() {
        let wr = WriteRequest {
            timeseries: vec![TimeSeries {
                labels: vec![
                    Label {
                        name: "__name__".to_string(),
                        value: "test".to_string(),
                    },
                    Label {
                        name: "env".to_string(),
                        value: "prod".to_string(),
                    },
                ],
                samples: vec![Sample {
                    value: 42.0,
                    timestamp: 1_700_000_000_000,
                }],
            }],
        };
        let mut encoded = Vec::new();
        wr.encode(&mut encoded).expect("encode should succeed");
        let decoded = WriteRequest::decode(encoded.as_slice()).expect("decode should succeed");
        assert_eq!(wr, decoded, "roundtripped WriteRequest must match original");
    }

    #[test]
    fn multiple_encode_calls_append_to_buffer() {
        let encoder = RemoteWriteEncoder::new();
        let event1 = make_event("metric_a", 1.0, &[], 1_700_000_000_000);
        let event2 = make_event("metric_b", 2.0, &[], 1_700_000_001_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event1, &mut buf).expect("encode 1");
        let len_after_first = buf.len();
        assert!(len_after_first > 0, "first encode should produce bytes");
        encoder.encode_metric(&event2, &mut buf).expect("encode 2");
        assert!(
            buf.len() > len_after_first,
            "second encode should append more bytes"
        );
        let all_ts = decode_all_timeseries(&buf);
        assert_eq!(all_ts.len(), 2, "should have two TimeSeries in buffer");
    }

    #[test]
    fn timestamp_at_epoch_zero_produces_zero_ms() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("epoch_test", 1.0, &[], 0);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        let sample = &ts.samples[0];
        assert_eq!(sample.timestamp, 0, "timestamp at epoch should be 0 ms");
    }

    #[test]
    fn large_float_value_is_preserved() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("big_metric", f64::MAX, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        let sample = &ts.samples[0];
        assert_eq!(sample.value, f64::MAX, "f64::MAX must be preserved");
    }

    #[test]
    fn zero_value_is_preserved() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("zero_metric", 0.0, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        let sample = &ts.samples[0];
        assert!(
            sample.value == 0.0,
            "zero value must be preserved, got {}",
            sample.value
        );
    }

    #[test]
    fn prometheus_stale_nan_round_trips_through_prost_encoding() {
        let encoder = RemoteWriteEncoder::new();
        let event = make_event("stale_metric", PROMETHEUS_STALE_NAN, &[], 1_700_000_000_000);
        let mut buf = Vec::new();
        encoder.encode_metric(&event, &mut buf).expect("encode ok");
        let ts = decode_timeseries(&buf);
        assert_eq!(ts.samples.len(), 1);
        // NaN never equals itself, so compare the raw bits.
        assert_eq!(
            ts.samples[0].value.to_bits(),
            0x7ff0000000000002,
            "stale-NaN bit pattern must survive prost f64 fixed64 encoding unchanged"
        );
    }

    #[test]
    fn prometheus_stale_nan_constant_is_a_signaling_nan() {
        assert!(PROMETHEUS_STALE_NAN.is_nan(), "must be NaN");
        assert_eq!(PROMETHEUS_STALE_NAN.to_bits(), 0x7ff0000000000002);
    }

    #[test]
    fn encode_log_returns_not_supported_error() {
        use crate::model::log::LogEvent;
        use std::collections::BTreeMap;

        let encoder = RemoteWriteEncoder::new();
        let log_event = LogEvent::new(
            crate::model::log::Severity::Info,
            "test message".to_string(),
            Labels::default(),
            BTreeMap::new(),
        );
        let mut buf = Vec::new();
        let result = encoder.encode_log(&log_event, &mut buf);
        assert!(
            result.is_err(),
            "remote write encoder must not support log encoding"
        );
        let err = result.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("not supported"),
            "error message should contain 'not supported', got: {msg}"
        );
    }
}