pub mod influx;
pub mod json;
pub mod prometheus;
pub mod syslog;
use serde::Deserialize;
use crate::model::log::LogEvent;
use crate::model::metric::MetricEvent;
pub trait Encoder: Send + Sync {
fn encode_metric(
&self,
event: &MetricEvent,
buf: &mut Vec<u8>,
) -> Result<(), crate::SondaError>;
fn encode_log(&self, _event: &LogEvent, _buf: &mut Vec<u8>) -> Result<(), crate::SondaError> {
Err(crate::SondaError::Encoder(
"log encoding not supported by this encoder".into(),
))
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
pub enum EncoderConfig {
#[serde(rename = "prometheus_text")]
PrometheusText,
#[serde(rename = "influx_lp")]
InfluxLineProtocol {
field_key: Option<String>,
},
#[serde(rename = "json_lines")]
JsonLines,
#[serde(rename = "syslog")]
Syslog {
hostname: Option<String>,
app_name: Option<String>,
},
}
pub fn create_encoder(config: &EncoderConfig) -> Box<dyn Encoder> {
match config {
EncoderConfig::PrometheusText => Box::new(prometheus::PrometheusText::new()),
EncoderConfig::InfluxLineProtocol { field_key } => {
Box::new(influx::InfluxLineProtocol::new(field_key.clone()))
}
EncoderConfig::JsonLines => Box::new(json::JsonLines::new()),
EncoderConfig::Syslog { hostname, app_name } => {
Box::new(syslog::Syslog::new(hostname.clone(), app_name.clone()))
}
}
}
pub(crate) fn format_rfc3339_millis(
ts: std::time::SystemTime,
) -> Result<String, crate::SondaError> {
use std::time::UNIX_EPOCH;
let duration = ts
.duration_since(UNIX_EPOCH)
.map_err(|e| crate::SondaError::Encoder(format!("timestamp before Unix epoch: {e}")))?;
let total_secs = duration.as_secs();
let millis = duration.subsec_millis();
let days = total_secs / 86400;
let time_of_day = total_secs % 86400;
let hour = time_of_day / 3600;
let minute = (time_of_day % 3600) / 60;
let second = time_of_day % 60;
let z = days as i64 + 719468;
let era = if z >= 0 { z } else { z - 146096 } / 146097;
let doe = (z - era * 146097) as u64;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
let y = yoe as i64 + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let day = doy - (153 * mp + 2) / 5 + 1;
let month = if mp < 10 { mp + 3 } else { mp - 9 };
let year = if month <= 2 { y + 1 } else { y };
Ok(format!(
"{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}.{millis:03}Z",
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn encoder_config_prometheus_text_deserializes_with_type_field() {
let yaml = "type: prometheus_text";
let config: EncoderConfig = serde_yaml::from_str(yaml).unwrap();
assert!(matches!(config, EncoderConfig::PrometheusText));
}
#[test]
fn encoder_config_json_lines_deserializes_with_type_field() {
let yaml = "type: json_lines";
let config: EncoderConfig = serde_yaml::from_str(yaml).unwrap();
assert!(matches!(config, EncoderConfig::JsonLines));
}
#[test]
fn encoder_config_influx_lp_without_field_key_deserializes_with_type_field() {
let yaml = "type: influx_lp";
let config: EncoderConfig = serde_yaml::from_str(yaml).unwrap();
assert!(matches!(
config,
EncoderConfig::InfluxLineProtocol { field_key: None }
));
}
#[test]
fn encoder_config_influx_lp_with_field_key_deserializes_with_type_field() {
let yaml = "type: influx_lp\nfield_key: requests";
let config: EncoderConfig = serde_yaml::from_str(yaml).unwrap();
assert!(matches!(
config,
EncoderConfig::InfluxLineProtocol { field_key: Some(ref k) } if k == "requests"
));
}
#[test]
fn encoder_config_unknown_type_returns_error() {
let yaml = "type: no_such_encoder";
let result: Result<EncoderConfig, _> = serde_yaml::from_str(yaml);
assert!(
result.is_err(),
"unknown type tag should fail deserialization"
);
}
#[test]
fn encoder_config_missing_type_field_returns_error() {
let yaml = "prometheus_text";
let result: Result<EncoderConfig, _> = serde_yaml::from_str(yaml);
assert!(
result.is_err(),
"missing type field should fail deserialization"
);
}
#[test]
fn encoder_config_old_external_tag_format_is_rejected() {
let yaml = "!prometheus_text";
let result: Result<EncoderConfig, _> = serde_yaml::from_str(yaml);
assert!(
result.is_err(),
"externally-tagged YAML format must be rejected in favour of internally-tagged"
);
}
#[test]
fn create_encoder_prometheus_text_succeeds() {
let config = EncoderConfig::PrometheusText;
let _enc = create_encoder(&config);
}
#[test]
fn create_encoder_json_lines_succeeds() {
let config = EncoderConfig::JsonLines;
let _enc = create_encoder(&config);
}
#[test]
fn create_encoder_influx_lp_no_field_key_succeeds() {
let config = EncoderConfig::InfluxLineProtocol { field_key: None };
let _enc = create_encoder(&config);
}
#[test]
fn create_encoder_influx_lp_with_field_key_succeeds() {
let config = EncoderConfig::InfluxLineProtocol {
field_key: Some("bytes".to_string()),
};
let _enc = create_encoder(&config);
}
#[test]
fn encoder_config_is_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<EncoderConfig>();
}
#[test]
fn encoder_config_prometheus_text_is_cloneable_and_debuggable() {
let config = EncoderConfig::PrometheusText;
let cloned = config.clone();
assert!(matches!(cloned, EncoderConfig::PrometheusText));
let s = format!("{config:?}");
assert!(s.contains("PrometheusText"));
}
#[test]
fn encoder_config_json_lines_is_cloneable_and_debuggable() {
let config = EncoderConfig::JsonLines;
let cloned = config.clone();
assert!(matches!(cloned, EncoderConfig::JsonLines));
let s = format!("{config:?}");
assert!(s.contains("JsonLines"));
}
#[test]
fn encoder_config_influx_lp_is_cloneable_and_debuggable() {
let config = EncoderConfig::InfluxLineProtocol {
field_key: Some("val".to_string()),
};
let cloned = config.clone();
assert!(matches!(
cloned,
EncoderConfig::InfluxLineProtocol { field_key: Some(ref k) } if k == "val"
));
let s = format!("{config:?}");
assert!(s.contains("InfluxLineProtocol"));
}
fn make_log_event() -> crate::model::log::LogEvent {
use std::collections::BTreeMap;
crate::model::log::LogEvent::new(
crate::model::log::Severity::Info,
"test message".to_string(),
BTreeMap::new(),
)
}
#[test]
fn prometheus_encoder_encode_log_returns_not_supported_error() {
let encoder = create_encoder(&EncoderConfig::PrometheusText);
let event = make_log_event();
let mut buf = Vec::new();
let result = encoder.encode_log(&event, &mut buf);
assert!(
result.is_err(),
"prometheus encoder must return an error for encode_log"
);
let err = result.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("not supported"),
"error message should contain 'not supported', got: {msg}"
);
}
#[test]
fn influx_encoder_encode_log_returns_not_supported_error() {
let encoder = create_encoder(&EncoderConfig::InfluxLineProtocol { field_key: None });
let event = make_log_event();
let mut buf = Vec::new();
let result = encoder.encode_log(&event, &mut buf);
assert!(
result.is_err(),
"influx encoder must return an error for encode_log"
);
let err = result.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("not supported"),
"error message should contain 'not supported', got: {msg}"
);
}
#[test]
fn json_lines_encoder_encode_log_succeeds() {
let encoder = create_encoder(&EncoderConfig::JsonLines);
let event = make_log_event();
let mut buf = Vec::new();
let result = encoder.encode_log(&event, &mut buf);
assert!(
result.is_ok(),
"json_lines encoder must support encode_log after slice 2.3"
);
assert!(!buf.is_empty(), "buffer must contain encoded data");
}
#[test]
fn encode_log_default_does_not_write_to_buffer() {
let encoder = create_encoder(&EncoderConfig::PrometheusText);
let event = make_log_event();
let mut buf = Vec::new();
let _ = encoder.encode_log(&event, &mut buf);
assert!(
buf.is_empty(),
"buffer must remain empty when encode_log returns an error"
);
}
#[test]
fn encode_log_error_is_encoder_variant() {
let encoder = create_encoder(&EncoderConfig::PrometheusText);
let event = make_log_event();
let mut buf = Vec::new();
let result = encoder.encode_log(&event, &mut buf);
let err = result.unwrap_err();
assert!(
matches!(err, crate::SondaError::Encoder(_)),
"error must be SondaError::Encoder variant, got: {err:?}"
);
}
}