use hashbrown::Equivalent;
use hashbrown::hash_map::EntryRef;
use itertools::Itertools;
use metrique_writer::sample::DefaultRng;
use metrique_writer::value::{FlagConstructor, ForceFlag, MetricOptions};
use metrique_writer_core::config::AllowUnroutableEntries;
use metrique_writer_core::format::Format;
use metrique_writer_core::sample::SampledFormat;
use metrique_writer_core::stream::IoStreamError;
use metrique_writer_core::{
Entry, EntryConfig, MetricFlags, Observation, Unit, ValidationError, ValidationErrorBuilder,
Value,
};
use rand::rngs::ThreadRng;
use rand::{Rng, RngCore};
use std::any::Any;
use std::fmt::{Display, Write};
use std::iter;
use std::mem;
use std::num::NonZero;
use std::ops::Deref;
use std::time::Duration;
use std::{borrow::Cow, io, time::SystemTime};
use smallvec::{SmallVec, smallvec};
use crate::json_string::JsonString as _;
use crate::rate_limit::rate_limited;
use super::buf::{PrefixedStringBuf, write_all_vectored};
/// Switches that turn individual entry validations off.
///
/// The derived `Default` leaves every flag `false`, i.e. all validations run.
#[derive(Clone, Default)]
struct Validation {
/// When `true`, the duplicate-field checks are not performed.
skip_validate_unique: bool,
/// When `true`, the "missing dimension" check at the end of an entry is not performed.
skip_validate_dimensions_exist: bool,
/// When `true`, field names are not checked for being empty or `_aws`.
skip_validate_names: bool,
}
/// Formatter that renders entries as EMF JSON records.
///
/// Constructed through [`Emf::builder`], [`Emf::all_validations`] or [`Emf::no_validations`].
#[derive(Clone)]
pub struct Emf {
// Reusable buffers and pre-encoded fragments shared by every formatted entry.
state: State,
// Which validations are skipped.
validation: Validation,
// Template validation map seeded with the default dimension names; cloned per entry
// when dimension-existence validation is enabled.
validation_map_base: hashbrown::HashMap<SCow<'static>, LineData>,
}
/// Buffers and pre-encoded JSON fragments reused across formatted entries.
#[derive(Clone)]
struct State {
// JSON-encoded namespaces; the first one is baked into the buffer prefixes, the rest get
// an extra directive each when an entry is finished.
namespaces: Vec<JsonEncodedString>,
// JSON-encoded default dimension sets, one array per set.
each_dimensions_str: Vec<JsonEncodedArray>,
// Pre-encoded fragment that closes the directives and introduces the timestamp.
log_group_and_timestamp: LogGroupNameAndTimestampString,
// Buffers for metrics written with per-metric dimensions; cleared at the start of each entry.
dimension_set_map: hashbrown::HashMap<DimensionSet, MetricsForDimensionSet>,
// String (and array) properties of the current entry.
string_fields_buf: PrefixedStringBuf,
// Metric values of the current entry that use the entry-wide dimensions.
fields_buf: PrefixedStringBuf,
// Metric declarations of the current entry that use the entry-wide dimensions.
metrics_buf: PrefixedStringBuf,
// Record prefix up to and including the dimension sets.
dimensions_buf: PrefixedStringBuf,
// Byte offset inside the `dimensions_buf` prefix right after the first namespace.
after_namespace_index: usize,
// Scratch buffer used while building the `Counts` array of a distribution.
counts_buf: PrefixedStringBuf,
// Extra directives followed by the log group / timestamp fragment.
decl_buf: PrefixedStringBuf,
// When `true`, per-metric dimensions are ignored and the metric uses the entry-wide dimensions.
allow_ignored_dimensions: bool,
}
/// An extra metric directive that can be attached to every record via [`EmfBuilder::directive`].
///
/// Serialized with the keys `Dimensions`, `Metrics` and `Namespace`.
#[derive(serde::Serialize, Clone, Debug)]
pub struct MetricDirective<'a> {
/// Dimension sets of the directive; each inner vector is one set of dimension names.
#[serde(rename = "Dimensions")]
pub dimensions: Vec<Vec<&'a str>>,
/// Metric definitions of the directive.
#[serde(rename = "Metrics")]
pub metrics: Vec<MetricDefinition<'a>>,
/// Namespace of the directive.
#[serde(rename = "Namespace")]
pub namespace: &'a str,
}
/// A single metric definition inside a [`MetricDirective`].
///
/// Serialized with the keys `Name`, `Unit` and, when present, `StorageResolution`.
#[derive(serde::Serialize, Copy, Clone, Debug)]
pub struct MetricDefinition<'a> {
/// Name of the metric.
#[serde(rename = "Name")]
pub name: &'a str,
/// Unit of the metric.
#[serde(rename = "Unit")]
pub unit: Unit,
/// Optional storage resolution; omitted from the output when `None`.
#[serde(rename = "StorageResolution")]
#[serde(skip_serializing_if = "Option::is_none")]
pub storage_resolution: Option<StorageResolution>,
}
/// Storage resolution of a metric; the discriminant is the value that gets serialized.
#[derive(Copy, Clone, Debug)]
pub enum StorageResolution {
/// Serialized as `1`.
Second = 1,
/// Serialized as `60`.
Minute = 60,
}
/// A string that has already been encoded as a JSON string literal (quotes included).
#[derive(Clone)]
struct JsonEncodedString {
// The JSON text, ready to be copied verbatim into an output buffer.
encoded_str: String,
}
impl JsonEncodedString {
    /// Encodes `input` as a JSON string literal using `serde_json`.
    fn encode(input: &str) -> Self {
        let encoded_str = serde_json::to_string(input).expect("everything here is valid");
        Self { encoded_str }
    }
}
impl Display for JsonEncodedString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.encoded_str.fmt(f)
}
}
/// An array of strings that has already been encoded as a JSON array (brackets included).
#[derive(Clone)]
struct JsonEncodedArray {
// The JSON text, ready to be copied verbatim into an output buffer.
encoded_array: String,
}
impl JsonEncodedArray {
    /// Encodes `input` as a JSON array of strings using `serde_json`.
    fn encode(input: &[String]) -> Self {
        let encoded_array = serde_json::to_string(input).expect("everything here is valid");
        Self { encoded_array }
    }

    /// Returns the array with every string of `extend_with` appended as a JSON string.
    fn extend_with_strings<'a>(self, extend_with: impl Iterator<Item = &'a str>) -> Self {
        let Self { mut encoded_array } = self;
        // An empty array is exactly `[]`; anything longer already holds an element and
        // therefore needs a comma before the first appended name.
        let mut needs_comma = encoded_array.len() != 2;
        // Drop the closing `]` so elements can be appended.
        encoded_array.pop();
        for name in extend_with {
            if needs_comma {
                encoded_array.push(',');
            }
            needs_comma = true;
            encoded_array.json_string(name);
        }
        encoded_array.push(']');
        Self { encoded_array }
    }
}
impl Display for JsonEncodedArray {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.encoded_array.fmt(f)
}
}
/// Pre-encoded JSON fragment that closes the directive array, optionally names the log
/// group, and ends with the `"Timestamp":` key (the value is appended separately).
#[derive(Clone)]
struct LogGroupNameAndTimestampString {
// The JSON fragment, ready to be copied verbatim into an output buffer.
encoded: String,
}
impl LogGroupNameAndTimestampString {
    /// Builds the fragment; a `LogGroupName` key is included only when `log_group` is `Some`.
    fn new(log_group: Option<String>) -> Self {
        let encoded = match log_group {
            Some(log_group) => format!(
                r#"],"LogGroupName":{},"Timestamp":"#,
                serde_json::to_string(&log_group).expect("everything here is valid")
            ),
            None => r#"],"Timestamp":"#.to_string(),
        };
        Self { encoded }
    }
}
/// Appends pre-encoded JSON fragments to a buffer without escaping them again.
///
/// Every method returns the receiver so calls can be chained.
trait PushJsonSafeString {
/// Appends an already JSON-encoded string literal.
fn push_json_safe_string<'a>(&'a mut self, s: &JsonEncodedString) -> &'a mut Self;
/// Appends an already JSON-encoded array.
fn push_json_safe_array<'a>(&'a mut self, s: &JsonEncodedArray) -> &'a mut Self;
/// Appends the log group / timestamp fragment followed by `timestamp_str`.
fn push_json_safe_log_group_and_timestamp<'a>(
&'a mut self,
s: &LogGroupNameAndTimestampString,
timestamp_str: &str,
) -> &'a mut Self;
}
impl PushJsonSafeString for PrefixedStringBuf {
fn push_json_safe_string<'a>(&'a mut self, s: &JsonEncodedString) -> &'a mut Self {
self.push_raw_str(&s.encoded_str)
}
fn push_json_safe_array<'a>(&'a mut self, s: &JsonEncodedArray) -> &'a mut Self {
self.push_raw_str(&s.encoded_array)
}
fn push_json_safe_log_group_and_timestamp<'a>(
&'a mut self,
s: &LogGroupNameAndTimestampString,
timestamp_str: &str,
) -> &'a mut Self {
self.push_raw_str(&s.encoded).push_raw_str(timestamp_str)
}
}
impl serde::Serialize for StorageResolution {
    /// Serializes the resolution as its numeric discriminant (`1` or `60`).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let seconds = *self as u32;
        serializer.serialize_u32(seconds)
    }
}
impl Emf {
    /// Starts building an [`Emf`] formatter for `namespace` with the given default
    /// dimension sets.
    ///
    /// The builder starts with every validation enabled when compiled with
    /// `debug_assertions`, and with every validation skipped otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `default_dimensions` is empty. Pass `vec![vec![]]` to publish without
    /// dimensions.
    pub fn builder(namespace: String, default_dimensions: Vec<Vec<String>>) -> EmfBuilder {
        assert!(
            !default_dimensions.is_empty(),
            "Without dimension sets no metrics can be published. Pass `default_dimensions=vec![vec![]]` to publish without dimensions"
        );
        EmfBuilder {
            namespaces: vec![namespace],
            default_dimensions,
            allow_ignored_dimensions: false,
            extra_directives: String::new(),
            log_group_name: None,
            #[cfg(debug_assertions)]
            validation: Validation::default(),
            #[cfg(not(debug_assertions))]
            validation: Validation {
                skip_validate_unique: true,
                skip_validate_dimensions_exist: true,
                skip_validate_names: true,
            },
        }
    }

    /// Creates an [`Emf`] formatter with every validation enabled, in every build profile.
    ///
    /// # Panics
    ///
    /// Panics if `default_dimensions` is empty (see [`Emf::builder`]).
    pub fn all_validations(namespace: String, default_dimensions: Vec<Vec<String>>) -> Self {
        let mut builder = Self::builder(namespace, default_dimensions);
        // `builder` skips all validations when `debug_assertions` is off, so the flags
        // must be set explicitly; the default `Validation` skips nothing.
        builder.validation = Validation::default();
        builder.build()
    }

    /// Creates an [`Emf`] formatter with every validation skipped.
    ///
    /// # Panics
    ///
    /// Panics if `default_dimensions` is empty (see [`Emf::builder`]).
    pub fn no_validations(namespace: String, default_dimensions: Vec<Vec<String>>) -> Self {
        Self::builder(namespace, default_dimensions)
            .skip_all_validations(true)
            .build()
    }

    /// Formats `entry` into `output`.
    ///
    /// `multiplicity` is forwarded to the metric writers; `None` is used by plain
    /// formatting and `Some(n)` by sampled formatting.
    ///
    /// # Errors
    ///
    /// Returns whatever error finishing the entry produces (validation or I/O).
    fn format_with_multiplicity(
        &mut self,
        entry: &impl Entry,
        output: &mut impl io::Write,
        multiplicity: Option<u64>,
    ) -> Result<(), IoStreamError> {
        // Reset the per-entry buffers left over from the previous entry.
        self.state.string_fields_buf.clear();
        self.state.fields_buf.clear();
        self.state.metrics_buf.clear();
        self.state.decl_buf.clear();
        self.state.dimension_set_map.clear();

        // The seeded map is only needed when missing dimensions are reported.
        let validation_map = if self.validation.skip_validate_dimensions_exist {
            hashbrown::HashMap::new()
        } else {
            self.validation_map_base.clone()
        };
        let mut writer = EntryWriter {
            validation_map,
            entry_dimensions: None,
            state: &mut self.state,
            multiplicity,
            timestamp: None,
            validations: &self.validation,
            error: ValidationErrorBuilder::default(),
            allow_split_entries: false,
            is_allow_unroutable_entries: false,
        };
        entry.write(&mut writer);
        writer.finish(output)
    }

    /// Wraps this formatter in a [`SampledEmf`] using the default random number generator.
    pub fn with_sampling(self) -> SampledEmf {
        Self::with_sampling_and_rng(self, Default::default())
    }

    /// Wraps this formatter in a [`SampledEmf`] that uses `rng`.
    pub fn with_sampling_and_rng<R>(self, rng: R) -> SampledEmf<R> {
        SampledEmf { emf: self, rng }
    }
}
/// Builder for [`Emf`], created by [`Emf::builder`].
#[derive(Clone)]
pub struct EmfBuilder {
// Default dimension sets; each inner vector is one set of dimension names.
default_dimensions: Vec<Vec<String>>,
// Serialized extra directives, each preceded by a comma.
extra_directives: String,
// Namespaces to publish to; the first one comes from `Emf::builder`.
namespaces: Vec<String>,
// Which validations the built formatter skips.
validation: Validation,
// Copied into the formatter state as `allow_ignored_dimensions`.
allow_ignored_dimensions: bool,
// Optional log group name emitted with every record.
log_group_name: Option<String>,
}
impl EmfBuilder {
    /// Builds the [`Emf`] formatter, pre-encoding namespaces and dimension sets.
    ///
    /// # Panics
    ///
    /// Panics if the builder holds no namespace.
    pub fn build(self) -> Emf {
        // Every default dimension starts out as "not found yet"; writing a string
        // property with that name marks it as found.
        let mut validation_map = hashbrown::HashMap::new();
        for dimension_set in &self.default_dimensions {
            for dimension in dimension_set {
                validation_map.entry_ref(dimension).or_insert(LineData {
                    kind: LineKind::UnfoundDimension,
                });
            }
        }
        assert!(
            !self.namespaces.is_empty(),
            "must publish to at least 1 namespace"
        );
        let each_dimensions_str: Vec<JsonEncodedArray> = self
            .default_dimensions
            .iter()
            .map(|x| JsonEncodedArray::encode(x))
            .collect();
        let namespaces: Vec<JsonEncodedString> = self
            .namespaces
            .iter()
            .map(|x| JsonEncodedString::encode(x))
            .collect();
        let first_ns: &JsonEncodedString = &namespaces[0];
        let dimensions_after_ns = r#","Dimensions":["#;
        let dimensions_prefix = &format!(
            r#"{{"_aws":{{"CloudWatchMetrics":[{{"Namespace":{first_ns}{dimensions_after_ns}"#
        );
        Emf {
            state: State {
                namespaces,
                each_dimensions_str,
                dimension_set_map: hashbrown::HashMap::new(),
                // Offset of the text that follows the first namespace inside the prefix.
                after_namespace_index: dimensions_prefix.len() - dimensions_after_ns.len(),
                dimensions_buf: PrefixedStringBuf::new(dimensions_prefix, 256),
                fields_buf: PrefixedStringBuf::new("}", 2048),
                string_fields_buf: PrefixedStringBuf::new("", 2048),
                counts_buf: PrefixedStringBuf::new(r#"],"Counts":["#, 256),
                metrics_buf: PrefixedStringBuf::new(r#"],"Metrics":["#, 2048),
                decl_buf: PrefixedStringBuf::new(&self.extra_directives, 256),
                allow_ignored_dimensions: self.allow_ignored_dimensions,
                log_group_and_timestamp: LogGroupNameAndTimestampString::new(self.log_group_name),
            },
            validation_map_base: validation_map,
            validation: self.validation,
        }
    }

    /// Adds an extra metric directive that is emitted with every record.
    pub fn directive(mut self, directive: MetricDirective) -> Self {
        self.extra_directives.push(',');
        self.extra_directives
            .push_str(&serde_json::to_string(&directive).expect("nothing that can fail here"));
        self
    }

    /// Sets whether per-metric dimensions are ignored, so that such metrics are written
    /// with the entry-wide dimensions instead.
    pub fn allow_ignored_dimensions(mut self, allow: bool) -> Self {
        self.allow_ignored_dimensions = allow;
        self
    }

    /// Sets whether the check that every declared dimension received a value is skipped.
    pub fn allow_dimensions_with_no_data(mut self, allow: bool) -> Self {
        self.validation.skip_validate_dimensions_exist = allow;
        self
    }

    /// Sets every validation skip flag to `skip`.
    ///
    /// Passing `false` enables all validations, overriding both the build-profile default
    /// chosen by [`Emf::builder`] and any earlier `allow_dimensions_with_no_data` call.
    pub fn skip_all_validations(mut self, skip: bool) -> Self {
        // Plain assignment, so that `false` can turn validations back on.
        self.validation.skip_validate_unique = skip;
        self.validation.skip_validate_dimensions_exist = skip;
        self.validation.skip_validate_names = skip;
        self
    }

    /// Adds another namespace; every record is published to all configured namespaces.
    pub fn add_namespace(mut self, namespace: impl Into<String>) -> Self {
        self.namespaces.push(namespace.into());
        self
    }

    /// Sets the log group name emitted with every record.
    pub fn log_group_name(mut self, log_group_name: impl Into<String>) -> Self {
        self.log_group_name = Some(log_group_name.into());
        self
    }
}
/// What has been seen so far for a given field name within one entry.
#[derive(Clone)]
enum LineKind {
// The name was written as a string (or array) property.
String,
// The name was written as a metric; `indexes` holds the dimension-set indexes it was
// written under (0 for the entry-wide dimensions).
Metric { indexes: bit_set::BitSet<u32> },
// The name is a declared dimension that has not been written yet.
UnfoundDimension,
}
/// Validation bookkeeping stored per field name.
#[derive(Clone)]
struct LineData {
// Current classification of the field.
kind: LineKind,
}
/// Owned set of per-metric `(name, value)` dimension pairs, used as a map key.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
struct DimensionSet {
// The `(name, value)` pairs, in the order they were copied from a `DimensionSetKey`.
entry: SmallVec<[(String, String); 2]>,
}
/// Borrowed counterpart of [`DimensionSet`], used to look up entries without allocating.
#[derive(Hash, PartialEq, Eq, Debug)]
struct DimensionSetKey<'a> {
// The `(name, value)` pairs; sorted when built through `FromIterator`.
entry: SmallVec<[(&'a str, &'a str); 2]>,
}
impl<'a> FromIterator<(&'a str, &'a str)> for DimensionSetKey<'a> {
fn from_iter<T: IntoIterator<Item = (&'a str, &'a str)>>(iter: T) -> Self {
let mut res = DimensionSetKey {
entry: FromIterator::from_iter(iter),
};
res.entry.sort();
res
}
}
/// Newtype around `Cow<str>` used as the key type of the validation map.
#[derive(Clone, Hash, PartialEq, Eq)]
struct SCow<'a>(Cow<'a, str>);
impl<'a> Deref for SCow<'a> {
type Target = Cow<'a, str>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Equivalent<SCow<'_>> for String {
fn equivalent(&self, key: &SCow<'_>) -> bool {
**self == *key.0
}
}
impl Equivalent<SCow<'_>> for &'_ str {
fn equivalent(&self, key: &SCow<'_>) -> bool {
**self == *key.0
}
}
impl std::borrow::Borrow<str> for SCow<'_> {
fn borrow(&self) -> &str {
&self.0
}
}
impl From<&'_ String> for SCow<'_> {
    /// Creates an owned key by copying the string.
    fn from(value: &String) -> Self {
        let owned: String = value.to_owned();
        SCow(Cow::Owned(owned))
    }
}
impl<'a> From<&'a str> for SCow<'a> {
    /// Creates a borrowed key; nothing is copied.
    fn from(value: &'a str) -> Self {
        let borrowed: Cow<'a, str> = Cow::Borrowed(value);
        SCow(borrowed)
    }
}
impl<'a> From<&'_ SCow<'a>> for SCow<'a> {
fn from(value: &SCow<'a>) -> Self {
match value {
SCow(Cow::Borrowed(s)) => SCow(Cow::Borrowed(s)),
SCow(Cow::Owned(o)) => SCow(Cow::Owned(o.clone())),
}
}
}
impl Equivalent<DimensionSet> for DimensionSetKey<'_> {
    /// Two sets are equivalent when they hold the same pairs in the same positions.
    fn equivalent(&self, key: &DimensionSet) -> bool {
        let (borrowed, owned) = (&self.entry, &key.entry);
        if borrowed.len() != owned.len() {
            return false;
        }
        borrowed
            .iter()
            .zip(owned.iter())
            .all(|(&(name, value), (owned_name, owned_value))| {
                name == owned_name.as_str() && value == owned_value.as_str()
            })
    }
}
impl From<&'_ DimensionSetKey<'_>> for DimensionSet {
fn from(val: &'_ DimensionSetKey<'_>) -> Self {
Self {
entry: val
.entry
.iter()
.map(|&(k, v)| (k.to_owned(), v.to_owned()))
.collect(),
}
}
}
/// Output buffers for the metrics written under one per-metric dimension set.
#[derive(Clone)]
struct MetricsForDimensionSet {
// Metric values; the prefix holds the dimension values of this set.
fields_buf: PrefixedStringBuf,
// Metric declarations; the prefix holds the record header up to `"Metrics":[`.
metrics_buf: PrefixedStringBuf,
// Byte offset inside the `metrics_buf` prefix right after the first namespace.
after_namespace_index: usize,
// 1-based index of this dimension set, used for duplicate detection.
index: NonZero<usize>,
}
impl MetricsForDimensionSet {
    /// Creates the buffers for one dimension set.
    ///
    /// The metrics prefix holds the record header for `namespace_str` with every array of
    /// `each_dimensions_str` extended by the names in `variable_dimensions`; the fields
    /// prefix holds the `name: value` pairs of `variable_dimensions`.
    fn new(
        namespace_str: &JsonEncodedString,
        each_dimensions_str: &[JsonEncodedArray],
        variable_dimensions: &DimensionSetKey<'_>,
        index: NonZero<usize>,
    ) -> Self {
        let pairs = &variable_dimensions.entry;

        // Each default dimension set gets the per-metric dimension names appended.
        let dimensions_str = each_dimensions_str
            .iter()
            .map(|base| {
                let names = pairs.iter().map(|&(name, _value)| name);
                base.clone().extend_with_strings(names)
            })
            .join(",");

        let mut metrics_prefix = String::with_capacity(2048);
        write!(
            metrics_prefix,
            r#"{{"_aws":{{"CloudWatchMetrics":[{{"Namespace":{namespace_str}"#
        )
        .ok();
        // Remember where the namespace ends so the rest can be reused for other namespaces.
        let after_namespace_index = metrics_prefix.len();
        write!(
            metrics_prefix,
            r#","Dimensions":[{dimensions_str}],"Metrics":["#
        )
        .ok();

        let mut fields_prefix = String::with_capacity(2048);
        fields_prefix.push('}');
        for &(name, value) in pairs.iter() {
            fields_prefix.push(',');
            fields_prefix.json_string(name);
            fields_prefix.push(':');
            fields_prefix.json_string(value);
        }

        Self {
            fields_buf: PrefixedStringBuf::from_prefix(fields_prefix),
            metrics_buf: PrefixedStringBuf::from_prefix(metrics_prefix),
            after_namespace_index,
            index,
        }
    }
}
pub use metrique_writer_core::config::{AllowSplitEntries, EntryDimensions};
/// Per-entry writer that collects values into the formatter's buffers and records
/// validation errors until `finish` is called.
struct EntryWriter<'a> {
// Field names seen so far and how they were used.
validation_map: hashbrown::HashMap<SCow<'a>, LineData>,
// The formatter's reusable buffers.
state: &'a mut State,
// Dimension sets overridden through an `EntryDimensions` config, if any.
entry_dimensions: Option<Vec<JsonEncodedArray>>,
// Which validations are skipped.
validations: &'a Validation,
// Timestamp written by the entry, if any.
timestamp: Option<SystemTime>,
// Sampling multiplicity applied to metric counts, if any.
multiplicity: Option<u64>,
// Accumulated validation errors.
error: ValidationErrorBuilder,
// Set when the entry supplied an `AllowSplitEntries` config.
allow_split_entries: bool,
// Set when the entry supplied an `AllowUnroutableEntries` config.
is_allow_unroutable_entries: bool,
}
impl<'a> metrique_writer_core::EntryWriter<'a> for EntryWriter<'a> {
fn timestamp(&mut self, timestamp: SystemTime) {
if self.timestamp.replace(timestamp).is_some() {
self.error.invalid_mut("multiple timestamps written");
}
}
fn value(&mut self, name: impl Into<Cow<'a, str>>, value: &(impl Value + ?Sized)) {
let name = name.into();
if self.validate_name(&name) {
value.write(ValueWriter {
name: SCow(name),
entry: self,
});
}
}
fn config(&mut self, config: &'a dyn EntryConfig) {
if let Some(dimensions) = (config as &dyn Any).downcast_ref::<EntryDimensions>() {
if !self.state.dimension_set_map.is_empty() {
self.error.invalid_mut("entry dimensions must be configured before emitting a metric with custom dimensions");
return;
}
if self.entry_dimensions.is_some() {
self.error
.invalid_mut("entry dimensions cannot be set twice");
return;
}
if dimensions.is_empty() {
self.error.invalid_mut("entry dimensions cannot be empty");
return;
}
if !self.validations.skip_validate_unique
|| !self.validations.skip_validate_dimensions_exist
{
for dim_set in dimensions.dim_sets() {
for dim in dim_set {
match self.validation_map.entry_ref(dim) {
hashbrown::hash_map::EntryRef::Occupied(e) => match e.get() {
LineData {
kind: LineKind::UnfoundDimension | LineKind::String,
} => {}
LineData {
kind: LineKind::Metric { .. },
} => {
if !self.validations.skip_validate_unique {
self.error.extend_mut(
ValidationError::invalid("duplicate field")
.for_field(dim),
);
}
}
},
hashbrown::hash_map::EntryRef::Vacant(v) => {
v.insert(LineData {
kind: LineKind::UnfoundDimension,
});
}
}
}
}
}
let dimensions: Vec<JsonEncodedArray> = self
.state
.each_dimensions_str
.iter()
.flat_map(|d| {
dimensions
.dim_sets()
.map(|e| d.clone().to_owned().extend_with_strings(e))
})
.collect();
self.entry_dimensions = Some(dimensions);
}
if (config as &dyn Any)
.downcast_ref::<AllowSplitEntries>()
.is_some()
{
self.allow_split_entries = true;
}
if (config as &dyn Any)
.downcast_ref::<AllowUnroutableEntries>()
.is_some()
{
self.is_allow_unroutable_entries = true;
}
}
}
impl EntryWriter<'_> {
/// Completes the entry and writes the resulting record(s) to `output`.
///
/// One record is written per per-metric dimension set that has metric values, plus one
/// record for the entry-wide dimensions when it has metric values or when no
/// dimension-set record was written.
///
/// # Errors
///
/// Returns the accumulated validation errors (before anything is written), or any error
/// from writing to `output`.
fn finish(mut self, output: &mut impl io::Write) -> Result<(), IoStreamError> {
// Report every dimension that never received a value.
if !self.validations.skip_validate_dimensions_exist && !self.is_allow_unroutable_entries {
for (dim, value) in self.validation_map.iter_mut() {
if let LineData {
kind: LineKind::UnfoundDimension,
} = value
{
self.error
.extend_mut(ValidationError::invalid("missing dimension").for_field(dim));
}
}
}
// Fall back to the current time when the entry did not write a timestamp.
let timestamp = self.timestamp.unwrap_or_else(SystemTime::now);
// A timestamp before the Unix epoch becomes a zero duration.
let unix = timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default();
let mut timestamp_buf = itoa::Buffer::new();
// The timestamp is emitted in milliseconds since the Unix epoch.
let timestamp_str = timestamp_buf.format(unix.as_millis());
// Bail out before producing any output if validation failed.
self.error.build()?;
self.state
.decl_buf
.push_json_safe_log_group_and_timestamp(
&self.state.log_group_and_timestamp,
timestamp_str,
);
// The string properties come last in every record, so they also close it.
self.state.string_fields_buf.push_raw_str("}\n");
let mut emitted_any_dimension_metrics = false;
for entry in self.state.dimension_set_map.values_mut() {
// Close the metrics array and the directive of the first namespace.
entry.metrics_buf.push_raw_str("]}");
let metrics_len = entry.metrics_buf.as_str().len();
// Repeat the directive for every additional namespace.
for namespace in &self.state.namespaces[1..] {
entry
.metrics_buf
.push_raw_str(r#",{"Namespace":"#)
.push_json_safe_string(namespace)
.extend_from_within_range(entry.after_namespace_index, metrics_len);
}
entry
.metrics_buf
.push_json_safe_log_group_and_timestamp(
&self.state.log_group_and_timestamp,
timestamp_str,
);
let buf: SmallVec<[_; 3]> = smallvec![
entry.metrics_buf.as_ref(),
entry.fields_buf.as_ref(),
self.state.string_fields_buf.as_ref(),
];
// Skip dimension sets whose fields buffer reports itself as empty.
if entry.fields_buf.is_empty() {
continue;
}
emitted_any_dimension_metrics = true;
write_all_vectored(buf, output)?;
}
// Write the entry-wide record unless it holds no metric values and a dimension-set
// record was already written.
if !emitted_any_dimension_metrics || !self.state.fields_buf.is_empty() {
self.state.dimensions_buf.clear();
let mut first = true;
// Entry dimensions, when configured, replace the default dimension sets.
for dimension in self
.entry_dimensions
.as_deref()
.unwrap_or(&self.state.each_dimensions_str)
{
if !mem::replace(&mut first, false) {
self.state.dimensions_buf.push(',');
}
self.state.dimensions_buf.push_json_safe_array(dimension);
}
// Close the metrics array and the directive of the first namespace.
self.state.metrics_buf.push_raw_str("]}");
let metrics_len = self.state.metrics_buf.as_str().len();
// Repeat the directive for every additional namespace.
for namespace in &self.state.namespaces[1..] {
self.state
.metrics_buf
.push_raw_str(r#",{"Namespace":"#)
.push_json_safe_string(namespace)
.push_raw_str(
&self.state.dimensions_buf.as_str()[self.state.after_namespace_index..],
)
.extend_from_within_range(0, metrics_len);
}
let buf: SmallVec<[_; 5]> = smallvec![
self.state.dimensions_buf.as_ref(),
self.state.metrics_buf.as_ref(),
self.state.decl_buf.as_ref(),
self.state.fields_buf.as_ref(),
self.state.string_fields_buf.as_ref(),
];
write_all_vectored(buf, output)?;
}
Ok(())
}
/// Checks a field name, recording a validation error for an empty name or `_aws`.
///
/// Returns `false` when the name was rejected; always returns `true` when name
/// validation is skipped.
fn validate_name(&mut self, name: &str) -> bool {
if !self.validations.skip_validate_names {
if name.is_empty() {
self.error
.extend_mut(ValidationError::invalid("name can't be empty").for_field(""));
return false;
}
if name == "_aws" {
self.error
.extend_mut(ValidationError::invalid("name can't be `_aws`").for_field("_aws"));
return false;
}
}
true
}
}
/// An `f64` produced by `clamp_to_finite`, i.e. one that is neither NaN nor infinite.
struct FiniteFloat(f64);
/// Clamps `float` into the finite `f64` range.
///
/// Infinities become `±f64::MAX`. A NaN cannot be clamped: it is logged (rate limited)
/// under `name_for_log` and `None` is returned.
fn clamp_to_finite(float: f64, name_for_log: &str) -> Option<FiniteFloat> {
    let clamped = float.clamp(-f64::MAX, f64::MAX);
    if clamped.is_finite() {
        return Some(FiniteFloat(clamped));
    }
    rate_limited!(
        Duration::from_secs(1),
        tracing::error!(
            message="skipping emitting metric with NaN value",
            metric=%name_for_log,
        )
    );
    None
}
/// Marker error: the value could not be written and whatever was emitted for it must be rolled back.
struct MetricSkipped;
/// Writer for a single named value of an entry.
struct ValueWriter<'a, 'e> {
// Name of the field being written.
name: SCow<'e>,
// The entry writer that receives the output and validation errors.
entry: &'a mut EntryWriter<'e>,
}
impl ValueWriter<'_, '_> {
/// Appends a finite float to `buf`, dropping a trailing `.0`.
///
/// # Panics
///
/// Panics if the wrapped value is not finite.
fn write_float(buf: &mut PrefixedStringBuf, v: FiniteFloat) {
assert!(v.0.is_finite(), "should be checked by the caller");
let mut dtoa_buf = dtoa::Buffer::new();
let as_str = dtoa_buf.format_finite(v.0);
buf.push_raw_str(as_str.strip_suffix(".0").unwrap_or(as_str));
}
/// Appends one observation's value to `buf` and its count to `counts`.
///
/// A missing `multiplicity` counts as 1. Returns `Err(MetricSkipped)` without writing
/// anything when the observation is NaN or of an unknown kind.
fn write_observation(
buf: &mut PrefixedStringBuf,
counts: &mut PrefixedStringBuf,
observation: Observation,
multiplicity: Option<u64>,
name_for_log: &str,
) -> Result<(), MetricSkipped> {
let multiplicity = multiplicity.unwrap_or(1);
match observation {
Observation::Unsigned(v) => {
buf.push_integer(v);
counts.push_integer(multiplicity);
Ok(())
}
Observation::Floating(v) => {
if let Some(v) = clamp_to_finite(v, name_for_log) {
Self::write_float(buf, v);
counts.push_integer(multiplicity);
Ok(())
} else {
Err(MetricSkipped)
}
}
// A repeated observation is written as its mean with the occurrence count.
Observation::Repeated { total, occurrences } => {
// Avoid dividing by zero; zero occurrences are written as a mean of 0.
let mean = if occurrences == 0 {
0.0
} else {
total / occurrences as f64
};
if let Some(mean) = clamp_to_finite(mean, name_for_log) {
Self::write_float(buf, mean);
counts.push_integer(occurrences.saturating_mul(multiplicity));
Ok(())
} else {
Err(MetricSkipped)
}
}
_ => {
rate_limited!(
Duration::from_secs(1),
tracing::error!(
message="skipping emitting metric due to unknown observation type",
metric=%name_for_log,
)
);
Err(MetricSkipped)
}
}
}
/// Appends `,"name":<value>` to `fields_buf`.
///
/// A single unsigned or floating observation without multiplicity is written as a plain
/// number; everything else is written as an object with `Values` and `Counts` arrays.
/// On `Err(MetricSkipped)` the buffer still contains the partial output, which the
/// caller is expected to truncate.
fn write_metric_value(
name: &str,
fields_buf: &mut PrefixedStringBuf,
counts_buf: &mut PrefixedStringBuf,
first: Observation,
mut distribution: impl Iterator<Item = Observation>,
multiplicity: Option<u64>,
) -> Result<(), MetricSkipped> {
let buf: &mut PrefixedStringBuf = fields_buf;
buf.push(',').json_string(name).push(':');
match (first, distribution.next()) {
(Observation::Unsigned(v), None) if multiplicity.is_none() => {
buf.push_integer(v);
Ok(())
}
(Observation::Floating(v), None) if multiplicity.is_none() => {
if let Some(v) = clamp_to_finite(v, name) {
Self::write_float(buf, v);
Ok(())
} else {
Err(MetricSkipped)
}
}
(first, second) => {
let counts = counts_buf;
buf.push_raw_str(r#"{"Values":["#);
let mut wrote_anything = false;
// Start from an empty scratch buffer, then write every observation.
counts.clear(); for observation in iter::once(first).chain(second).chain(distribution) {
// Remember both lengths so a skipped observation can be rolled back,
// separator included.
let prev_buf_len = buf.as_str().len();
let prev_counts_len = counts.as_str().len();
if wrote_anything {
buf.push(',');
counts.push(',');
}
if Self::write_observation(buf, counts, observation, multiplicity, name).is_ok()
{
wrote_anything = true;
} else {
buf.truncate(prev_buf_len);
counts.truncate(prev_counts_len);
}
}
// Append the collected counts after the values.
buf.push_raw_str(counts.as_str());
counts.clear(); buf.push_raw_str("]}");
if wrote_anything {
Ok(())
} else {
Err(MetricSkipped)
}
}
}
}
/// Writes a metric: its value into `fields_buf` and its declaration into `metrics_buf`.
///
/// An empty `distribution` writes nothing. A skipped value is rolled back from
/// `fields_buf` and writes no declaration. With the `NoMetric` option the value is
/// written but no declaration is added. Every path visible here returns `Ok(())`.
#[allow(clippy::too_many_arguments)]
fn write_metric(
name: &str,
fields_buf: &mut PrefixedStringBuf,
metrics_buf: &mut PrefixedStringBuf,
counts_buf: &mut PrefixedStringBuf,
distribution: impl IntoIterator<Item = Observation>,
unit: Unit,
flags: MetricFlags<'_>,
multiplicity: Option<u64>,
) -> Result<(), ValidationError> {
let mut distribution = distribution.into_iter();
let Some(first) = distribution.next() else {
return Ok(()); };
// Remember the length so a skipped value can be removed again.
let fields_buf_index = fields_buf.as_str().len();
if let Err(MetricSkipped) = Self::write_metric_value(
name,
fields_buf,
counts_buf,
first,
distribution,
multiplicity,
) {
fields_buf.truncate(fields_buf_index);
return Ok(()); }
let flags = flags.downcast();
if let Some(EmfOptions {
storage_mode: StorageMode::NoMetric,
..
}) = flags
{
return Ok(());
}
// Separate declarations with commas.
if !metrics_buf.is_empty() {
metrics_buf.push(',');
}
metrics_buf.push_raw_str(r#"{"Name":"#).json_string(name);
// The unit is omitted for `Unit::None`.
if unit != Unit::None {
metrics_buf
.push_raw_str(r#","Unit":"#)
.json_string(unit.name());
}
if let Some(EmfOptions {
storage_mode: StorageMode::HighStorageResolution,
..
}) = flags
{
metrics_buf.push_raw_str(r#","StorageResolution":1}"#);
} else {
metrics_buf.push('}');
}
Ok(())
}
/// Registers the current name as a string property, reporting "duplicate field" if the
/// name was already written; a declared dimension is marked as found.
fn validate_string(&mut self) {
match self.entry.validation_map.entry_ref(&self.name) {
EntryRef::Occupied(mut occupied_entry) => {
match occupied_entry.get_mut() {
LineData {
kind: LineKind::Metric { .. } | LineKind::String,
} => {
self.entry.error.extend_mut(
ValidationError::invalid("duplicate field").for_field(&self.name),
);
}
LineData {
kind: kind @ LineKind::UnfoundDimension,
} => {
*kind = LineKind::String;
}
}
}
EntryRef::Vacant(vacant_entry) => {
vacant_entry.insert(LineData {
kind: LineKind::String,
});
}
}
}
}
/// Value writer for a single element of a JSON array property; writes into the wrapped buffer.
struct EmfArrayElementWriter<'a>(&'a mut PrefixedStringBuf);
impl metrique_writer_core::ValueWriter for EmfArrayElementWriter<'_> {
/// Writes the element as a JSON string.
fn string(self, value: &str) {
self.0.json_string(value);
}
/// Writes the element as a number (single observation) or as a nested array of numbers
/// (several observations). Unit, dimensions and flags are ignored; an empty
/// distribution writes nothing.
fn metric<'a>(
self,
distribution: impl IntoIterator<Item = Observation>,
_unit: Unit,
_dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
_flags: MetricFlags<'_>,
) {
let buf = self.0;
let mut iter = distribution.into_iter();
let Some(first) = iter.next() else { return };
match iter.next() {
None => write_emf_observation(buf, first),
Some(second) => {
buf.push('[');
let mut wrote_any = false;
for obs in std::iter::once(first)
.chain(std::iter::once(second))
.chain(iter)
{
let before = buf.as_str().len();
if wrote_any {
buf.push(',');
}
let after_sep = buf.as_str().len();
write_emf_observation(buf, obs);
// An observation that wrote nothing is dropped together with its separator.
if buf.as_str().len() > after_sep {
wrote_any = true;
} else {
buf.truncate(before);
}
}
buf.push(']');
}
}
}
/// Errors reported for array elements are discarded.
fn error(self, _error: ValidationError) {}
}
/// Appends a single observation to `buf` as a bare JSON number.
///
/// Unsigned values are written as integers, floating values as floats, and repeated
/// observations as their mean (0 when there are no occurrences). NaN values and unknown
/// observation kinds write nothing.
fn write_emf_observation(buf: &mut PrefixedStringBuf, obs: Observation) {
    let float = match obs {
        Observation::Unsigned(v) => {
            buf.push_integer(v);
            return;
        }
        Observation::Floating(v) => v,
        Observation::Repeated { total, occurrences } => {
            if occurrences == 0 {
                0.0
            } else {
                total / occurrences as f64
            }
        }
        _ => return,
    };
    if let Some(finite) = clamp_to_finite(float, "") {
        ValueWriter::write_float(buf, finite);
    }
}
impl metrique_writer_core::ValueWriter for ValueWriter<'_, '_> {
/// Writes the value as a string property, then checks the name for duplicates unless
/// uniqueness validation is skipped.
fn string(mut self, value: &str) {
self.entry
.state
.string_fields_buf
.push(',')
.json_string(&self.name)
.push(':')
.json_string(value);
if !self.entry.validations.skip_validate_unique {
self.validate_string();
}
}
/// Writes the values as a JSON array property, then checks the name for duplicates
/// unless uniqueness validation is skipped.
fn values<'a, V: Value + 'a>(mut self, values: impl IntoIterator<Item = &'a V>) {
let buf = &mut self.entry.state.string_fields_buf;
buf.push(',').json_string(&self.name).push(':').push('[');
let mut wrote_any = false;
for value in values {
let before = buf.as_str().len();
if wrote_any {
buf.push(',');
}
let after_sep = buf.as_str().len();
value.write(EmfArrayElementWriter(buf));
// An element that wrote nothing is dropped together with its separator.
if buf.as_str().len() > after_sep {
wrote_any = true;
} else {
buf.truncate(before);
}
}
buf.push(']');
if !self.entry.validations.skip_validate_unique {
self.validate_string();
}
}
/// Writes the value as a metric.
///
/// Metrics without dimensions (or with ignored dimensions) go to the entry-wide buffers;
/// metrics with their own dimensions go to the buffers of their dimension set, which
/// requires the entry to allow split entries.
fn metric<'a>(
self,
distribution: impl IntoIterator<Item = Observation>,
unit: Unit,
dimensions: impl IntoIterator<Item = (&'a str, &'a str)>,
flags: MetricFlags<'_>,
) {
let mut dimensions = dimensions.into_iter().peekable();
let is_global = self.entry.state.allow_ignored_dimensions || dimensions.peek().is_none();
if !is_global && !self.entry.allow_split_entries {
self.entry.error.extend_mut(
ValidationError::invalid("can't use per-metric dimensions without split entries - you probably want to remove WithDimensions<>")
.for_field(&self.name),
);
}
// Index 0 stands for the entry-wide dimensions; dimension sets are numbered from 1.
let (metrics_buf, fields_buf, index) = if is_global {
(
&mut self.entry.state.metrics_buf,
&mut self.entry.state.fields_buf,
0,
)
} else {
let key = DimensionSetKey::from_iter(dimensions);
// Index used if this dimension set is new; an existing set keeps its own index.
let index = NonZero::new(self.entry.state.dimension_set_map.len() + 1).unwrap();
let each_dimensions_str = self
.entry
.entry_dimensions
.as_deref()
.unwrap_or(&self.entry.state.each_dimensions_str);
let val = self
.entry
.state
.dimension_set_map
.entry_ref(&key)
.or_insert_with(|| {
MetricsForDimensionSet::new(
&self.entry.state.namespaces[0],
each_dimensions_str,
&key,
index,
)
});
(&mut val.metrics_buf, &mut val.fields_buf, val.index.into())
};
if !self.entry.validations.skip_validate_unique && !self.entry.is_allow_unroutable_entries {
match self
.entry
.validation_map
.entry_ref(&self.name)
.or_insert_with(|| LineData {
kind: LineKind::Metric {
indexes: bit_set::BitSet::new(),
},
})
.kind
{
LineKind::UnfoundDimension => {
self.entry.error.extend_mut(
ValidationError::invalid("can't use metric in dimension field")
.for_field(&self.name),
);
}
// The same metric name may appear once per dimension set.
LineKind::Metric { ref mut indexes } => {
if !indexes.insert(index) {
self.entry.error.extend_mut(
ValidationError::invalid("duplicate field").for_field(&self.name),
);
}
}
LineKind::String => {
self.entry.error.extend_mut(
ValidationError::invalid("duplicate field").for_field(&self.name),
);
}
}
}
if let Err(err) = Self::write_metric(
&self.name,
fields_buf,
metrics_buf,
&mut self.entry.state.counts_buf,
distribution,
unit,
flags,
self.entry.multiplicity,
) {
self.error(err);
}
}
/// Records `error` against the current field name.
fn error(self, error: ValidationError) {
self.entry.error.extend_mut(error.for_field(&self.name));
}
}
impl Format for Emf {
    /// Formats `entry` into `output` without any sampling multiplicity.
    fn format(
        &mut self,
        entry: &impl Entry,
        output: &mut impl io::Write,
    ) -> Result<(), IoStreamError> {
        let multiplicity: Option<u64> = None;
        self.format_with_multiplicity(entry, output, multiplicity)
    }
}
/// EMF-specific storage option of a metric.
///
/// The derived ordering follows the variant order, so `NoMetric` compares greater than
/// `HighStorageResolution`; merging options keeps the greater one.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq)]
enum StorageMode {
// The metric declaration carries `"StorageResolution":1`.
HighStorageResolution,
// The value is written but no metric declaration is emitted for it.
NoMetric,
}
/// Metric options understood by the EMF formatter, carried through `MetricFlags`.
#[derive(Debug)]
struct EmfOptions {
// The storage option selected for the metric.
storage_mode: StorageMode,
}
impl MetricOptions for EmfOptions {
    /// Merges with `other` when it is also an [`EmfOptions`], keeping the greater storage
    /// mode of the two; returns `None` for any other option type.
    fn try_merge(&self, other: &dyn MetricOptions) -> Option<MetricFlags<'static>> {
        let other = (other as &dyn Any).downcast_ref::<EmfOptions>()?;
        let merged = std::cmp::max(other.storage_mode, self.storage_mode);
        let flags = match merged {
            StorageMode::HighStorageResolution => MetricFlags::upcast(&EmfOptions {
                storage_mode: StorageMode::HighStorageResolution,
            }),
            StorageMode::NoMetric => MetricFlags::upcast(&EmfOptions {
                storage_mode: StorageMode::NoMetric,
            }),
        };
        Some(flags)
    }
}
pub struct HighStorageResolutionCtor;
impl FlagConstructor for HighStorageResolutionCtor {
fn construct() -> MetricFlags<'static> {
MetricFlags::upcast(&EmfOptions {
storage_mode: StorageMode::HighStorageResolution,
})
}
}
pub struct NoMetricCtor;
impl FlagConstructor for NoMetricCtor {
fn construct() -> MetricFlags<'static> {
MetricFlags::upcast(&EmfOptions {
storage_mode: StorageMode::NoMetric,
})
}
}
/// Wrapper that attaches the flags built by [`HighStorageResolutionCtor`] to `T`
/// (presumably on every metric `T` writes — see `ForceFlag`).
pub type HighStorageResolution<T> = ForceFlag<T, HighStorageResolutionCtor>;
/// Wrapper that attaches the flags built by [`NoMetricCtor`] to `T`
/// (presumably on every metric `T` writes — see `ForceFlag`).
pub type NoMetric<T> = ForceFlag<T, NoMetricCtor>;
/// An [`Emf`] formatter that can also format entries with a sample rate.
///
/// Created with [`Emf::with_sampling`] or [`Emf::with_sampling_and_rng`].
pub struct SampledEmf<R = DefaultRng<ThreadRng>> {
// The wrapped formatter.
emf: Emf,
// Random number generator used to pick the multiplicity of sampled entries.
rng: R,
}
impl<R> Format for SampledEmf<R> {
fn format(
&mut self,
entry: &impl Entry,
output: &mut impl io::Write,
) -> Result<(), IoStreamError> {
self.emf.format(entry, output)
}
}
/// Splits the inverse of a sample rate into `(n, alpha)`.
///
/// `n` is `1 / rate` truncated to an integer and `alpha` is `(n + 1) - 1 / rate`, which
/// `rate_to_n` uses as the probability of choosing `n` rather than `n + 1`.
fn rate_to_n_alpha(rate: f32) -> (u64, f64) {
    let inverse = 1.0 / f64::from(rate);
    let floor = inverse as u64;
    let alpha = (floor + 1) as f64 - inverse;
    (floor, alpha)
}
/// Picks the multiplicity for an entry sampled at `rate`.
///
/// Rates below `1 / i64::MAX` yield `u64::MAX` without consulting `rng`. Otherwise the
/// result is `n` with probability `alpha` and `n + 1` (saturating) otherwise, where
/// `(n, alpha)` come from `rate_to_n_alpha`.
fn rate_to_n<R: RngCore>(rate: f32, rng: &mut R) -> u64 {
    if rate < 1.0 / (i64::MAX as f32) {
        return u64::MAX;
    }
    let (n, alpha) = rate_to_n_alpha(rate);
    let roll: f64 = rng.random();
    if roll < alpha { n } else { n.saturating_add(1) }
}
impl<R: RngCore> SampledFormat for SampledEmf<R> {
    /// Formats `entry`, scaling metric counts by a multiplicity derived from `rate`.
    ///
    /// # Errors
    ///
    /// Returns a validation error when `rate` is NaN, zero or negative; otherwise returns
    /// whatever formatting the entry returns.
    fn format_with_sample_rate(
        &mut self,
        entry: &impl Entry,
        output: &mut impl io::Write,
        rate: f32,
    ) -> Result<(), IoStreamError> {
        if rate.is_nan() || rate <= 0.0 {
            let invalid = ValidationError::invalid("format with non-positive sample rate");
            return Err(IoStreamError::Validation(invalid));
        }
        let multiplicity = rate_to_n(rate, &mut self.rng);
        self.emf
            .format_with_multiplicity(entry, output, Some(multiplicity))
    }
}
#[cfg(test)]
mod tests {
use assert_approx_eq::assert_approx_eq;
use assert_json_diff::assert_json_eq;
use rand::SeedableRng;
use super::*;
use core::{f32, f64};
use metrique_writer::value::{Distribution, Mean};
use metrique_writer::{EntryIoStreamExt, FormatExt};
use metrique_writer_core::{
EntryIoStream, EntryWriter, MetricValue,
unit::{BitPerSecond, Kilobyte, Millisecond, NegativeScale, Second},
value::WithDimension,
};
use rstest::rstest;
use std::time::Duration;
struct EmptyValue;
impl Value for EmptyValue {
fn write(&self, writer: impl metrique_writer_core::ValueWriter) {
writer.metric(vec![], Unit::Count, vec![], MetricFlags::empty());
}
}
impl MetricValue for EmptyValue {
type Unit = metrique_writer_core::unit::Count;
}
#[test]
fn test_rate_to_n_alpha() {
assert_eq!(rate_to_n_alpha(0.5), (2, 1.0));
let (n, alpha) = rate_to_n_alpha(0.4);
assert_eq!(n, 2);
assert_approx_eq!(alpha, 0.5, 0.001);
let (n, alpha) = rate_to_n_alpha(0.225);
assert_eq!(n, 4);
assert_approx_eq!(alpha, 0.55555, 0.001);
}
#[test]
fn test_rate_to_n() {
let mut rng = rand_chacha::ChaChaRng::seed_from_u64(0);
let mut total = 0;
const SAMPLES: usize = 10_000;
const RATE: f32 = 0.4;
for _ in 0..SAMPLES {
if rng.random::<f64>() >= RATE as f64 {
continue;
}
match rate_to_n(RATE, &mut rng) {
n @ (2 | 3) => total += n,
n => panic!("must be 2 or 3, found {n}"),
}
}
assert_approx_eq!((total as f64) / (SAMPLES as f64), 1.0, 0.01);
}
/// Formats an entry that breaks many validation rules at once and checks that every
/// expected message appears in the rendered error.
///
/// On failure the assertion names the missing message and prints the full error text.
#[test]
fn test_validation_errors() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            // Timestamp written twice.
            writer.timestamp(
                SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
            );
            writer.timestamp(
                SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
            );
            // Entry dimensions configured with an empty list.
            writer.config(const { &EntryDimensions::new(Cow::Borrowed(&[])) });
            // Same string field written twice.
            writer.value("AWSAccountId", "012345678901");
            writer.value("AWSAccountId", "012345678901");
            writer.value(
                "WithDimension",
                &WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
            );
            // Reserved and empty field names.
            writer.value("_aws", "some string value");
            writer.value("", "some string value");
            // A metric written under a name that the formatter uses as a dimension.
            writer.value("MyDimension", &2u64);
            // Duplicate metrics, including one whose first value is NaN.
            writer.value("Metric", &2u64);
            writer.value("Metric", &3u64);
            writer.value("NaNMetric", &f64::NAN);
            writer.value("NaNMetric", &1.0);
            // Entry dimensions configured again, after values were already written.
            writer
                .config(const { &EntryDimensions::new(Cow::Borrowed(&[Cow::Borrowed(&[])])) });
        }
    }

    let mut emf = Emf::builder(
        "TestNS".to_string(),
        vec![
            vec![],
            vec!["MyDimension".to_string(), "MyOtherDimension".to_string()],
        ],
    )
    .skip_all_validations(false)
    .build();

    let errors = emf
        .format(&TestEntry, &mut vec![])
        .unwrap_err()
        .to_string();

    for expected in [
        "multiple timestamps written",
        "for `AWSAccountId`: duplicate field",
        "for `_aws`: name can't be `_aws`",
        "for ``: name can't be empty",
        "for `Metric`: duplicate field",
        "for `NaNMetric`: duplicate field",
        "for `MyDimension`: can't use metric in dimension field",
        "for `MyOtherDimension`: missing dimension",
        "entry dimensions cannot be empty",
        "entry dimensions must be configured before emitting a metric with custom dimensions",
    ] {
        assert!(
            errors.contains(expected),
            "expected `{expected}` in validation errors:\n{errors}"
        );
    }
}
/// With `allow_dimensions_with_no_data(true)`, an entry that never writes the configured
/// `AZ` dimension still formats successfully, while the duplicate-field and reserved-name
/// validations are still reported.
#[test]
fn test_allow_dimensions_with_no_data() {
    // Writes `Region` but not `AZ`, even though both are configured dimensions.
    struct SuccessEntry;
    impl Entry for SuccessEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.timestamp(SystemTime::UNIX_EPOCH);
            writer.value("Region", "us-east-1");
            writer.value("MyMetric", &42u64);
        }
    }

    // Same shape, plus a duplicated metric and a field named `_aws`.
    struct OtherValidationsEntry;
    impl Entry for OtherValidationsEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.timestamp(SystemTime::UNIX_EPOCH);
            writer.value("Region", "us-east-1");
            writer.value("MyMetric", &1u64);
            writer.value("MyMetric", &2u64);
            writer.value("_aws", "bad");
        }
    }

    let dimensions = vec![vec!["Region".to_string(), "AZ".to_string()]];
    let mut emf = Emf::builder("TestNS".to_string(), dimensions)
        .skip_all_validations(false)
        .allow_dimensions_with_no_data(true)
        .build();

    emf.format(&SuccessEntry, &mut vec![]).unwrap();

    let rendered = emf
        .format(&OtherValidationsEntry, &mut vec![])
        .unwrap_err()
        .to_string();
    assert!(rendered.contains("for `MyMetric`: duplicate field"));
    assert!(rendered.contains("for `_aws`: name can't be `_aws`"));
}
/// Configuring entry dimensions twice must be reported whether validations are skipped
/// or not; with validations enabled the `Metric` duplicate-field error is reported too.
#[test]
fn test_validation_errors_multiple_config() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.value("Metric", &2u64);
            // The identical dimension configuration is supplied two times in a row.
            for _ in 0..2 {
                writer.config(
                    const {
                        &EntryDimensions::new(Cow::Borrowed(&[Cow::Borrowed(&[Cow::Borrowed(
                            "Metric",
                        )])]))
                    },
                );
            }
        }
    }

    /// Formats `TestEntry` with the given validation setting and returns the error text.
    fn format_errors(skip_validations: bool) -> String {
        let mut emf: Emf = Emf::builder(
            "TestNS".to_string(),
            vec![
                vec![],
                vec!["MyDimension".to_string(), "MyOtherDimension".to_string()],
            ],
        )
        .skip_all_validations(skip_validations)
        .build();
        emf.format(&TestEntry, &mut vec![]).unwrap_err().to_string()
    }

    let without_validation = format_errors(true);
    assert!(without_validation.contains("entry dimensions cannot be set twice"));

    let with_validation = format_errors(false);
    assert!(with_validation.contains("for `Metric`: duplicate field"));
    assert!(with_validation.contains("entry dimensions cannot be set twice"));
}
/// Exercises validation of values wrapped in `WithDimension`.
///
/// `TestEntry` writes several names more than once in different combinations (string vs.
/// dimensioned metric, plain vs. dimensioned metric, `EmptyValue` vs. real value) and the
/// inner `check` asserts which of them are reported as errors. The whole scenario runs with
/// `allow_ignored_dimensions` both off and on; the only assertion that depends on the flag is
/// the one for `WithDimensionOK`.
#[test]
fn test_validation_errors_dimensions() {
struct TestEntry;
impl Entry for TestEntry {
fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
writer.timestamp(
SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
);
// WithDimension1: a string first, then a dimensioned metric under the same name.
writer.value("WithDimension1", "012345678901");
writer.value(
"WithDimension1",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
// WithDimension2: the reverse order - dimensioned metric first, then a string.
writer.value(
"WithDimension2",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
writer.value("WithDimension2", "012345678901");
// One of the two configured dimension names is written as a string.
writer.value("MyOtherDimension", "foo");
// Reserved (`_aws`) and empty names used for dimensioned metrics.
writer.value(
"_aws",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
writer.value(
"",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
// WithDimension3: the same dimensioned metric written twice.
writer.value(
"WithDimension3",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
writer.value(
"WithDimension3",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
// WithDimensionOK: one plain metric plus one dimensioned metric. `check` expects a
// duplicate-field error for this name only when `allow_ignored` is true.
writer.value("WithDimensionOK", &2.0);
writer.value(
"WithDimensionOK",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
// WithDimension4: plain, dimensioned, then plain again.
writer.value("WithDimension4", &2.0);
writer.value(
"WithDimension4",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
writer.value("WithDimension4", &2.0);
// WithDimension5: `EmptyValue` written twice with identical dimensions.
writer.value(
"WithDimension5",
&WithDimension::new_with_dimensions(&EmptyValue, [("Dim", "Val")]),
);
writer.value(
"WithDimension5",
&WithDimension::new_with_dimensions(&EmptyValue, [("Dim", "Val")]),
);
// WithDimension6: `EmptyValue` followed by a real value with identical dimensions.
writer.value(
"WithDimension6",
&WithDimension::new_with_dimensions(&EmptyValue, [("Dim", "Val")]),
);
writer.value(
"WithDimension6",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
// A dimensioned metric written under a configured dimension name.
writer.value(
"MyDimension",
&WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
);
}
}
// Builds a validating formatter with the given `allow_ignored_dimensions` setting,
// formats `TestEntry`, and checks the rendered error text.
fn check(allow_ignored: bool) {
let mut emf = Emf::builder(
"TestNS".to_string(),
vec![
vec![],
vec!["MyDimension".to_string(), "MyOtherDimension".to_string()],
],
)
.skip_all_validations(false)
.allow_ignored_dimensions(allow_ignored)
.build();
let errors = format!("{}", emf.format(&TestEntry, &mut vec![]).unwrap_err());
assert!(errors.contains("for `WithDimension1`: duplicate field"));
assert!(errors.contains("for `WithDimension2`: duplicate field"));
assert!(errors.contains("for `WithDimension3`: duplicate field"));
assert!(errors.contains("for `WithDimension4`: duplicate field"));
assert!(errors.contains("for `WithDimension5`: duplicate field"));
assert!(errors.contains("for `WithDimension6`: duplicate field"));
// The plain + dimensioned pair is reported as a duplicate exactly when
// `allow_ignored` is set.
assert_eq!(
errors.contains("for `WithDimensionOK`: duplicate field"),
allow_ignored
);
assert!(errors.contains("for `_aws`: name can't be `_aws`"));
assert!(errors.contains("for ``: name can't be empty"));
assert!(errors.contains("for `MyDimension`: can't use metric in dimension field"));
assert!(errors.contains("for `MyDimension`: missing dimension"));
}
check(false);
check(true);
}
/// A value with per-metric dimensions must be rejected when the entry has not enabled
/// split entries, with validations skipped or enabled alike.
///
/// When `preset_split_entries` is true, a separate entry that does enable split entries is
/// formatted first on the same formatter; the later `TestEntry` must still be rejected.
#[rstest]
#[case(false)]
#[case(true)]
fn test_validation_errors_dimensions_no_split(#[case] preset_split_entries: bool) {
    // One plain metric and one metric carrying its own dimensions, no split-entry config.
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            let when = SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819);
            writer.timestamp(when);
            writer.value("Normal", &1.0);
            writer.value(
                "WithDimension",
                &WithDimension::new_with_dimensions(&2.0, [("Dim", "Val")]),
            );
        }
    }

    // Writes nothing except the split-entries configuration.
    struct ForceSplitEntries;
    impl Entry for ForceSplitEntries {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.config(&const { AllowSplitEntries::new() });
        }
    }

    for skip_validations in [false, true] {
        let mut emf = Emf::builder("TestNS".to_string(), vec![vec![]])
            .skip_all_validations(skip_validations)
            .build();
        if preset_split_entries {
            emf.format(&ForceSplitEntries, &mut vec![]).unwrap();
        }

        let errors = emf
            .format(&TestEntry, &mut vec![])
            .unwrap_err()
            .to_string();
        assert!(errors.contains("for `WithDimension`: can't use per-metric dimensions without split entries - you probably want to remove WithDimensions<>"));
        // The plain metric must not be mentioned in the error.
        assert!(!errors.contains("Normal"));
    }
}
/// Checks which sample rates `format_with_sample_rate` accepts.
///
/// `1.0`, `0.0015` and `1e-30` succeed (the last one producing a count of `u64::MAX` in the
/// output), while NaN, zero and negative rates are rejected.
#[test]
fn test_sampling_bad_rate() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.timestamp(SystemTime::UNIX_EPOCH);
            let mean = Mean::<Millisecond>::from_iter([1u32, 2, 3]);
            writer.value("SomeRepeatedDuration", &mean);
        }
    }

    let mut format = Emf::no_validations("MyNS".into(), vec![vec![]]).with_sampling();

    // Ordinary rates are accepted.
    for rate in [1.0, 0.0015] {
        assert!(
            format
                .format_with_sample_rate(&TestEntry, &mut vec![], rate)
                .is_ok()
        );
    }

    // A tiny rate is accepted as well; inspect what it does to the emitted count.
    let mut infty = vec![];
    assert!(
        format
            .format_with_sample_rate(&TestEntry, &mut infty, 1e-30)
            .is_ok()
    );
    let rendered = String::from_utf8(infty).unwrap();
    assert!(rendered.contains(
        r#""SomeRepeatedDuration":{"Values":[2],"Counts":[18446744073709551615]}"#
    ));

    // NaN, zero and negative rates are errors.
    for rate in [f32::NAN, 0.0, -1.0] {
        assert!(
            format
                .format_with_sample_rate(&TestEntry, &mut vec![], rate)
                .is_err()
        );
    }
}
/// An entry that never writes a timestamp must still format successfully, and the emitted
/// `_aws.Timestamp` must be close to the current wall-clock time.
///
/// Both values are in milliseconds since the Unix epoch; the allowed difference is
/// 1_000_000_000 ms (roughly eleven and a half days), so this is only a sanity check.
#[test]
fn test_missing_timestamp() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.value("Metric", &2u64);
        }
    }

    let mut emf = Emf::all_validations("TestNS".to_string(), vec![vec![]]);
    let mut buf = vec![];
    emf.format(&TestEntry, &mut buf).unwrap();
    let json: serde_json::Value = serde_json::from_slice(&buf).unwrap();

    let now = i64::try_from(
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis(),
    )
    .unwrap();
    let json_now = json
        .get("_aws")
        .unwrap()
        .get("Timestamp")
        .unwrap()
        .as_i64()
        .unwrap();

    // Assert the condition directly instead of wrapping `assert!(false)` in an `if`.
    assert!(
        now.abs_diff(json_now) <= 1_000_000_000,
        "time is not sane {now} {json_now}"
    );
}
/// End-to-end check of the JSON document produced for an entry that uses many value kinds.
///
/// The test is parameterized over `sample_rate`: `None` goes through the plain `format`
/// call, while `Some(r)` calls `format_with_sample_rate` with a rate of `1 / r`. The expected
/// document differs per case: without sampling, single values are plain numbers; with
/// sampling they are `Values`/`Counts` pairs, and for `Some(2)` every non-zero count is
/// doubled.
///
/// The expected documents contain no `NaN`, `OtherNaN`, `DistributionWithOnlyNaN` or
/// `NoObservations` keys, contain `NoMetric` as a property but not in the `Metrics` list, and
/// show the infinities of `DistributionWithNonFinite` as +/-1.7976931348623157e308 with its
/// NaN samples absent.
#[rstest]
#[case(None)]
#[case(Some(1))]
#[case(Some(2))]
fn formats_all_features(#[case] sample_rate: Option<u64>) {
struct TestEntry;
impl Entry for TestEntry {
fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
writer.timestamp(
SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
);
writer.value("NaN", &f64::NAN);
writer.value("AWSAccountId", "012345678901");
writer.value("API", "MyAPI");
writer.value("StringProp", "some string value");
writer.value("HighResCount", &HighStorageResolution::from(1234u64));
writer.value("BasicIntCount", &1234u64);
writer.value("NoMetric", &NoMetric::from(&1235u64));
writer.value("BasicFloatCount", &5.4321f64);
writer.value("SomeDuration", &Duration::from_micros(12345678));
writer.value(
"SomeRepeatedDuration",
&Mean::<Millisecond>::from_iter([1u32, 2, 3]),
);
// A repeated observation with zero occurrences.
writer.value(
"Nothing",
&Observation::Repeated {
total: 0.0,
occurrences: 0,
},
);
writer.value(
"RepeatedDuration",
&Distribution::<_, 2>::from_iter([
Duration::from_micros(10),
Duration::from_micros(170),
]),
);
writer.value("CounterWithUnit", &99u64.with_unit::<Kilobyte>());
// The `Ignored` dimension does not appear anywhere in the expected output; both
// formatters below are built with `allow_ignored_dimensions(true)`.
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([1u8, 2, 3]),
[("Ignored", "X")],
),
);
writer.value(
"DistributionWithNonFinite",
&Distribution::<f64>::from_iter([
f64::NAN,
f64::INFINITY,
-f64::INFINITY,
f64::NAN,
1.0,
f64::NAN,
]),
);
writer.value("OtherNaN", &f64::NAN);
writer.value(
"DistributionWithOnlyNaN",
&Distribution::<f64>::from_iter([f64::NAN, f64::NAN]),
);
writer.value(
"ComplexDistribution",
&Distribution::<_, 3>::from_iter([456u32, 789u32, 123u32])
.with_unit::<BitPerSecond>(),
);
writer.value("NoObservations", &Distribution::<u64, 0>::from_iter([]));
}
}
// Formats `TestEntry` three times on the same sampled formatter and compares each output
// with the expected document. `extra` is spliced into the `CloudWatchMetrics` array after
// the first directive.
fn check(format: Emf, extra: &str, sample_rate: Option<u64>) {
let mut sampled_format = format.with_sampling();
for _ in 0..3 {
let mut output = Vec::new();
if let Some(sample_rate) = sample_rate {
sampled_format
.format_with_sample_rate(
&TestEntry,
&mut output,
1.0 / (sample_rate as f32),
)
.unwrap();
} else {
sampled_format.format(&TestEntry, &mut output).unwrap();
}
let json: serde_json::Value = serde_json::from_slice(&output).unwrap();
// The expected document is assembled from four pieces: the per-sample-rate metric
// values, the shared properties and first directive, `extra`, and the closing part.
let expected = format!(
"{}{}{}{}",
match sample_rate {
None =>
r#"
{
"BasicFloatCount": 5.4321,
"BasicIntCount": 1234,
"NoMetric": 1235,
"HighResCount": 1234,
"ComplexDistribution": {"Values": [456,789,123], "Counts": [1,1,1]},
"CounterWithUnit": 99,
"MeanValue": {"Values":[2], "Counts":[3]},
"RepeatedDuration": {"Values":[0.01, 0.17], "Counts": [1, 1]},
"SomeDuration": 12345.678,
"SomeRepeatedDuration": {"Values":[2], "Counts":[3]},
"Nothing": {"Values":[0], "Counts":[0]},
"DistributionWithNonFinite": {"Values":[1.7976931348623157e308,-1.7976931348623157e308,1], "Counts":[1,1,1]},"#,
Some(1) =>
r#"
{
"BasicFloatCount": {"Values": [5.4321], "Counts": [1]},
"BasicIntCount": {"Values": [1234], "Counts": [1]},
"NoMetric": {"Values": [1235], "Counts": [1]},
"HighResCount": {"Values": [1234], "Counts": [1]},
"ComplexDistribution": {"Values": [456,789,123], "Counts": [1,1,1]},
"CounterWithUnit": {"Values": [99], "Counts": [1]},
"MeanValue": {"Values": [2], "Counts": [3]},
"RepeatedDuration": {"Values": [0.01, 0.17], "Counts": [1, 1]},
"SomeDuration": {"Values": [12345.678], "Counts": [1]},
"SomeRepeatedDuration": {"Values": [2], "Counts": [3]},
"Nothing": {"Values": [0], "Counts": [0]},
"DistributionWithNonFinite": {"Values":[1.7976931348623157e308,-1.7976931348623157e308,1], "Counts":[1,1,1]},"#,
Some(2) =>
r#"
{
"BasicFloatCount": {"Values": [5.4321], "Counts": [2]},
"BasicIntCount": {"Values": [1234], "Counts": [2]},
"NoMetric": {"Values": [1235], "Counts": [2]},
"HighResCount": {"Values": [1234], "Counts": [2]},
"ComplexDistribution": {"Values": [456,789,123], "Counts": [2,2,2]},
"CounterWithUnit": {"Values": [99], "Counts": [2]},
"MeanValue": {"Values": [2], "Counts": [6]},
"RepeatedDuration": {"Values": [0.01, 0.17], "Counts": [2, 2]},
"SomeDuration": {"Values": [12345.678], "Counts": [2]},
"SomeRepeatedDuration": {"Values": [2], "Counts": [6]},
"Nothing": {"Values": [0], "Counts": [0]},
"DistributionWithNonFinite": {"Values":[1.7976931348623157e308,-1.7976931348623157e308,1], "Counts":[2,2,2]},"#,
_ => panic!("unknown sample rate"),
},
r#"
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
],
[
"AWSAccountId"
]
],
"Metrics": [
{
"Name": "HighResCount",
"StorageResolution": 1
},
{
"Name": "BasicIntCount"
},
{
"Name": "BasicFloatCount"
},
{
"Name": "SomeDuration",
"Unit": "Milliseconds"
},
{
"Name": "SomeRepeatedDuration",
"Unit": "Milliseconds"
},
{
"Name": "Nothing"
},
{
"Name": "RepeatedDuration",
"Unit": "Milliseconds"
},
{
"Name": "CounterWithUnit",
"Unit": "Kilobytes"
},
{
"Name": "MeanValue",
"Unit": "Seconds"
},
{
"Name": "DistributionWithNonFinite"
},
{
"Name": "ComplexDistribution",
"Unit": "Bits/Second"
}
]
}"#,
extra,
r#"
],
"Timestamp": 1749475336015
}
}
"#
);
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
}
}
// First run: extra directive configured, all validations skipped.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.allow_ignored_dimensions(true)
.skip_all_validations(true)
.build(),
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
sample_rate,
);
// Second run: identical configuration with validations enabled.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.allow_ignored_dimensions(true)
.skip_all_validations(false)
.build(),
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
sample_rate,
);
}
/// Checks the output for an entry that enables split entries and writes `MeanValue` several
/// times with different per-metric dimensions.
///
/// The formatter output is split on newlines into exactly four documents, which are sorted
/// bytewise and compared in that order against: the `Kind=Bar, Type=Baz` document, the
/// `Kind=Bar` document (which also carries `DimensionedIntCount`), the `Kind=Foo` document,
/// and the document without per-metric dimensions. No document is expected for the
/// `Kind=Empty` write, whose value is `EmptyValue`.
#[test]
fn formats_all_features_dimensions() {
struct TestEntry;
impl Entry for TestEntry {
fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
writer.timestamp(
SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
);
writer.config(const { &AllowSplitEntries::new() });
writer.value("AWSAccountId", "012345678901");
writer.value("API", "MyAPI");
writer.value("StringProp", "some string value");
writer.value("BasicIntCount", &1234u64);
writer.value("NoMetric", &NoMetric::from(&1235u64));
writer.value(
"DimensionedIntCount",
&WithDimension::new_with_dimensions(1235u64, [("Kind", "Bar")]),
);
// `MeanValue` without per-metric dimensions.
writer.value(
"MeanValue",
&Mean::<Second>::from_iter([1u8, 2, 3, 4, 5, 6]),
);
// `MeanValue` again, once per distinct dimension set.
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([1u8, 2, 3]),
[("Kind", "Foo")],
),
);
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(&EmptyValue, [("Kind", "Empty")]),
);
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([4u8, 5, 6]),
[("Kind", "Bar")],
),
);
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([4u8, 5, 6]),
[("Kind", "Bar"), ("Type", "Baz")],
),
);
}
}
// Formats `TestEntry` three times on the same formatter and compares the four sorted
// output documents. `extra` is spliced into the `CloudWatchMetrics` array of the last
// document only (the one without per-metric dimensions).
fn check(mut format: Emf, extra: &str) {
for _ in 0..3 {
let mut output = Vec::new();
format.format(&TestEntry, &mut output).unwrap();
// The output ends with a newline, so the final split piece is empty.
let mut output: Vec<&[u8]> = output.split(|c| *c == b'\n').collect();
assert_eq!(output.pop().unwrap(), b"");
assert_eq!(output.len(), 4);
// Sort so the comparison below does not depend on emission order.
output.sort();
let json: serde_json::Value = serde_json::from_slice(&output[0]).unwrap();
let expected = r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"Kind": "Bar",
"Type": "Baz",
"MeanValue": {"Values":[5], "Counts":[3]},
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
"Kind",
"Type"
],
[
"AWSAccountId",
"Kind",
"Type"
]
],
"Metrics": [
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}
],
"Timestamp": 1749475336015
}
}
"#;
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
let json: serde_json::Value = serde_json::from_slice(&output[1]).unwrap();
let expected: &str = r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"Kind": "Bar",
"MeanValue": {"Values":[5], "Counts":[3]},
"DimensionedIntCount": 1235,
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
"Kind"
],
[
"AWSAccountId",
"Kind"
]
],
"Metrics": [
{
"Name": "DimensionedIntCount"
},
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}
],
"Timestamp": 1749475336015
}
}
"#;
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
let expected: &str = r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"Kind": "Foo",
"MeanValue": {"Values":[2], "Counts":[3]},
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
"Kind"
],
[
"AWSAccountId",
"Kind"
]
],
"Metrics": [
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}
],
"Timestamp": 1749475336015
}
}
"#;
let json: serde_json::Value = serde_json::from_slice(&output[2]).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
// Last document: the values written without per-metric dimensions, plus `extra`.
let expected = format!(
"{}{}{}",
r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"MeanValue": {"Values":[3.5], "Counts":[6]},
"BasicIntCount": 1234,
"NoMetric": 1235,
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
],
[
"AWSAccountId"
]
],
"Metrics": [
{
"Name": "BasicIntCount"
},
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}"#,
extra,
r#"
],
"Timestamp": 1749475336015
}
}
"#
);
let json: serde_json::Value = serde_json::from_slice(&output[3]).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
}
}
// With an extra directive, validations skipped.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.skip_all_validations(true)
.build(),
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
);
// With an extra directive, validations enabled.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.skip_all_validations(false)
.build(),
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
);
// Without an extra directive, via the two convenience constructors.
check(
Emf::no_validations(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
),
"",
);
check(
Emf::all_validations(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
),
"",
);
}
/// Checks the output when an entry supplies its own `EntryDimensions` in addition to the
/// dimensions configured on the formatter, together with split entries.
///
/// The expected `Dimensions` arrays list every formatter-level dimension set combined with
/// every entry-level set (`["API"]` and `["API", "StringProp"]`), and the dimensioned
/// document additionally has `Kind` appended to each set. The output must consist of
/// exactly two documents.
#[test]
fn formats_all_features_dimensions_per_entry() {
struct TestEntry;
impl Entry for TestEntry {
fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
writer.timestamp(
SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
);
writer.config(const { &AllowSplitEntries::new() });
writer.value("AWSAccountId", "012345678901");
writer.value("API", "MyAPI");
writer.value("BasicIntCount", &1234u64);
// Entry-level dimensions are configured after some values were already written and
// before `StringProp`, which one of the sets refers to.
writer.config(
const {
&EntryDimensions::new(Cow::Borrowed(&[
Cow::Borrowed(&[Cow::Borrowed("API")]),
Cow::Borrowed(&[Cow::Borrowed("API"), Cow::Borrowed("StringProp")]),
]))
},
);
writer.value("StringProp", "some string value");
writer.value(
"MeanValue",
&Mean::<Second>::from_iter([1u8, 2, 3, 4, 5, 6]),
);
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([1u8, 2, 3]),
[("Kind", "Foo")],
),
);
}
}
// Formats `TestEntry` three times on the same formatter and compares the two sorted
// output documents. `log_group_name` is a JSON fragment spliced into `_aws` right before
// `Timestamp` in both documents; `extra` is spliced into the `CloudWatchMetrics` array of
// the second document only.
fn check(mut format: Emf, log_group_name: &str, extra: &str) {
for _ in 0..3 {
let mut output = Vec::new();
format.format(&TestEntry, &mut output).unwrap();
// The output ends with a newline, so the final split piece is empty.
let mut output: Vec<&[u8]> = output.split(|c| *c == b'\n').collect();
assert_eq!(output.pop().unwrap(), b"");
assert_eq!(output.len(), 2);
output.sort();
// First sorted document: the `Kind=Foo` metric.
let expected: String = format!(
"{}{}{}",
r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"Kind": "Foo",
"MeanValue": {"Values":[2], "Counts":[3]},
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
["API", "Kind"],
["API", "StringProp", "Kind"],
["AWSAccountId", "API", "Kind"],
["AWSAccountId", "API", "StringProp", "Kind"]
],
"Metrics": [
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}
],"#,
log_group_name,
r#"
"Timestamp": 1749475336015
}
}
"#
);
let json: serde_json::Value = serde_json::from_slice(&output[0]).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
// Second sorted document: the values written without per-metric dimensions.
let expected = format!(
"{}{}{}{}{}",
r#"
{
"API": "MyAPI",
"AWSAccountId": "012345678901",
"StringProp": "some string value",
"MeanValue": {"Values":[3.5], "Counts":[6]},
"BasicIntCount": 1234,
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
["API"],
["API", "StringProp"],
["AWSAccountId", "API"],
["AWSAccountId", "API", "StringProp"]
],
"Metrics": [
{
"Name": "BasicIntCount"
},
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}"#,
extra,
r"],",
log_group_name,
r#""Timestamp": 1749475336015
}
}
"#
);
let json: serde_json::Value = serde_json::from_slice(&output[1]).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
}
}
// With an extra directive, validations skipped.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.skip_all_validations(true)
.build(),
"",
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
);
// With an extra directive, validations enabled.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.skip_all_validations(false)
.build(),
"",
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
);
// Plain formatters built through the convenience constructors and the default builder.
check(
Emf::no_validations(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
),
"",
"",
);
check(
Emf::all_validations(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
),
"",
"",
);
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.build(),
"",
"",
);
// With a log group name configured, `LogGroupName` is expected inside `_aws`.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.log_group_name("Bar")
.build(),
r#""LogGroupName": "Bar","#,
"",
);
}
/// Checks that a formatter with additional namespaces repeats the metric directive once per
/// namespace (`TestNS`, `OtherNS`, `ThirdNS`).
///
/// Parameterized over `split_entries`: when true, the entry enables split entries and also
/// writes a dimensioned `MeanValue`, so two documents are expected instead of one.
#[rstest]
#[case(false)]
#[case(true)]
fn test_multiple_namespaces(#[case] split_entries: bool) {
struct TestEntry {
// Whether to enable split entries and write the dimensioned metric.
split_entries: bool,
}
impl Entry for TestEntry {
fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
writer.timestamp(
SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
);
if self.split_entries {
writer.config(const { &AllowSplitEntries::new() });
}
writer.value("AWSAccountId", "012345678901");
writer.value("BasicIntCount", &1234u64);
if self.split_entries {
writer.value(
"MeanValue",
&WithDimension::new_with_dimensions(
Mean::<Second>::from_iter([1u8, 2, 3]),
[("Kind", "Foo")],
),
);
}
}
}
// Formats the entry three times on the same formatter and compares the sorted output
// documents. `extra` is spliced into the `CloudWatchMetrics` array of the document that
// carries `BasicIntCount`.
fn check(mut format: Emf, extra: &str, split_entries: bool) {
for _ in 0..3 {
let mut output = Vec::new();
format
.format(&TestEntry { split_entries }, &mut output)
.unwrap();
// The output ends with a newline, so the final split piece is empty.
let mut output: Vec<&[u8]> = output.split(|c| *c == b'\n').collect();
assert_eq!(output.pop().unwrap(), b"");
assert_eq!(output.len(), if split_entries { 2 } else { 1 });
output.sort();
if split_entries {
let expected: &str = r#"
{
"AWSAccountId": "012345678901",
"Kind": "Foo",
"MeanValue": {
"Counts": [3],
"Values": [2]
},
"_aws": {
"CloudWatchMetrics": [
{
"Dimensions": [["Kind"], ["AWSAccountId", "Kind"]],
"Metrics": [{"Name": "MeanValue", "Unit": "Seconds"}],
"Namespace": "TestNS"
},
{
"Dimensions": [["Kind"], ["AWSAccountId", "Kind"]],
"Metrics": [{"Name": "MeanValue", "Unit": "Seconds"}],
"Namespace": "OtherNS"
},
{
"Dimensions": [["Kind"], ["AWSAccountId", "Kind"]],
"Metrics": [{"Name": "MeanValue", "Unit": "Seconds"}],
"Namespace": "ThirdNS"
}
],
"Timestamp": 1749475336015
}
}
"#;
// Take the first sorted document off the list so that `output[0]` below refers to
// the remaining one in both parameterizations.
let json: serde_json::Value =
serde_json::from_slice(&output.remove(0)).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
}
let expected = format!(
"{}{}{}",
r#"
{
"BasicIntCount": 1234,
"AWSAccountId": "012345678901",
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [[], ["AWSAccountId"]],
"Metrics": [{ "Name": "BasicIntCount" }]
},
{
"Namespace": "OtherNS",
"Dimensions": [[], ["AWSAccountId"]],
"Metrics": [{ "Name": "BasicIntCount" }]
},
{
"Namespace": "ThirdNS",
"Dimensions": [[], ["AWSAccountId"]],
"Metrics": [{ "Name": "BasicIntCount" }]
}
"#,
extra,
r#"
],
"Timestamp": 1749475336015
}
}
"#
);
let json: serde_json::Value = serde_json::from_slice(&output[0]).unwrap();
assert_json_diff::assert_json_eq!(
json,
serde_json::from_str::<serde_json::Value>(&expected).unwrap()
);
}
}
// Validations enabled.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.skip_all_validations(false)
.add_namespace("OtherNS".to_string())
.add_namespace("ThirdNS".to_string())
.build(),
"",
split_entries,
);
// Validations skipped.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.skip_all_validations(true)
.add_namespace("OtherNS".to_string())
.add_namespace("ThirdNS".to_string())
.build(),
"",
split_entries,
);
// With an extra directive: it is expected once, for `TestNS` only.
check(
Emf::builder(
"TestNS".to_string(),
vec![vec![], vec!["AWSAccountId".to_string()]],
)
.add_namespace("OtherNS".to_string())
.add_namespace("ThirdNS".to_string())
.directive(MetricDirective {
dimensions: vec![vec!["API"]],
metrics: vec![MetricDefinition {
name: "MeanValue",
unit: Unit::Second(NegativeScale::One),
storage_resolution: Some(StorageResolution::Second),
}],
namespace: "TestNS",
})
.skip_all_validations(false)
.build(),
r#" ,{ "Namespace": "TestNS", "Dimensions": [["API"]], "Metrics": [{"Name": "MeanValue", "Unit": "Seconds", "StorageResolution": 1}]}"#,
split_entries,
);
}
/// An entry whose only metric carries per-metric dimensions must produce exactly one
/// document, with `Kind` appended to every configured dimension set.
#[test]
fn formats_dimensions_only() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            let when = SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819);
            writer.timestamp(when);
            writer.config(const { &AllowSplitEntries::new() });
            writer.value("AWSAccountId", "012345678901");
            let mean = WithDimension::new_with_dimensions(
                Mean::<Second>::from_iter([1u8, 2, 3]),
                [("Kind", "Foo")],
            );
            writer.value("MeanValue", &mean);
        }
    }

    // The single document every iteration has to produce.
    const EXPECTED: &str = r#"
{
"AWSAccountId": "012345678901",
"Kind": "Foo",
"MeanValue": {"Values":[2], "Counts":[3]},
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
"Kind"
],
[
"AWSAccountId",
"Kind"
]
],
"Metrics": [
{
"Name": "MeanValue",
"Unit": "Seconds"
}
]
}
],
"Timestamp": 1749475336015
}
}
"#;

    /// Formats `TestEntry` three times on the same formatter and compares each output.
    fn assert_single_document(mut emitter: Emf) {
        for _ in 0..3 {
            let mut raw = Vec::new();
            emitter.format(&TestEntry, &mut raw).unwrap();
            // The output ends with a newline, so the final split piece is empty.
            let mut lines: Vec<&[u8]> = raw.split(|byte| *byte == b'\n').collect();
            assert_eq!(lines.pop().unwrap(), b"");
            assert_eq!(lines.len(), 1);
            lines.sort();
            let only_line = lines[0];
            eprintln!("{}", str::from_utf8(only_line).unwrap());
            let actual: serde_json::Value = serde_json::from_slice(only_line).unwrap();
            let wanted = serde_json::from_str::<serde_json::Value>(EXPECTED).unwrap();
            assert_json_diff::assert_json_eq!(actual, wanted);
        }
    }

    assert_single_document(Emf::no_validations(
        "TestNS".to_string(),
        vec![vec![], vec!["AWSAccountId".to_string()]],
    ));
    assert_single_document(Emf::all_validations(
        "TestNS".to_string(),
        vec![vec![], vec!["AWSAccountId".to_string()]],
    ));
}
/// A dimensioned value wrapped in `NoMetric` must still produce one document containing the
/// value and its dimensions, but with an empty `Metrics` list.
///
/// Uses `assert_json_diff::assert_json_eq!` like the sibling tests, so a mismatch is
/// reported as a JSON diff.
#[test]
fn formats_dimensions_only_no_metric() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.timestamp(
                SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819),
            );
            writer.config(const { &AllowSplitEntries::new() });
            writer.value("AWSAccountId", "012345678901");
            writer.value(
                "MeanValue",
                &NoMetric::from(WithDimension::new_with_dimensions(
                    Mean::<Second>::from_iter([1u8, 2, 3]),
                    [("Kind", "Foo")],
                )),
            );
        }
    }

    /// Formats `TestEntry` three times on the same formatter and compares each output.
    fn check(mut format: Emf) {
        for _ in 0..3 {
            let mut output = Vec::new();
            format.format(&TestEntry, &mut output).unwrap();
            // The output ends with a newline, so the final split piece is empty.
            let mut output: Vec<&[u8]> = output.split(|c| *c == b'\n').collect();
            assert_eq!(output.pop().unwrap(), b"");
            assert_eq!(output.len(), 1);
            output.sort();
            let json: serde_json::Value = serde_json::from_slice(output[0]).unwrap();
            let expected = r#"
{
"AWSAccountId": "012345678901",
"Kind": "Foo",
"MeanValue": {"Values":[2], "Counts":[3]},
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [
[
"Kind"
],
[
"AWSAccountId",
"Kind"
]
],
"Metrics": []
}
],
"Timestamp": 1749475336015
}
}
"#;
            assert_json_diff::assert_json_eq!(
                json,
                serde_json::from_str::<serde_json::Value>(expected).unwrap()
            );
        }
    }

    check(Emf::no_validations(
        "TestNS".to_string(),
        vec![vec![], vec!["AWSAccountId".to_string()]],
    ));
    check(Emf::all_validations(
        "TestNS".to_string(),
        vec![vec![], vec!["AWSAccountId".to_string()]],
    ));
}
/// An entry that writes only a timestamp must still produce one document, containing a
/// directive with an empty dimension set and an empty `Metrics` list.
#[test]
fn formats_empty() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            let when = SystemTime::UNIX_EPOCH + Duration::from_secs_f64(1749475336.0157819);
            writer.timestamp(when);
        }
    }

    // The single document every iteration has to produce.
    const EXPECTED: &str = r#"
{
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "TestNS",
"Dimensions": [[]],
"Metrics": []
}
],
"Timestamp": 1749475336015
}
}
"#;

    /// Formats `TestEntry` three times on the same formatter and compares each output.
    fn assert_empty_document(mut emitter: Emf) {
        for _ in 0..3 {
            let mut raw = Vec::new();
            emitter.format(&TestEntry, &mut raw).unwrap();
            // The output ends with a newline, so the final split piece is empty.
            let mut lines: Vec<&[u8]> = raw.split(|byte| *byte == b'\n').collect();
            assert_eq!(lines.pop().unwrap(), b"");
            assert_eq!(lines.len(), 1);
            lines.sort();
            let actual: serde_json::Value = serde_json::from_slice(lines[0]).unwrap();
            let wanted = serde_json::from_str::<serde_json::Value>(EXPECTED).unwrap();
            assert_json_diff::assert_json_eq!(actual, wanted);
        }
    }

    assert_empty_document(Emf::no_validations("TestNS".to_string(), vec![vec![]]));
    assert_empty_document(Emf::all_validations("TestNS".to_string(), vec![vec![]]));
}
const STORAGE_HIRES: &'static EmfOptions = &EmfOptions {
storage_mode: StorageMode::HighStorageResolution,
};
// Options with the no-metric storage mode; used as an input and as an
// expected result in the `test_try_merge` cases.
// References in a `const` are implicitly `'static`, so the lifetime is omitted.
const STORAGE_NO_METRIC: &EmfOptions = &EmfOptions {
    storage_mode: StorageMode::NoMetric,
};
// Merging two `EmfOptions` flag sets must yield the storage mode of `result`.
// In the cases below, merging anything with the no-metric options gives the
// no-metric mode, and merging the high-resolution options with themselves
// keeps the high-resolution mode.
#[rstest]
#[case(STORAGE_HIRES, STORAGE_HIRES, STORAGE_HIRES)]
#[case(STORAGE_HIRES, STORAGE_NO_METRIC, STORAGE_NO_METRIC)]
#[case(STORAGE_NO_METRIC, STORAGE_HIRES, STORAGE_NO_METRIC)]
fn test_try_merge(
    #[case] lhs: &EmfOptions,
    #[case] rhs: &EmfOptions,
    #[case] result: &EmfOptions,
) {
    let lhs_flags = MetricFlags::upcast(lhs);
    let rhs_flags = MetricFlags::upcast(rhs);
    let merged = lhs_flags.try_merge(rhs_flags);
    // The merged flags must still be recoverable as `EmfOptions`.
    let merged_options = merged.downcast::<EmfOptions>().unwrap();
    assert_eq!(merged_options.storage_mode, result.storage_mode);
}
// Wraps an EMF stream in `HighStorageResolution` and checks that every metric
// definition in the emitted document carries `"StorageResolution": 1`, while
// the string field `Operation` is emitted as a plain property.
#[test]
fn test_force_storage_resolution_emf() {
    use std::time::{Duration, SystemTime};

    // Entry mixing one string property with scalar, mean and distribution values.
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl metrique_writer_core::EntryWriter<'a>) {
            writer.timestamp(SystemTime::UNIX_EPOCH + Duration::from_secs_f64(12345.6789));
            writer.value("Time", &Duration::from_millis(42));
            writer.value("Operation", "Foo");
            writer.value("BasicIntCount", &1234u64);
            writer.value("BasicFloatCount", &5.4321f64);
            writer.value("SomeDuration", &Duration::from_micros(12345678));
            writer.value(
                "SomeRepeatedDuration",
                &Mean::<Millisecond>::from_iter([1u32, 2, 3]),
            );
            writer.value(
                "RepeatedDuration",
                &Distribution::<_, 2>::from_iter([
                    Duration::from_micros(10),
                    Duration::from_micros(170),
                ]),
            );
            // This value is already wrapped in `HighStorageResolution` itself.
            writer.value(
                "CounterWithUnit",
                &HighStorageResolution::from(99u64.with_unit::<Kilobyte>()),
            );
            writer.value("MeanValue", &Mean::<Second>::from_iter([1u8, 2, 3]));
        }
    }

    // Parse the expected document first so the emit-and-compare steps read straight through.
    let expected: serde_json::Value = serde_json::from_str(
        r#"
{
"_aws": {
"CloudWatchMetrics": [
{
"Namespace": "MyNS",
"Dimensions": [
[]
],
"Metrics": [
{
"Name": "Time",
"Unit": "Milliseconds",
"StorageResolution": 1
},
{
"Name": "BasicIntCount",
"StorageResolution": 1
},
{
"Name": "BasicFloatCount",
"StorageResolution": 1
},
{
"Name": "SomeDuration",
"Unit": "Milliseconds",
"StorageResolution": 1
},
{
"Name": "SomeRepeatedDuration",
"Unit": "Milliseconds",
"StorageResolution": 1
},
{
"Name": "RepeatedDuration",
"Unit": "Milliseconds",
"StorageResolution": 1
},
{
"Name": "CounterWithUnit",
"Unit": "Kilobytes",
"StorageResolution": 1
},
{
"Name": "MeanValue",
"Unit": "Seconds",
"StorageResolution": 1
}
]
}
],
"Timestamp": 12345678
},
"Time": 42,
"BasicIntCount": 1234,
"BasicFloatCount": 5.4321,
"SomeDuration": 12345.678,
"SomeRepeatedDuration": {
"Values": [
2
],
"Counts": [
3
]
},
"RepeatedDuration": {
"Values": [
0.01,
0.17
],
"Counts": [
1,
1
]
},
"CounterWithUnit": 99,
"MeanValue": {
"Values": [
2
],
"Counts": [
3
]
},
"Operation": "Foo"
}
"#,
    )
    .unwrap();

    let mut emitted = Vec::new();
    let inner = Emf::all_validations("MyNS".to_owned(), vec![vec![]]).output_to(&mut emitted);
    // Force the high-resolution flag onto everything written through the stream.
    let mut forced = HighStorageResolution::from(inner);
    forced.next(&TestEntry).unwrap();
    forced.flush().unwrap();

    let actual: serde_json::Value = serde_json::from_slice(&emitted).unwrap();
    assert_json_diff::assert_json_eq!(actual, expected);
}
// `report_error` on a stream with merged globals: `dim` is the configured
// dimension name, `merged_dim` the field name contributed by the global entry.
// The cases marked invalid must return an error; the valid ones must emit a
// document carrying the global field plus a `MetriqueValidationError` field.
#[rstest]
#[case("Foo", "Region", true)]
#[case("Foo", "_aws", false)]
#[case("Foo", "MetriqueValidationError", false)]
#[case("MetriqueValidationError", "Region", true)]
fn test_report_error(#[case] dim: &str, #[case] merged_dim: &str, #[case] is_valid: bool) {
    // Global entry that writes a single string field under a configurable name.
    struct MergeUsEast1<'a> {
        dim: &'a str,
    }
    impl Entry for MergeUsEast1<'_> {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            writer.value(self.dim, "us-east-1");
        }
    }

    let formatter = Emf::all_validations("Foo".into(), vec![vec![dim.into()]]);
    let mut sink = vec![];
    let outcome = formatter
        .output_to(&mut sink)
        .merge_globals(MergeUsEast1 { dim: merged_dim })
        .report_error("basic error");

    if !is_valid {
        // Rejected combinations only need to fail; there is no output to inspect.
        outcome.unwrap_err();
        return;
    }
    outcome.unwrap();

    let text = String::from_utf8(sink).unwrap();
    let mut actual = serde_json::from_str::<serde_json::Value>(&text).unwrap();
    // Overwrite the emitted timestamp so the comparison does not depend on it.
    actual["_aws"]["Timestamp"] = 0.into();
    let expected = serde_json::json!({
        "_aws": {
            "CloudWatchMetrics": [{
                "Namespace": "Foo",
                "Dimensions": [[dim]],
                "Metrics": []
            }],
            "Timestamp": 0,
        },
        merged_dim: "us-east-1",
        "MetriqueValidationError": "basic error"
    });
    assert_json_eq!(expected, actual);
}
// A distribution whose last observation is NaN must still serialize to
// parseable JSON, with only the two finite observations present in the
// `Values`/`Counts` arrays.
#[test]
fn trailing_nan_in_distribution_produces_valid_json() {
    struct TestEntry;
    impl Entry for TestEntry {
        fn write<'a>(&'a self, writer: &mut impl EntryWriter<'a>) {
            let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(1700000000);
            writer.timestamp(timestamp);
            // The NaN is deliberately the final element of the distribution.
            let samples = Distribution::<f64>::from_iter([1.0, 2.0, f64::NAN]);
            writer.value("MetricWithTrailingNaN", &samples);
        }
    }

    let mut formatter = Emf::builder("TestNS".to_string(), vec![vec![]]).build();
    let mut bytes = Vec::new();
    formatter.format(&TestEntry, &mut bytes).unwrap();

    // Surface the raw output in the panic message when parsing fails.
    let parsed: serde_json::Value = match serde_json::from_slice(&bytes) {
        Ok(value) => value,
        Err(e) => panic!(
            "EMF produced invalid JSON: {e}\nOutput: {}",
            String::from_utf8_lossy(&bytes)
        ),
    };
    let expected = serde_json::json!({
        "Values": [1, 2],
        "Counts": [1, 1],
    });
    assert_json_eq!(parsed["MetricWithTrailingNaN"], expected);
}
}