use std::collections::{BTreeMap, BTreeSet};
use super::normalize::{NormalizedEntry, NormalizedFile};
use super::AfterClause;
use crate::config::{
BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig};
use crate::packs::{MetricOverride, MetricPackDef};
use crate::sink::SinkConfig;
/// Errors raised while expanding normalized entries into concrete signal entries.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ExpandError {
    /// The pack reference could not be resolved by the configured [`PackResolver`].
    #[error("pack '{reference}' could not be resolved: {message}")]
    ResolveFailed {
        reference: String,
        message: String,
    },
    /// An `overrides` key did not match any metric name in the resolved pack.
    #[error(
        "override references unknown metric '{key}'; pack '{pack_name}' contains: {available}"
    )]
    UnknownOverrideKey {
        key: String,
        pack_name: String,
        // Comma-separated list of the metric names the pack does contain.
        available: String,
    },
    /// The resolved pack defines no metrics, so expansion would produce nothing.
    #[error("pack '{pack_name}' contains no metrics")]
    EmptyPack {
        pack_name: String,
    },
    /// Two entries (inline or pack-expanded) ended up with the same id.
    #[error(
        "duplicate entry id '{id}' after pack expansion: \
        {first_source} conflicts with {second_source}"
    )]
    DuplicateEntryId {
        id: String,
        // Human-readable descriptions of where each colliding id came from.
        first_source: String,
        second_source: String,
    },
}
/// Resolves a pack reference (a registered name or a file path) to its definition.
pub trait PackResolver {
    /// Returns the pack for `reference`, or a [`PackResolveError`] describing
    /// why the lookup failed.
    fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError>;
}
/// Failure to resolve a pack reference, together with how the reference was
/// classified (name vs. file path).
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
pub struct PackResolveError {
    pub message: String,
    // How the failing reference was interpreted; useful for error reporting.
    pub origin: PackResolveOrigin,
}
/// How a pack reference was interpreted: a registered pack name, or a path to
/// a pack definition file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackResolveOrigin {
    Name,
    FilePath,
}
impl PackResolveError {
    /// Builds a resolve error from any string-like message and its origin.
    pub fn new(message: impl Into<String>, origin: PackResolveOrigin) -> Self {
        Self {
            message: message.into(),
            origin,
        }
    }
}
/// Classifies a pack reference as either a registered pack name or a file path.
///
/// Anything containing a path separator, or starting with `.` (relative path),
/// is treated as a file path; everything else is treated as a plain name.
pub fn classify_pack_reference(reference: &str) -> PackResolveOrigin {
    // Also accept '\' so Windows-style paths (e.g. `packs\custom.yaml`) are
    // not mistaken for plain pack names.
    if reference.contains('/') || reference.contains('\\') || reference.starts_with('.') {
        PackResolveOrigin::FilePath
    } else {
        PackResolveOrigin::Name
    }
}
/// A [`PackResolver`] backed by an in-memory map, keyed by the exact reference
/// string; used for tests and programmatic registration.
#[derive(Debug, Default, Clone)]
pub struct InMemoryPackResolver {
    packs: BTreeMap<String, MetricPackDef>,
}
impl InMemoryPackResolver {
    /// Creates an empty resolver.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `pack` under `reference`, replacing any previous registration
    /// for the same reference.
    pub fn insert(&mut self, reference: impl Into<String>, pack: MetricPackDef) {
        self.packs.insert(reference.into(), pack);
    }
}
impl PackResolver for InMemoryPackResolver {
    /// Looks up `reference` in the registered map, cloning the stored pack.
    /// Missing references yield an error classified by
    /// [`classify_pack_reference`].
    fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError> {
        self.packs.get(reference).cloned().ok_or_else(|| {
            PackResolveError::new(
                format!("pack reference '{reference}' not found in resolver"),
                classify_pack_reference(reference),
            )
        })
    }
}
/// Result of pack expansion: the file version plus the flat list of
/// fully-resolved entries, in source order (pack metrics in pack order).
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedFile {
    pub version: u32,
    pub entries: Vec<ExpandedEntry>,
}
/// A fully expanded, self-contained signal definition: all pack references
/// resolved and all label layers merged.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedEntry {
    // Always `Some` for pack-expanded entries; may be `None` for inline ones.
    pub id: Option<String>,
    // Always "metrics" for pack-expanded entries; passed through for inline.
    pub signal_type: String,
    pub name: String,
    // Emission rate; units defined by the scheduler (presumably per second —
    // confirm against the generator/scheduler docs).
    pub rate: f64,
    pub duration: Option<String>,
    pub generator: Option<GeneratorConfig>,
    pub log_generator: Option<LogGeneratorConfig>,
    // Merged static labels; `None` when no label source contributed anything.
    pub labels: Option<BTreeMap<String, String>>,
    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
    pub encoder: EncoderConfig,
    pub sink: SinkConfig,
    // Schedule/delivery shaping, copied verbatim from the source entry.
    pub jitter: Option<f64>,
    pub jitter_seed: Option<u64>,
    pub gaps: Option<GapConfig>,
    pub bursts: Option<BurstConfig>,
    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
    pub phase_offset: Option<String>,
    pub clock_group: Option<String>,
    pub after: Option<AfterClause>,
    // Distribution-style fields; always `None` for pack-expanded entries.
    pub distribution: Option<DistributionConfig>,
    pub buckets: Option<Vec<f64>>,
    pub quantiles: Option<Vec<f64>>,
    pub observations_per_tick: Option<u32>,
    pub mean_shift_per_sec: Option<f64>,
    pub seed: Option<u64>,
}
/// Expands a normalized file into concrete signal entries.
///
/// Inline entries are copied through unchanged; entries referencing a pack are
/// fanned out into one entry per pack metric via `resolver`. All entry ids
/// (explicit, auto-generated, and per-metric sub-signal ids) are checked for
/// uniqueness across the whole file.
pub fn expand<R: PackResolver>(
    file: NormalizedFile,
    resolver: &R,
) -> Result<ExpandedFile, ExpandError> {
    let version = file.version;
    let shared_defaults = file.defaults_labels;
    let mut expanded: Vec<ExpandedEntry> = Vec::with_capacity(file.entries.len());
    let mut seen_ids: BTreeMap<String, String> = BTreeMap::new();
    for (position, raw) in file.entries.into_iter().enumerate() {
        if raw.pack.is_none() {
            let inline = expand_inline_entry(raw);
            if let Some(id) = inline.id.as_deref() {
                record_id(&mut seen_ids, id, format!("inline entry '{id}'"))?;
            }
            expanded.push(inline);
        } else {
            expand_pack_entry(
                raw,
                position,
                shared_defaults.as_ref(),
                resolver,
                &mut expanded,
                &mut seen_ids,
            )?;
        }
    }
    Ok(ExpandedFile {
        version,
        entries: expanded,
    })
}
/// Registers `id` in `registry`, mapping it to a human-readable `source`
/// description used when reporting collisions.
///
/// # Errors
///
/// Returns [`ExpandError::DuplicateEntryId`] when `id` was already recorded,
/// naming both the prior and the current source.
fn record_id(
    registry: &mut BTreeMap<String, String>,
    id: &str,
    source: String,
) -> Result<(), ExpandError> {
    use std::collections::btree_map::Entry;
    // The entry API performs a single map lookup instead of the previous
    // `get` followed by `insert`.
    match registry.entry(id.to_string()) {
        Entry::Occupied(prior) => Err(ExpandError::DuplicateEntryId {
            id: id.to_string(),
            first_source: prior.get().clone(),
            second_source: source,
        }),
        Entry::Vacant(slot) => {
            slot.insert(source);
            Ok(())
        }
    }
}
/// Converts a non-pack entry into an [`ExpandedEntry`] by moving every field
/// across unchanged; `name` falls back to the empty string when absent.
fn expand_inline_entry(entry: NormalizedEntry) -> ExpandedEntry {
    ExpandedEntry {
        id: entry.id,
        signal_type: entry.signal_type,
        name: entry.name.unwrap_or_default(),
        rate: entry.rate,
        duration: entry.duration,
        generator: entry.generator,
        log_generator: entry.log_generator,
        labels: entry.labels,
        dynamic_labels: entry.dynamic_labels,
        encoder: entry.encoder,
        sink: entry.sink,
        jitter: entry.jitter,
        jitter_seed: entry.jitter_seed,
        gaps: entry.gaps,
        bursts: entry.bursts,
        cardinality_spikes: entry.cardinality_spikes,
        phase_offset: entry.phase_offset,
        clock_group: entry.clock_group,
        after: entry.after,
        distribution: entry.distribution,
        buckets: entry.buckets,
        quantiles: entry.quantiles,
        observations_per_tick: entry.observations_per_tick,
        mean_shift_per_sec: entry.mean_shift_per_sec,
        seed: entry.seed,
    }
}
/// Expands one pack-referencing entry into one [`ExpandedEntry`] per metric in
/// the resolved pack, appending them to `out`.
///
/// Registers both the entry-level id (user-provided, or synthesized from the
/// pack name and entry index) and every per-metric sub-signal id in
/// `id_registry`, so collisions across the whole file are detected.
fn expand_pack_entry<R: PackResolver>(
    entry: NormalizedEntry,
    entry_index: usize,
    defaults_labels: Option<&BTreeMap<String, String>>,
    resolver: &R,
    out: &mut Vec<ExpandedEntry>,
    id_registry: &mut BTreeMap<String, String>,
) -> Result<(), ExpandError> {
    // Caller guarantees `pack` is Some; a None here is a programming error.
    let reference = entry
        .pack
        .as_deref()
        .expect("expand_pack_entry called with non-pack entry; caller must check");
    let pack = resolver
        .resolve(reference)
        .map_err(|e| ExpandError::ResolveFailed {
            reference: reference.to_string(),
            message: e.message,
        })?;
    // An empty pack would silently expand to nothing; surface it as an error.
    if pack.metrics.is_empty() {
        return Err(ExpandError::EmptyPack {
            pack_name: pack.name,
        });
    }
    // Reject override keys that match no metric before doing any expansion.
    validate_override_keys(&pack, entry.overrides.as_ref())?;
    let (effective_entry_id, effective_id_source) = match entry.id.clone() {
        Some(id) => (id.clone(), format!("pack entry '{id}' (user-provided id)")),
        None => {
            // No user id: synthesize a stable one from the pack name and the
            // entry's position in the file.
            let synthesized = format!("{}_{}", pack.name, entry_index);
            (
                synthesized.clone(),
                format!(
                    "pack entry at index {entry_index} (auto-generated id '{synthesized}' \
                    from pack '{}')",
                    pack.name
                ),
            )
        }
    };
    record_id(id_registry, &effective_entry_id, effective_id_source)?;
    // Metric names that occur more than once in the pack need an index suffix
    // to keep their sub-signal ids unique.
    let duplicate_metric_names = duplicate_metric_names(&pack);
    for (spec_index, metric) in pack.metrics.iter().enumerate() {
        let override_for_metric = entry
            .overrides
            .as_ref()
            .and_then(|map| map.get(&metric.name));
        // Label precedence (low to high): file defaults, pack shared labels,
        // per-metric labels, entry labels, per-metric override labels.
        let labels = compose_pack_metric_labels(
            defaults_labels,
            pack.shared_labels.as_ref(),
            metric.labels.as_ref(),
            entry.labels.as_ref(),
            override_for_metric.and_then(|o| o.labels.as_ref()),
        );
        let generator = select_pack_metric_generator(metric, override_for_metric);
        // A per-metric `after` override replaces the entry-level clause.
        let after = override_for_metric
            .and_then(|o| o.after.clone())
            .or_else(|| entry.after.clone());
        let sub_signal_id = if duplicate_metric_names.contains(metric.name.as_str()) {
            format!("{}.{}#{}", effective_entry_id, metric.name, spec_index)
        } else {
            format!("{}.{}", effective_entry_id, metric.name)
        };
        record_id(
            id_registry,
            &sub_signal_id,
            format!(
                "pack sub-signal '{sub_signal_id}' (pack '{}', metric '{}' at index {spec_index})",
                pack.name, metric.name
            ),
        )?;
        out.push(ExpandedEntry {
            id: Some(sub_signal_id),
            signal_type: "metrics".to_string(),
            name: metric.name.clone(),
            rate: entry.rate,
            duration: entry.duration.clone(),
            generator: Some(generator),
            log_generator: None,
            labels,
            dynamic_labels: entry.dynamic_labels.clone(),
            encoder: entry.encoder.clone(),
            sink: entry.sink.clone(),
            jitter: entry.jitter,
            jitter_seed: entry.jitter_seed,
            gaps: entry.gaps.clone(),
            bursts: entry.bursts.clone(),
            cardinality_spikes: entry.cardinality_spikes.clone(),
            phase_offset: entry.phase_offset.clone(),
            clock_group: entry.clock_group.clone(),
            after,
            // Distribution-style fields are always cleared for pack metrics.
            distribution: None,
            buckets: None,
            quantiles: None,
            observations_per_tick: None,
            mean_shift_per_sec: None,
            seed: None,
        });
    }
    Ok(())
}
/// Returns the set of metric names that appear more than once in `pack`.
fn duplicate_metric_names(pack: &MetricPackDef) -> BTreeSet<&str> {
    let mut seen: BTreeSet<&str> = BTreeSet::new();
    pack.metrics
        .iter()
        .map(|metric| metric.name.as_str())
        // `insert` returns false for names already seen, i.e. duplicates.
        .filter(|name| !seen.insert(name))
        .collect()
}
/// Verifies that every override key names a metric that exists in `pack`.
///
/// # Errors
///
/// Returns [`ExpandError::UnknownOverrideKey`] for the first key (in map
/// order) that matches no metric, listing the names the pack does contain.
fn validate_override_keys(
    pack: &MetricPackDef,
    overrides: Option<&BTreeMap<String, MetricOverride>>,
) -> Result<(), ExpandError> {
    let Some(overrides) = overrides else {
        return Ok(());
    };
    let known: BTreeSet<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
    match overrides.keys().find(|key| !known.contains(key.as_str())) {
        None => Ok(()),
        Some(unknown) => {
            let names: Vec<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
            Err(ExpandError::UnknownOverrideKey {
                key: unknown.clone(),
                pack_name: pack.name.clone(),
                available: names.join(", "),
            })
        }
    }
}
/// Copies every key/value pair of `source` (if present) into `merged`,
/// overwriting keys already there. Accepts any map-like iterator so both
/// `BTreeMap` and `HashMap` sources share one code path.
fn merge_label_source<'a, I>(merged: &mut BTreeMap<String, String>, source: Option<I>)
where
    I: IntoIterator<Item = (&'a String, &'a String)>,
{
    if let Some(src) = source {
        merged.extend(src.into_iter().map(|(k, v)| (k.clone(), v.clone())));
    }
}

/// Merges the five label layers for a pack metric, later layers winning on
/// key conflicts.
///
/// Precedence (low to high): file defaults, pack shared labels, per-metric
/// labels, entry labels, per-metric override labels.
///
/// Returns `None` when no layer contributed any label.
fn compose_pack_metric_labels(
    defaults_labels: Option<&BTreeMap<String, String>>,
    pack_shared_labels: Option<&std::collections::HashMap<String, String>>,
    pack_metric_labels: Option<&std::collections::HashMap<String, String>>,
    entry_labels: Option<&BTreeMap<String, String>>,
    override_labels: Option<&BTreeMap<String, String>>,
) -> Option<BTreeMap<String, String>> {
    let mut merged: BTreeMap<String, String> = BTreeMap::new();
    merge_label_source(&mut merged, defaults_labels);
    merge_label_source(&mut merged, pack_shared_labels);
    merge_label_source(&mut merged, pack_metric_labels);
    merge_label_source(&mut merged, entry_labels);
    merge_label_source(&mut merged, override_labels);
    (!merged.is_empty()).then_some(merged)
}
/// Picks the generator for a pack metric: a per-metric override wins over the
/// pack's own generator, and a constant-zero generator is the final fallback.
fn select_pack_metric_generator(
    metric: &crate::packs::MetricSpec,
    metric_override: Option<&MetricOverride>,
) -> GeneratorConfig {
    metric_override
        .and_then(|o| o.generator.clone())
        .or_else(|| metric.generator.clone())
        .unwrap_or(GeneratorConfig::Constant { value: 0.0 })
}
#[cfg(test)]
mod tests {
// Unit tests for pack expansion: reference classification, resolver lookup,
// fan-out into per-metric entries, label precedence, override handling,
// duplicate-id detection, and pass-through of inline entries.
use super::*;
use crate::compiler::normalize::normalize;
use crate::compiler::parse::parse;
use crate::compiler::AfterOp;
use crate::packs::MetricSpec;
use std::collections::HashMap;
// Fixture: a two-metric SNMP interface pack with shared labels
// (unique metric names).
fn telegraf_pack() -> MetricPackDef {
let mut shared = HashMap::new();
shared.insert("device".to_string(), String::new());
shared.insert("job".to_string(), "snmp".to_string());
MetricPackDef {
name: "telegraf_snmp_interface".to_string(),
description: "test".to_string(),
category: "network".to_string(),
shared_labels: Some(shared),
metrics: vec![
MetricSpec {
name: "ifOperStatus".to_string(),
labels: None,
generator: Some(GeneratorConfig::Constant { value: 1.0 }),
},
MetricSpec {
name: "ifHCInOctets".to_string(),
labels: None,
generator: Some(GeneratorConfig::Step {
start: Some(0.0),
step_size: 125_000.0,
max: None,
}),
},
],
}
}
// Fixture: a pack that repeats the same metric name with different labels,
// exercising the `#index` sub-signal id disambiguation.
fn node_cpu_pack() -> MetricPackDef {
let mut shared = HashMap::new();
shared.insert("job".to_string(), "node_exporter".to_string());
let mut user_labels = HashMap::new();
user_labels.insert("mode".to_string(), "user".to_string());
let mut system_labels = HashMap::new();
system_labels.insert("mode".to_string(), "system".to_string());
MetricPackDef {
name: "node_exporter_cpu".to_string(),
description: "test".to_string(),
category: "infrastructure".to_string(),
shared_labels: Some(shared),
metrics: vec![
MetricSpec {
name: "node_cpu_seconds_total".to_string(),
labels: Some(user_labels),
generator: Some(GeneratorConfig::Step {
start: Some(0.0),
step_size: 0.25,
max: None,
}),
},
MetricSpec {
name: "node_cpu_seconds_total".to_string(),
labels: Some(system_labels),
generator: Some(GeneratorConfig::Step {
start: Some(0.0),
step_size: 0.10,
max: None,
}),
},
],
}
}
// Runs the full parse -> normalize -> expand pipeline, panicking if any
// stage fails; used by tests that expect success.
fn expand_yaml(yaml: &str, resolver: &InMemoryPackResolver) -> ExpandedFile {
let parsed = parse(yaml).expect("parse must succeed");
let normalized = normalize(parsed).expect("normalize must succeed");
expand(normalized, resolver).expect("expand must succeed")
}
#[rustfmt::skip]
#[rstest::rstest]
#[case::plain_name("telegraf_snmp_interface", PackResolveOrigin::Name)]
#[case::dot_relative("./packs/custom.yaml", PackResolveOrigin::FilePath)]
#[case::absolute_path("/abs/path/pack.yaml", PackResolveOrigin::FilePath)]
#[case::plain_relative("rel/pack.yaml", PackResolveOrigin::FilePath)]
fn classify_pack_reference_distinguishes_name_and_file_path(
#[case] reference: &str,
#[case] expected: PackResolveOrigin,
) {
assert_eq!(classify_pack_reference(reference), expected);
}
#[test]
fn in_memory_resolver_returns_registered_pack() {
let mut r = InMemoryPackResolver::new();
r.insert("telegraf_snmp_interface", telegraf_pack());
let def = r.resolve("telegraf_snmp_interface").expect("must resolve");
assert_eq!(def.name, "telegraf_snmp_interface");
}
#[test]
fn in_memory_resolver_errors_on_missing_reference() {
let r = InMemoryPackResolver::new();
let err = r.resolve("nope").expect_err("must error");
assert_eq!(err.origin, PackResolveOrigin::Name);
assert!(err.message.contains("nope"));
}
#[test]
fn in_memory_resolver_classifies_file_paths() {
let r = InMemoryPackResolver::new();
let err = r.resolve("./no-such.yaml").expect_err("must error");
assert_eq!(err.origin, PackResolveOrigin::FilePath);
}
#[test]
fn expand_produces_one_entry_per_pack_metric() {
let yaml = r#"
version: 2
defaults:
rate: 1
scenarios:
- id: primary
signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries.len(), 2);
assert_eq!(expanded.entries[0].name, "ifOperStatus");
assert_eq!(expanded.entries[1].name, "ifHCInOctets");
}
#[test]
fn expanded_signal_type_is_metrics() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
for e in &expanded.entries {
assert_eq!(e.signal_type, "metrics");
}
}
// Sub-signal ids are `<entry id>.<metric name>` whether the entry id is
// user-provided or auto-generated from the pack name + entry index.
#[rustfmt::skip]
#[rstest::rstest]
#[case::user_supplied_entry_id(r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: primary
signal_type: metrics
pack: telegraf_snmp_interface
"#, "primary.ifOperStatus", "primary.ifHCInOctets")]
#[case::auto_generated_entry_id(r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
"#, "telegraf_snmp_interface_0.ifOperStatus", "telegraf_snmp_interface_0.ifHCInOctets")]
fn sub_signal_ids_follow_effective_entry_id(
#[case] yaml: &str,
#[case] expected_first: &str,
#[case] expected_second: &str,
) {
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries[0].id.as_deref(), Some(expected_first));
assert_eq!(expanded.entries[1].id.as_deref(), Some(expected_second));
}
#[test]
fn two_anonymous_pack_entries_disambiguate_by_index() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
- signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
let ids: Vec<_> = expanded
.entries
.iter()
.filter_map(|e| e.id.as_deref())
.collect();
assert!(ids.contains(&"telegraf_snmp_interface_0.ifOperStatus"));
assert!(ids.contains(&"telegraf_snmp_interface_1.ifOperStatus"));
let mut sorted = ids.clone();
sorted.sort();
sorted.dedup();
assert_eq!(sorted.len(), ids.len(), "ids must be unique");
}
// Full precedence chain: defaults < pack shared < pack metric < entry <
// override labels; non-conflicting keys from every layer survive.
#[test]
fn label_precedence_chain_applied_in_order() {
let mut shared = HashMap::new();
shared.insert("region".to_string(), "shared-region".to_string());
shared.insert("job".to_string(), "snmp".to_string());
let mut metric_labels = HashMap::new();
metric_labels.insert("region".to_string(), "metric-region".to_string());
let pack = MetricPackDef {
name: "p".to_string(),
description: "t".to_string(),
category: "c".to_string(),
shared_labels: Some(shared),
metrics: vec![MetricSpec {
name: "m".to_string(),
labels: Some(metric_labels),
generator: Some(GeneratorConfig::Constant { value: 0.0 }),
}],
};
let mut resolver = InMemoryPackResolver::new();
resolver.insert("p", pack);
let yaml = r#"
version: 2
defaults:
rate: 1
labels:
region: defaults-region
env: prod
scenarios:
- id: e
signal_type: metrics
pack: p
labels:
region: entry-region
device: rtr-01
overrides:
m:
labels:
region: override-region
"#;
let expanded = expand_yaml(yaml, &resolver);
let labels = expanded.entries[0].labels.as_ref().unwrap();
assert_eq!(labels.get("region").unwrap(), "override-region");
assert_eq!(labels.get("env").unwrap(), "prod");
assert_eq!(labels.get("job").unwrap(), "snmp");
assert_eq!(labels.get("device").unwrap(), "rtr-01");
}
#[test]
fn defaults_labels_flow_into_pack_metric_labels() {
let yaml = r#"
version: 2
defaults:
rate: 1
labels:
env: prod
scenarios:
- id: p
signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
let labels = expanded.entries[0].labels.as_ref().unwrap();
assert_eq!(labels.get("env").unwrap(), "prod");
}
#[test]
fn pack_shared_labels_override_defaults_labels() {
let mut shared = HashMap::new();
shared.insert("job".to_string(), "snmp".to_string());
let pack = MetricPackDef {
name: "p".to_string(),
description: "t".to_string(),
category: "c".to_string(),
shared_labels: Some(shared),
metrics: vec![MetricSpec {
name: "m".to_string(),
labels: None,
generator: Some(GeneratorConfig::Constant { value: 0.0 }),
}],
};
let mut resolver = InMemoryPackResolver::new();
resolver.insert("p", pack);
let yaml = r#"
version: 2
defaults:
rate: 1
labels:
job: web
scenarios:
- signal_type: metrics
pack: p
"#;
let expanded = expand_yaml(yaml, &resolver);
let labels = expanded.entries[0].labels.as_ref().unwrap();
assert_eq!(labels.get("job").unwrap(), "snmp");
}
#[test]
fn inline_entry_labels_pass_through_unchanged() {
let yaml = r#"
version: 2
defaults:
rate: 1
labels:
env: prod
scenarios:
- signal_type: metrics
name: cpu
generator: { type: constant, value: 1 }
labels:
instance: web-01
"#;
let resolver = InMemoryPackResolver::new();
let expanded = expand_yaml(yaml, &resolver);
let labels = expanded.entries[0].labels.as_ref().unwrap();
assert_eq!(labels.get("env").unwrap(), "prod");
assert_eq!(labels.get("instance").unwrap(), "web-01");
assert_eq!(labels.len(), 2);
}
#[test]
fn override_generator_replaces_pack_generator() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: e
signal_type: metrics
pack: telegraf_snmp_interface
overrides:
ifOperStatus:
generator:
type: flap
up_duration: 60s
down_duration: 30s
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert!(matches!(
expanded.entries[0].generator.as_ref().unwrap(),
GeneratorConfig::Flap { .. }
));
assert!(matches!(
expanded.entries[1].generator.as_ref().unwrap(),
GeneratorConfig::Step { .. }
));
}
#[test]
fn missing_generator_falls_back_to_constant_zero() {
let pack = MetricPackDef {
name: "p".to_string(),
description: "t".to_string(),
category: "c".to_string(),
shared_labels: None,
metrics: vec![MetricSpec {
name: "x".to_string(),
labels: None,
generator: None,
}],
};
let mut resolver = InMemoryPackResolver::new();
resolver.insert("p", pack);
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: p
"#;
let expanded = expand_yaml(yaml, &resolver);
match expanded.entries[0].generator.as_ref().unwrap() {
GeneratorConfig::Constant { value } => assert_eq!(*value, 0.0),
other => panic!("expected constant(0), got {other:?}"),
}
}
#[test]
fn entry_level_after_propagates_to_every_metric() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: tail
signal_type: metrics
pack: telegraf_snmp_interface
after:
ref: head
op: ">"
value: 5
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
for e in &expanded.entries {
let after = e.after.as_ref().expect("after must be propagated");
assert_eq!(after.ref_id, "head");
assert!(matches!(after.op, AfterOp::GreaterThan));
}
}
#[test]
fn override_after_replaces_entry_after_for_that_metric() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: tail
signal_type: metrics
pack: telegraf_snmp_interface
after:
ref: head
op: ">"
value: 5
overrides:
ifOperStatus:
after:
ref: other
op: "<"
value: 1
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
let oper = expanded
.entries
.iter()
.find(|e| e.name == "ifOperStatus")
.unwrap();
assert_eq!(oper.after.as_ref().unwrap().ref_id, "other");
let in_octets = expanded
.entries
.iter()
.find(|e| e.name == "ifHCInOctets")
.unwrap();
assert_eq!(in_octets.after.as_ref().unwrap().ref_id, "head");
}
#[test]
fn schedule_delivery_fields_propagate_to_every_metric() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 2m
scenarios:
- id: p
signal_type: metrics
pack: telegraf_snmp_interface
phase_offset: 5s
clock_group: uplink
jitter: 0.2
jitter_seed: 42
gaps:
every: 2m
for: 20s
bursts:
every: 5m
for: 30s
multiplier: 10
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
for e in &expanded.entries {
assert_eq!(e.rate, 1.0);
assert_eq!(e.duration.as_deref(), Some("2m"));
assert_eq!(e.phase_offset.as_deref(), Some("5s"));
assert_eq!(e.clock_group.as_deref(), Some("uplink"));
assert_eq!(e.jitter, Some(0.2));
assert_eq!(e.jitter_seed, Some(42));
assert!(e.gaps.is_some());
assert!(e.bursts.is_some());
}
}
#[test]
fn expanded_entries_have_no_pack_field() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert!(expanded.entries.iter().all(|e| e.generator.is_some()));
}
#[test]
fn unknown_override_key_is_an_error() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
overrides:
not_a_metric:
generator:
type: constant
value: 0
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let err = expand(normalized, &resolver).expect_err("must fail");
match err {
ExpandError::UnknownOverrideKey {
key,
pack_name,
available,
} => {
assert_eq!(key, "not_a_metric");
assert_eq!(pack_name, "telegraf_snmp_interface");
assert!(available.contains("ifOperStatus"));
}
other => panic!("wrong error variant: {other:?}"),
}
}
#[test]
fn unresolvable_pack_is_an_error() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: nonexistent
"#;
let resolver = InMemoryPackResolver::new();
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let err = expand(normalized, &resolver).expect_err("must fail");
match err {
ExpandError::ResolveFailed { reference, message } => {
assert_eq!(reference, "nonexistent");
assert!(message.contains("nonexistent"));
}
other => panic!("wrong error variant: {other:?}"),
}
}
#[test]
fn empty_pack_is_an_error() {
let pack = MetricPackDef {
name: "empty".to_string(),
description: "t".to_string(),
category: "c".to_string(),
shared_labels: None,
metrics: vec![],
};
let mut resolver = InMemoryPackResolver::new();
resolver.insert("empty", pack);
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: empty
"#;
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let err = expand(normalized, &resolver).expect_err("must fail");
assert!(matches!(err, ExpandError::EmptyPack { pack_name } if pack_name == "empty"));
}
#[test]
fn inline_entries_pass_through_untouched() {
let yaml = r#"
version: 2
scenarios:
- id: cpu
signal_type: metrics
name: cpu_usage
rate: 2
duration: 60s
generator: { type: constant, value: 1 }
labels: { instance: web-01 }
"#;
let resolver = InMemoryPackResolver::new();
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries.len(), 1);
let e = &expanded.entries[0];
assert_eq!(e.id.as_deref(), Some("cpu"));
assert_eq!(e.name, "cpu_usage");
assert_eq!(e.rate, 2.0);
assert_eq!(e.duration.as_deref(), Some("60s"));
assert_eq!(
e.labels.as_ref().unwrap().get("instance").unwrap(),
"web-01"
);
}
#[test]
fn mixed_inline_and_pack_entries_interleave_correctly() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: cpu
signal_type: metrics
name: cpu_usage
generator: { type: constant, value: 1 }
- id: net
signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries.len(), 3);
assert_eq!(expanded.entries[0].id.as_deref(), Some("cpu"));
assert_eq!(expanded.entries[1].id.as_deref(), Some("net.ifOperStatus"));
assert_eq!(expanded.entries[2].id.as_deref(), Some("net.ifHCInOctets"));
}
#[test]
fn repeated_metric_names_produce_one_entry_per_spec_instance() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: cpu
signal_type: metrics
pack: node_exporter_cpu
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("node_exporter_cpu", node_cpu_pack());
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries.len(), 2);
assert_eq!(expanded.entries[0].name, "node_cpu_seconds_total");
assert_eq!(expanded.entries[1].name, "node_cpu_seconds_total");
assert_eq!(
expanded.entries[0]
.labels
.as_ref()
.unwrap()
.get("mode")
.unwrap(),
"user"
);
assert_eq!(
expanded.entries[1]
.labels
.as_ref()
.unwrap()
.get("mode")
.unwrap(),
"system"
);
}
#[test]
fn repeated_metric_names_produce_unique_sub_signal_ids() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: cpu
signal_type: metrics
pack: node_exporter_cpu
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("node_exporter_cpu", node_cpu_pack());
let expanded = expand_yaml(yaml, &resolver);
let ids: Vec<&str> = expanded
.entries
.iter()
.map(|e| {
e.id.as_deref()
.expect("pack-expanded entries always carry an id")
})
.collect();
let mut unique = ids.clone();
unique.sort();
unique.dedup();
assert_eq!(
unique.len(),
ids.len(),
"sub-signal ids must be unique; saw {ids:?}"
);
assert_eq!(ids[0], "cpu.node_cpu_seconds_total#0");
assert_eq!(ids[1], "cpu.node_cpu_seconds_total#1");
}
#[test]
fn unique_metric_names_keep_clean_sub_signal_ids() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: net
signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
let ids: Vec<&str> = expanded
.entries
.iter()
.filter_map(|e| e.id.as_deref())
.collect();
assert_eq!(ids, vec!["net.ifOperStatus", "net.ifHCInOctets"]);
}
// Collisions between inline ids and auto-generated pack ids must be caught
// no matter which entry appears first in the file.
#[rustfmt::skip]
#[rstest::rstest]
#[case::inline_first_then_auto(r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: telegraf_snmp_interface_1
signal_type: metrics
name: cpu
generator: { type: constant, value: 1 }
- signal_type: metrics
pack: telegraf_snmp_interface
"#, "telegraf_snmp_interface_1", "inline entry", "auto-generated")]
#[case::auto_first_then_inline(r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: telegraf_snmp_interface
- id: telegraf_snmp_interface_0
signal_type: metrics
name: cpu
generator: { type: constant, value: 1 }
"#, "telegraf_snmp_interface_0", "auto-generated", "inline entry")]
fn duplicate_entry_id_detected_regardless_of_source_order(
#[case] yaml: &str,
#[case] expected_id: &str,
#[case] expected_first_substr: &str,
#[case] expected_second_substr: &str,
) {
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let err = expand(normalized, &resolver).expect_err("must fail");
match err {
ExpandError::DuplicateEntryId {
id,
first_source,
second_source,
} => {
assert_eq!(id, expected_id);
assert!(
first_source.contains(expected_first_substr),
"unexpected first source: {first_source}"
);
assert!(
second_source.contains(expected_second_substr),
"unexpected second source: {second_source}"
);
}
other => panic!("wrong error variant: {other:?}"),
}
}
#[test]
fn duplicate_entry_id_error_preserves_both_sources() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- id: telegraf_snmp_interface_1
signal_type: metrics
name: cpu
generator: { type: constant, value: 1 }
- signal_type: metrics
pack: telegraf_snmp_interface
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("telegraf_snmp_interface", telegraf_pack());
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let err = expand(normalized, &resolver).expect_err("must fail");
let rendered = err.to_string();
assert!(
rendered.contains("'telegraf_snmp_interface_1'"),
"error must name the colliding id: {rendered}"
);
assert!(
rendered.contains("inline entry"),
"error must name the inline source: {rendered}"
);
assert!(
rendered.contains("auto-generated"),
"error must name the auto-generated source: {rendered}"
);
}
#[test]
fn pack_by_file_path_is_resolved_through_trait() {
let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
- signal_type: metrics
pack: ./packs/telegraf-snmp-interface.yaml
"#;
let mut resolver = InMemoryPackResolver::new();
resolver.insert("./packs/telegraf-snmp-interface.yaml", telegraf_pack());
let expanded = expand_yaml(yaml, &resolver);
assert_eq!(expanded.entries.len(), 2);
}
#[test]
fn expanded_file_is_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<ExpandedFile>();
assert_send_sync::<ExpandedEntry>();
assert_send_sync::<ExpandError>();
}
}