1use std::collections::{BTreeMap, BTreeSet};
142
143use super::normalize::{NormalizedEntry, NormalizedFile};
144use super::{AfterClause, DelayClause, WhileClause};
145use crate::config::{
146 BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
147 OnSinkError,
148};
149use crate::encoder::EncoderConfig;
150use crate::generator::{GeneratorConfig, LogGeneratorConfig};
151use crate::packs::{MetricOverride, MetricPackDef};
152use crate::sink::SinkConfig;
153
/// Errors produced while expanding pack-backed entries into concrete
/// per-metric entries.
///
/// The enum is `#[non_exhaustive]`, so downstream matches need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ExpandError {
    /// The [`PackResolver`] returned an error for the entry's pack reference.
    #[error("pack '{reference}' could not be resolved: {message}")]
    ResolveFailed {
        /// The pack reference as written on the entry.
        reference: String,
        /// The message carried by the resolver's [`PackResolveError`].
        message: String,
    },

    /// An `overrides` key does not match the name of any metric in the pack.
    #[error(
        "override references unknown metric '{key}'; pack '{pack_name}' contains: {available}"
    )]
    UnknownOverrideKey {
        /// The offending override key.
        key: String,
        /// Name of the resolved pack.
        pack_name: String,
        /// The pack's metric names joined with `", "`, in pack order.
        available: String,
    },

    /// The resolved pack has an empty metric list.
    #[error("pack '{pack_name}' contains no metrics")]
    EmptyPack {
        /// Name of the resolved pack.
        pack_name: String,
    },

    /// Two sources claimed the same id (inline entry ids, pack entry ids and
    /// pack sub-signal ids all share one registry).
    #[error(
        "duplicate entry id '{id}' after pack expansion: \
         {first_source} conflicts with {second_source}"
    )]
    DuplicateEntryId {
        /// The id that was registered twice.
        id: String,
        /// Human-readable description of the source that registered the id first.
        first_source: String,
        /// Human-readable description of the source that tried to register it again.
        second_source: String,
    },
}
223
/// Source of pack definitions consulted by [`expand`].
///
/// An implementation maps a pack reference string to a [`MetricPackDef`].
pub trait PackResolver {
    /// Looks up the pack identified by `reference`.
    ///
    /// # Errors
    ///
    /// Returns a [`PackResolveError`] when the implementation cannot produce
    /// a pack for `reference`.
    fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError>;
}
255
/// Error returned by a [`PackResolver`].
///
/// Its `Display` output is exactly `message`; `origin` is not rendered.
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
pub struct PackResolveError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Whether the failing reference was classified as a name or a file path.
    pub origin: PackResolveOrigin,
}
269
/// Classification of a pack reference, as produced by
/// [`classify_pack_reference`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackResolveOrigin {
    /// The reference is a bare pack name.
    Name,
    /// The reference looks like a file path.
    FilePath,
}
278
279impl PackResolveError {
280 pub fn new(message: impl Into<String>, origin: PackResolveOrigin) -> Self {
286 Self {
287 message: message.into(),
288 origin,
289 }
290 }
291}
292
293pub fn classify_pack_reference(reference: &str) -> PackResolveOrigin {
298 if reference.contains('/') || reference.starts_with('.') {
299 PackResolveOrigin::FilePath
300 } else {
301 PackResolveOrigin::Name
302 }
303}
304
/// A [`PackResolver`] backed by an in-memory map from reference string to
/// pack definition.
#[derive(Debug, Default, Clone)]
pub struct InMemoryPackResolver {
    // Keyed by the exact reference string passed to `insert`.
    packs: BTreeMap<String, MetricPackDef>,
}
315
316impl InMemoryPackResolver {
317 pub fn new() -> Self {
319 Self::default()
320 }
321
322 pub fn insert(&mut self, reference: impl Into<String>, pack: MetricPackDef) {
329 self.packs.insert(reference.into(), pack);
330 }
331}
332
333impl PackResolver for InMemoryPackResolver {
334 fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError> {
335 match self.packs.get(reference) {
336 Some(pack) => Ok(pack.clone()),
337 None => Err(PackResolveError::new(
338 format!("pack reference '{reference}' not found in resolver"),
339 classify_pack_reference(reference),
340 )),
341 }
342 }
343}
344
/// Output of [`expand`]: the normalized file with every pack-backed entry
/// replaced by one entry per pack metric.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedFile {
    /// Copied unchanged from the normalized file.
    pub version: u32,
    /// Copied unchanged from the normalized file; omitted from serialized
    /// output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub scenario_name: Option<String>,
    /// Expanded entries in source order; a pack entry's sub-signals appear
    /// at the pack entry's position, in pack metric order.
    pub entries: Vec<ExpandedEntry>,
}
379
/// A single concrete signal after pack expansion.
///
/// Inline entries are copied field-for-field from the normalized entry.
/// Pack-derived entries take most fields from the pack entry itself, with
/// name, labels, generator and the `after`/`while`/`delay` clauses resolved
/// per metric.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedEntry {
    /// Entry id. Always `Some` for pack-derived entries (the sub-signal id);
    /// inline entries keep whatever id the normalized entry carried.
    pub id: Option<String>,
    /// Signal type string; pack-derived entries always use `"metrics"`.
    pub signal_type: String,
    /// Signal name. For pack-derived entries this is the pack metric's name;
    /// an inline entry without a name ends up with an empty string.
    pub name: String,
    pub rate: f64,
    pub duration: Option<String>,
    /// Always `Some` for pack-derived entries.
    pub generator: Option<GeneratorConfig>,
    /// Always `None` for pack-derived entries.
    pub log_generator: Option<LogGeneratorConfig>,
    /// For pack-derived entries, the merged label set (`None` when the merge
    /// is empty); for inline entries, the normalized entry's labels as-is.
    pub labels: Option<BTreeMap<String, String>>,
    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
    pub encoder: EncoderConfig,
    pub sink: SinkConfig,
    pub jitter: Option<f64>,
    pub jitter_seed: Option<u64>,
    pub gaps: Option<GapConfig>,
    pub bursts: Option<BurstConfig>,
    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
    pub phase_offset: Option<String>,
    pub clock_group: Option<String>,
    /// For pack-derived entries: the metric override's clause if present,
    /// otherwise the pack entry's clause.
    pub after: Option<AfterClause>,
    /// Same override-then-entry fallback as `after`; omitted from serialized
    /// output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub while_clause: Option<WhileClause>,
    /// Same override-then-entry fallback as `after`; omitted from serialized
    /// output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub delay_clause: Option<DelayClause>,

    // The six fields below are always `None` on pack-derived entries and are
    // copied as-is for inline entries.
    pub distribution: Option<DistributionConfig>,
    pub buckets: Option<Vec<f64>>,
    pub quantiles: Option<Vec<f64>>,
    pub observations_per_tick: Option<u32>,
    pub mean_shift_per_sec: Option<f64>,
    pub seed: Option<u64>,
    pub on_sink_error: OnSinkError,
}
485
486pub fn expand<R: PackResolver>(
514 file: NormalizedFile,
515 resolver: &R,
516) -> Result<ExpandedFile, ExpandError> {
517 let defaults_labels = file.defaults_labels;
518 let mut entries: Vec<ExpandedEntry> = Vec::with_capacity(file.entries.len());
519 let mut id_registry: BTreeMap<String, String> = BTreeMap::new();
523
524 for (index, entry) in file.entries.into_iter().enumerate() {
525 if entry.pack.is_some() {
526 expand_pack_entry(
527 entry,
528 index,
529 defaults_labels.as_ref(),
530 resolver,
531 &mut entries,
532 &mut id_registry,
533 )?;
534 } else {
535 let expanded = expand_inline_entry(entry);
536 if let Some(id) = expanded.id.as_ref() {
537 record_id(&mut id_registry, id, format!("inline entry '{id}'"))?;
538 }
539 entries.push(expanded);
540 }
541 }
542
543 Ok(ExpandedFile {
544 version: file.version,
545 scenario_name: file.scenario_name,
546 entries,
547 })
548}
549
550fn record_id(
556 registry: &mut BTreeMap<String, String>,
557 id: &str,
558 source: String,
559) -> Result<(), ExpandError> {
560 if let Some(prior) = registry.get(id) {
561 return Err(ExpandError::DuplicateEntryId {
562 id: id.to_string(),
563 first_source: prior.clone(),
564 second_source: source,
565 });
566 }
567 registry.insert(id.to_string(), source);
568 Ok(())
569}
570
571fn expand_inline_entry(entry: NormalizedEntry) -> ExpandedEntry {
581 ExpandedEntry {
582 id: entry.id,
583 signal_type: entry.signal_type,
584 name: entry.name.unwrap_or_default(),
586 rate: entry.rate,
587 duration: entry.duration,
588 generator: entry.generator,
589 log_generator: entry.log_generator,
590 labels: entry.labels,
591 dynamic_labels: entry.dynamic_labels,
592 encoder: entry.encoder,
593 sink: entry.sink,
594 jitter: entry.jitter,
595 jitter_seed: entry.jitter_seed,
596 gaps: entry.gaps,
597 bursts: entry.bursts,
598 cardinality_spikes: entry.cardinality_spikes,
599 phase_offset: entry.phase_offset,
600 clock_group: entry.clock_group,
601 after: entry.after,
602 while_clause: entry.while_clause,
603 delay_clause: entry.delay_clause,
604 distribution: entry.distribution,
605 buckets: entry.buckets,
606 quantiles: entry.quantiles,
607 observations_per_tick: entry.observations_per_tick,
608 mean_shift_per_sec: entry.mean_shift_per_sec,
609 on_sink_error: entry.on_sink_error,
610 seed: entry.seed,
611 }
612}
613
614fn expand_pack_entry<R: PackResolver>(
622 entry: NormalizedEntry,
623 entry_index: usize,
624 defaults_labels: Option<&BTreeMap<String, String>>,
625 resolver: &R,
626 out: &mut Vec<ExpandedEntry>,
627 id_registry: &mut BTreeMap<String, String>,
628) -> Result<(), ExpandError> {
629 let reference = entry
631 .pack
632 .as_deref()
633 .expect("expand_pack_entry called with non-pack entry; caller must check");
634
635 let pack = resolver
636 .resolve(reference)
637 .map_err(|e| ExpandError::ResolveFailed {
638 reference: reference.to_string(),
639 message: e.message,
640 })?;
641
642 if pack.metrics.is_empty() {
643 return Err(ExpandError::EmptyPack {
644 pack_name: pack.name,
645 });
646 }
647
648 validate_override_keys(&pack, entry.overrides.as_ref())?;
649
650 let (effective_entry_id, effective_id_source) = match entry.id.clone() {
651 Some(id) => (id.clone(), format!("pack entry '{id}' (user-provided id)")),
652 None => {
653 let synthesized = format!("{}_{}", pack.name, entry_index);
654 (
655 synthesized.clone(),
656 format!(
657 "pack entry at index {entry_index} (auto-generated id '{synthesized}' \
658 from pack '{}')",
659 pack.name
660 ),
661 )
662 }
663 };
664
665 record_id(id_registry, &effective_entry_id, effective_id_source)?;
672
673 let duplicate_metric_names = duplicate_metric_names(&pack);
680
681 for (spec_index, metric) in pack.metrics.iter().enumerate() {
682 let override_for_metric = entry
683 .overrides
684 .as_ref()
685 .and_then(|map| map.get(&metric.name));
686
687 let labels = compose_pack_metric_labels(
688 defaults_labels,
689 pack.shared_labels.as_ref(),
690 metric.labels.as_ref(),
691 entry.labels.as_ref(),
692 override_for_metric.and_then(|o| o.labels.as_ref()),
693 );
694
695 let generator = select_pack_metric_generator(metric, override_for_metric);
696
697 let after = override_for_metric
702 .and_then(|o| o.after.clone())
703 .or_else(|| entry.after.clone());
704 let while_clause = override_for_metric
705 .and_then(|o| o.while_clause.clone())
706 .or_else(|| entry.while_clause.clone());
707 let delay_clause = override_for_metric
708 .and_then(|o| o.delay_clause.clone())
709 .or_else(|| entry.delay_clause.clone());
710
711 let sub_signal_id = if duplicate_metric_names.contains(metric.name.as_str()) {
712 format!("{}.{}#{}", effective_entry_id, metric.name, spec_index)
713 } else {
714 format!("{}.{}", effective_entry_id, metric.name)
715 };
716 record_id(
717 id_registry,
718 &sub_signal_id,
719 format!(
720 "pack sub-signal '{sub_signal_id}' (pack '{}', metric '{}' at index {spec_index})",
721 pack.name, metric.name
722 ),
723 )?;
724
725 out.push(ExpandedEntry {
726 id: Some(sub_signal_id),
727 signal_type: "metrics".to_string(),
728 name: metric.name.clone(),
729 rate: entry.rate,
730 duration: entry.duration.clone(),
731 generator: Some(generator),
732 log_generator: None,
733 labels,
734 dynamic_labels: entry.dynamic_labels.clone(),
735 encoder: entry.encoder.clone(),
736 sink: entry.sink.clone(),
737 jitter: entry.jitter,
738 jitter_seed: entry.jitter_seed,
739 gaps: entry.gaps.clone(),
740 bursts: entry.bursts.clone(),
741 cardinality_spikes: entry.cardinality_spikes.clone(),
742 phase_offset: entry.phase_offset.clone(),
743 clock_group: entry.clock_group.clone(),
744 after,
745 while_clause,
746 delay_clause,
747 distribution: None,
748 buckets: None,
749 quantiles: None,
750 observations_per_tick: None,
751 mean_shift_per_sec: None,
752 seed: None,
753 on_sink_error: entry.on_sink_error,
754 });
755 }
756
757 Ok(())
758}
759
760fn duplicate_metric_names(pack: &MetricPackDef) -> BTreeSet<&str> {
767 let mut seen: BTreeSet<&str> = BTreeSet::new();
768 let mut duplicates: BTreeSet<&str> = BTreeSet::new();
769 for metric in &pack.metrics {
770 if !seen.insert(metric.name.as_str()) {
771 duplicates.insert(metric.name.as_str());
772 }
773 }
774 duplicates
775}
776
777fn validate_override_keys(
782 pack: &MetricPackDef,
783 overrides: Option<&BTreeMap<String, MetricOverride>>,
784) -> Result<(), ExpandError> {
785 let Some(overrides) = overrides else {
786 return Ok(());
787 };
788 if overrides.is_empty() {
789 return Ok(());
790 }
791
792 let metric_names: BTreeSet<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
793 for key in overrides.keys() {
794 if !metric_names.contains(key.as_str()) {
795 let available: Vec<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
796 return Err(ExpandError::UnknownOverrideKey {
797 key: key.clone(),
798 pack_name: pack.name.clone(),
799 available: available.join(", "),
800 });
801 }
802 }
803 Ok(())
804}
805
/// Merges the label layers that apply to a single pack metric.
///
/// Layers are applied from lowest to highest precedence, each overwriting
/// keys set by earlier ones:
///
/// 1. `defaults_labels`
/// 2. `pack_shared_labels`
/// 3. `pack_metric_labels`
/// 4. `entry_labels`
/// 5. `override_labels`
///
/// Returns `None` when the merged map is empty (no layers, or only empty
/// layers), otherwise `Some(merged)`.
fn compose_pack_metric_labels(
    defaults_labels: Option<&BTreeMap<String, String>>,
    pack_shared_labels: Option<&std::collections::HashMap<String, String>>,
    pack_metric_labels: Option<&std::collections::HashMap<String, String>>,
    entry_labels: Option<&BTreeMap<String, String>>,
    override_labels: Option<&BTreeMap<String, String>>,
) -> Option<BTreeMap<String, String>> {
    /// Turns a borrowed key/value pair into an owned one.
    fn clone_pair((key, value): (&String, &String)) -> (String, String) {
        (key.clone(), value.clone())
    }

    let mut merged: BTreeMap<String, String> = BTreeMap::new();

    // `Option::into_iter().flatten()` yields nothing for an absent layer and
    // the map's pairs otherwise; `extend` overwrites existing keys, so later
    // layers take precedence.
    merged.extend(defaults_labels.into_iter().flatten().map(clone_pair));
    merged.extend(pack_shared_labels.into_iter().flatten().map(clone_pair));
    merged.extend(pack_metric_labels.into_iter().flatten().map(clone_pair));
    merged.extend(entry_labels.into_iter().flatten().map(clone_pair));
    merged.extend(override_labels.into_iter().flatten().map(clone_pair));

    (!merged.is_empty()).then_some(merged)
}
861
862fn select_pack_metric_generator(
868 metric: &crate::packs::MetricSpec,
869 metric_override: Option<&MetricOverride>,
870) -> GeneratorConfig {
871 if let Some(over) = metric_override {
872 if let Some(gen) = over.generator.clone() {
873 return gen;
874 }
875 }
876 metric
877 .generator
878 .clone()
879 .unwrap_or(GeneratorConfig::Constant { value: 0.0 })
880}
881
882#[cfg(test)]
887mod tests {
888 use super::*;
889 use crate::compiler::normalize::normalize;
890 use crate::compiler::parse::parse;
891 use crate::compiler::AfterOp;
892 use crate::packs::MetricSpec;
893 use std::collections::HashMap;
894
895 fn telegraf_pack() -> MetricPackDef {
900 let mut shared = HashMap::new();
901 shared.insert("device".to_string(), String::new());
902 shared.insert("job".to_string(), "snmp".to_string());
903
904 MetricPackDef {
905 name: "telegraf_snmp_interface".to_string(),
906 description: "test".to_string(),
907 category: "network".to_string(),
908 shared_labels: Some(shared),
909 metrics: vec![
910 MetricSpec {
911 name: "ifOperStatus".to_string(),
912 labels: None,
913 generator: Some(GeneratorConfig::Constant { value: 1.0 }),
914 },
915 MetricSpec {
916 name: "ifHCInOctets".to_string(),
917 labels: None,
918 generator: Some(GeneratorConfig::Step {
919 start: Some(0.0),
920 step_size: 125_000.0,
921 max: None,
922 }),
923 },
924 ],
925 }
926 }
927
928 fn node_cpu_pack() -> MetricPackDef {
929 let mut shared = HashMap::new();
930 shared.insert("job".to_string(), "node_exporter".to_string());
931
932 let mut user_labels = HashMap::new();
933 user_labels.insert("mode".to_string(), "user".to_string());
934
935 let mut system_labels = HashMap::new();
936 system_labels.insert("mode".to_string(), "system".to_string());
937
938 MetricPackDef {
939 name: "node_exporter_cpu".to_string(),
940 description: "test".to_string(),
941 category: "infrastructure".to_string(),
942 shared_labels: Some(shared),
943 metrics: vec![
944 MetricSpec {
945 name: "node_cpu_seconds_total".to_string(),
946 labels: Some(user_labels),
947 generator: Some(GeneratorConfig::Step {
948 start: Some(0.0),
949 step_size: 0.25,
950 max: None,
951 }),
952 },
953 MetricSpec {
954 name: "node_cpu_seconds_total".to_string(),
955 labels: Some(system_labels),
956 generator: Some(GeneratorConfig::Step {
957 start: Some(0.0),
958 step_size: 0.10,
959 max: None,
960 }),
961 },
962 ],
963 }
964 }
965
966 fn expand_yaml(yaml: &str, resolver: &InMemoryPackResolver) -> ExpandedFile {
967 let parsed = parse(yaml).expect("parse must succeed");
968 let normalized = normalize(parsed).expect("normalize must succeed");
969 expand(normalized, resolver).expect("expand must succeed")
970 }
971
972 #[rustfmt::skip]
977 #[rstest::rstest]
978 #[case::plain_name("telegraf_snmp_interface", PackResolveOrigin::Name)]
979 #[case::dot_relative("./packs/custom.yaml", PackResolveOrigin::FilePath)]
980 #[case::absolute_path("/abs/path/pack.yaml", PackResolveOrigin::FilePath)]
981 #[case::plain_relative("rel/pack.yaml", PackResolveOrigin::FilePath)]
982 fn classify_pack_reference_distinguishes_name_and_file_path(
983 #[case] reference: &str,
984 #[case] expected: PackResolveOrigin,
985 ) {
986 assert_eq!(classify_pack_reference(reference), expected);
987 }
988
989 #[test]
990 fn in_memory_resolver_returns_registered_pack() {
991 let mut r = InMemoryPackResolver::new();
992 r.insert("telegraf_snmp_interface", telegraf_pack());
993 let def = r.resolve("telegraf_snmp_interface").expect("must resolve");
994 assert_eq!(def.name, "telegraf_snmp_interface");
995 }
996
997 #[test]
998 fn in_memory_resolver_errors_on_missing_reference() {
999 let r = InMemoryPackResolver::new();
1000 let err = r.resolve("nope").expect_err("must error");
1001 assert_eq!(err.origin, PackResolveOrigin::Name);
1002 assert!(err.message.contains("nope"));
1003 }
1004
1005 #[test]
1006 fn in_memory_resolver_classifies_file_paths() {
1007 let r = InMemoryPackResolver::new();
1008 let err = r.resolve("./no-such.yaml").expect_err("must error");
1009 assert_eq!(err.origin, PackResolveOrigin::FilePath);
1010 }
1011
1012 #[test]
1017 fn expand_produces_one_entry_per_pack_metric() {
1018 let yaml = r#"
1019version: 2
1020defaults:
1021 rate: 1
1022scenarios:
1023 - id: primary
1024 signal_type: metrics
1025 pack: telegraf_snmp_interface
1026"#;
1027 let mut resolver = InMemoryPackResolver::new();
1028 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1029 let expanded = expand_yaml(yaml, &resolver);
1030 assert_eq!(expanded.entries.len(), 2);
1031 assert_eq!(expanded.entries[0].name, "ifOperStatus");
1032 assert_eq!(expanded.entries[1].name, "ifHCInOctets");
1033 }
1034
1035 #[test]
1036 fn expanded_signal_type_is_metrics() {
1037 let yaml = r#"
1038version: 2
1039defaults: { rate: 1 }
1040scenarios:
1041 - signal_type: metrics
1042 pack: telegraf_snmp_interface
1043"#;
1044 let mut resolver = InMemoryPackResolver::new();
1045 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1046 let expanded = expand_yaml(yaml, &resolver);
1047 for e in &expanded.entries {
1048 assert_eq!(e.signal_type, "metrics");
1049 }
1050 }
1051
1052 #[rustfmt::skip]
1057 #[rstest::rstest]
1058 #[case::user_supplied_entry_id(r#"
1061version: 2
1062defaults: { rate: 1 }
1063scenarios:
1064 - id: primary
1065 signal_type: metrics
1066 pack: telegraf_snmp_interface
1067"#, "primary.ifOperStatus", "primary.ifHCInOctets")]
1068 #[case::auto_generated_entry_id(r#"
1072version: 2
1073defaults: { rate: 1 }
1074scenarios:
1075 - signal_type: metrics
1076 pack: telegraf_snmp_interface
1077"#, "telegraf_snmp_interface_0.ifOperStatus", "telegraf_snmp_interface_0.ifHCInOctets")]
1078 fn sub_signal_ids_follow_effective_entry_id(
1079 #[case] yaml: &str,
1080 #[case] expected_first: &str,
1081 #[case] expected_second: &str,
1082 ) {
1083 let mut resolver = InMemoryPackResolver::new();
1084 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1085 let expanded = expand_yaml(yaml, &resolver);
1086 assert_eq!(expanded.entries[0].id.as_deref(), Some(expected_first));
1087 assert_eq!(expanded.entries[1].id.as_deref(), Some(expected_second));
1088 }
1089
1090 #[test]
1091 fn two_anonymous_pack_entries_disambiguate_by_index() {
1092 let yaml = r#"
1093version: 2
1094defaults: { rate: 1 }
1095scenarios:
1096 - signal_type: metrics
1097 pack: telegraf_snmp_interface
1098 - signal_type: metrics
1099 pack: telegraf_snmp_interface
1100"#;
1101 let mut resolver = InMemoryPackResolver::new();
1102 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1103 let expanded = expand_yaml(yaml, &resolver);
1104 let ids: Vec<_> = expanded
1105 .entries
1106 .iter()
1107 .filter_map(|e| e.id.as_deref())
1108 .collect();
1109 assert!(ids.contains(&"telegraf_snmp_interface_0.ifOperStatus"));
1110 assert!(ids.contains(&"telegraf_snmp_interface_1.ifOperStatus"));
1111 let mut sorted = ids.clone();
1113 sorted.sort();
1114 sorted.dedup();
1115 assert_eq!(sorted.len(), ids.len(), "ids must be unique");
1116 }
1117
1118 #[test]
1123 fn label_precedence_chain_applied_in_order() {
1124 let mut shared = HashMap::new();
1127 shared.insert("region".to_string(), "shared-region".to_string());
1128 shared.insert("job".to_string(), "snmp".to_string());
1129
1130 let mut metric_labels = HashMap::new();
1131 metric_labels.insert("region".to_string(), "metric-region".to_string());
1132
1133 let pack = MetricPackDef {
1134 name: "p".to_string(),
1135 description: "t".to_string(),
1136 category: "c".to_string(),
1137 shared_labels: Some(shared),
1138 metrics: vec![MetricSpec {
1139 name: "m".to_string(),
1140 labels: Some(metric_labels),
1141 generator: Some(GeneratorConfig::Constant { value: 0.0 }),
1142 }],
1143 };
1144
1145 let mut resolver = InMemoryPackResolver::new();
1146 resolver.insert("p", pack);
1147
1148 let yaml = r#"
1149version: 2
1150defaults:
1151 rate: 1
1152 labels:
1153 region: defaults-region
1154 env: prod
1155scenarios:
1156 - id: e
1157 signal_type: metrics
1158 pack: p
1159 labels:
1160 region: entry-region
1161 device: rtr-01
1162 overrides:
1163 m:
1164 labels:
1165 region: override-region
1166"#;
1167 let expanded = expand_yaml(yaml, &resolver);
1168 let labels = expanded.entries[0].labels.as_ref().unwrap();
1169
1170 assert_eq!(labels.get("region").unwrap(), "override-region");
1172 assert_eq!(labels.get("env").unwrap(), "prod");
1174 assert_eq!(labels.get("job").unwrap(), "snmp");
1175 assert_eq!(labels.get("device").unwrap(), "rtr-01");
1176 }
1177
1178 #[test]
1179 fn defaults_labels_flow_into_pack_metric_labels() {
1180 let yaml = r#"
1183version: 2
1184defaults:
1185 rate: 1
1186 labels:
1187 env: prod
1188scenarios:
1189 - id: p
1190 signal_type: metrics
1191 pack: telegraf_snmp_interface
1192"#;
1193 let mut resolver = InMemoryPackResolver::new();
1194 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1195 let expanded = expand_yaml(yaml, &resolver);
1196 let labels = expanded.entries[0].labels.as_ref().unwrap();
1197 assert_eq!(labels.get("env").unwrap(), "prod");
1198 }
1199
1200 #[test]
1201 fn pack_shared_labels_override_defaults_labels() {
1202 let mut shared = HashMap::new();
1203 shared.insert("job".to_string(), "snmp".to_string());
1204 let pack = MetricPackDef {
1205 name: "p".to_string(),
1206 description: "t".to_string(),
1207 category: "c".to_string(),
1208 shared_labels: Some(shared),
1209 metrics: vec![MetricSpec {
1210 name: "m".to_string(),
1211 labels: None,
1212 generator: Some(GeneratorConfig::Constant { value: 0.0 }),
1213 }],
1214 };
1215 let mut resolver = InMemoryPackResolver::new();
1216 resolver.insert("p", pack);
1217
1218 let yaml = r#"
1219version: 2
1220defaults:
1221 rate: 1
1222 labels:
1223 job: web
1224scenarios:
1225 - signal_type: metrics
1226 pack: p
1227"#;
1228 let expanded = expand_yaml(yaml, &resolver);
1229 let labels = expanded.entries[0].labels.as_ref().unwrap();
1230 assert_eq!(labels.get("job").unwrap(), "snmp");
1231 }
1232
1233 #[test]
1234 fn inline_entry_labels_pass_through_unchanged() {
1235 let yaml = r#"
1239version: 2
1240defaults:
1241 rate: 1
1242 labels:
1243 env: prod
1244scenarios:
1245 - signal_type: metrics
1246 name: cpu
1247 generator: { type: constant, value: 1 }
1248 labels:
1249 instance: web-01
1250"#;
1251 let resolver = InMemoryPackResolver::new();
1252 let expanded = expand_yaml(yaml, &resolver);
1253 let labels = expanded.entries[0].labels.as_ref().unwrap();
1254 assert_eq!(labels.get("env").unwrap(), "prod");
1255 assert_eq!(labels.get("instance").unwrap(), "web-01");
1256 assert_eq!(labels.len(), 2);
1257 }
1258
1259 #[test]
1264 fn override_generator_replaces_pack_generator() {
1265 let yaml = r#"
1266version: 2
1267defaults: { rate: 1 }
1268scenarios:
1269 - id: e
1270 signal_type: metrics
1271 pack: telegraf_snmp_interface
1272 overrides:
1273 ifOperStatus:
1274 generator:
1275 type: flap
1276 up_duration: 60s
1277 down_duration: 30s
1278"#;
1279 let mut resolver = InMemoryPackResolver::new();
1280 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1281 let expanded = expand_yaml(yaml, &resolver);
1282 assert!(matches!(
1284 expanded.entries[0].generator.as_ref().unwrap(),
1285 GeneratorConfig::Flap { .. }
1286 ));
1287 assert!(matches!(
1289 expanded.entries[1].generator.as_ref().unwrap(),
1290 GeneratorConfig::Step { .. }
1291 ));
1292 }
1293
1294 #[test]
1295 fn missing_generator_falls_back_to_constant_zero() {
1296 let pack = MetricPackDef {
1297 name: "p".to_string(),
1298 description: "t".to_string(),
1299 category: "c".to_string(),
1300 shared_labels: None,
1301 metrics: vec![MetricSpec {
1302 name: "x".to_string(),
1303 labels: None,
1304 generator: None,
1305 }],
1306 };
1307 let mut resolver = InMemoryPackResolver::new();
1308 resolver.insert("p", pack);
1309
1310 let yaml = r#"
1311version: 2
1312defaults: { rate: 1 }
1313scenarios:
1314 - signal_type: metrics
1315 pack: p
1316"#;
1317 let expanded = expand_yaml(yaml, &resolver);
1318 match expanded.entries[0].generator.as_ref().unwrap() {
1319 GeneratorConfig::Constant { value } => assert_eq!(*value, 0.0),
1320 other => panic!("expected constant(0), got {other:?}"),
1321 }
1322 }
1323
1324 #[test]
1329 fn entry_level_after_propagates_to_every_metric() {
1330 let yaml = r#"
1331version: 2
1332defaults: { rate: 1 }
1333scenarios:
1334 - id: tail
1335 signal_type: metrics
1336 pack: telegraf_snmp_interface
1337 after:
1338 ref: head
1339 op: ">"
1340 value: 5
1341"#;
1342 let mut resolver = InMemoryPackResolver::new();
1343 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1344 let expanded = expand_yaml(yaml, &resolver);
1345 for e in &expanded.entries {
1346 let after = e.after.as_ref().expect("after must be propagated");
1347 assert_eq!(after.ref_id, "head");
1348 assert!(matches!(after.op, AfterOp::GreaterThan));
1349 }
1350 }
1351
1352 #[test]
1353 fn entry_level_while_propagates_to_every_metric() {
1354 let yaml = r#"
1355version: 2
1356defaults: { rate: 1, duration: 5m }
1357scenarios:
1358 - id: head
1359 signal_type: metrics
1360 name: head
1361 generator: { type: constant, value: 1 }
1362 - id: tail
1363 signal_type: metrics
1364 pack: telegraf_snmp_interface
1365 while:
1366 ref: head
1367 op: ">"
1368 value: 5
1369"#;
1370 let mut resolver = InMemoryPackResolver::new();
1371 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1372 let expanded = expand_yaml(yaml, &resolver);
1373 let pack_subs: Vec<_> = expanded
1374 .entries
1375 .iter()
1376 .filter(|e| {
1377 e.id.as_deref()
1378 .map(|s| s.starts_with("tail."))
1379 .unwrap_or(false)
1380 })
1381 .collect();
1382 assert!(!pack_subs.is_empty());
1383 for e in pack_subs {
1384 let w = e.while_clause.as_ref().expect("while must be propagated");
1385 assert_eq!(w.ref_id, "head");
1386 }
1387 }
1388
1389 #[test]
1390 fn override_while_replaces_entry_while_for_that_metric() {
1391 let yaml = r#"
1392version: 2
1393defaults: { rate: 1, duration: 5m }
1394scenarios:
1395 - id: head
1396 signal_type: metrics
1397 name: head
1398 generator: { type: constant, value: 1 }
1399 - id: other
1400 signal_type: metrics
1401 name: other
1402 generator: { type: constant, value: 1 }
1403 - id: tail
1404 signal_type: metrics
1405 pack: telegraf_snmp_interface
1406 while:
1407 ref: head
1408 op: ">"
1409 value: 5
1410 overrides:
1411 ifOperStatus:
1412 while:
1413 ref: other
1414 op: "<"
1415 value: 1
1416"#;
1417 let mut resolver = InMemoryPackResolver::new();
1418 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1419 let expanded = expand_yaml(yaml, &resolver);
1420 let oper = expanded
1421 .entries
1422 .iter()
1423 .find(|e| e.name == "ifOperStatus")
1424 .unwrap();
1425 assert_eq!(oper.while_clause.as_ref().unwrap().ref_id, "other");
1426 let in_octets = expanded
1427 .entries
1428 .iter()
1429 .find(|e| e.name == "ifHCInOctets")
1430 .unwrap();
1431 assert_eq!(in_octets.while_clause.as_ref().unwrap().ref_id, "head");
1432 }
1433
1434 #[test]
1435 fn override_after_replaces_entry_after_for_that_metric() {
1436 let yaml = r#"
1437version: 2
1438defaults: { rate: 1 }
1439scenarios:
1440 - id: tail
1441 signal_type: metrics
1442 pack: telegraf_snmp_interface
1443 after:
1444 ref: head
1445 op: ">"
1446 value: 5
1447 overrides:
1448 ifOperStatus:
1449 after:
1450 ref: other
1451 op: "<"
1452 value: 1
1453"#;
1454 let mut resolver = InMemoryPackResolver::new();
1455 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1456 let expanded = expand_yaml(yaml, &resolver);
1457 let oper = expanded
1458 .entries
1459 .iter()
1460 .find(|e| e.name == "ifOperStatus")
1461 .unwrap();
1462 assert_eq!(oper.after.as_ref().unwrap().ref_id, "other");
1463 let in_octets = expanded
1464 .entries
1465 .iter()
1466 .find(|e| e.name == "ifHCInOctets")
1467 .unwrap();
1468 assert_eq!(in_octets.after.as_ref().unwrap().ref_id, "head");
1469 }
1470
1471 #[test]
1476 fn schedule_delivery_fields_propagate_to_every_metric() {
1477 let yaml = r#"
1478version: 2
1479defaults:
1480 rate: 1
1481 duration: 2m
1482scenarios:
1483 - id: p
1484 signal_type: metrics
1485 pack: telegraf_snmp_interface
1486 phase_offset: 5s
1487 clock_group: uplink
1488 jitter: 0.2
1489 jitter_seed: 42
1490 gaps:
1491 every: 2m
1492 for: 20s
1493 bursts:
1494 every: 5m
1495 for: 30s
1496 multiplier: 10
1497"#;
1498 let mut resolver = InMemoryPackResolver::new();
1499 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1500 let expanded = expand_yaml(yaml, &resolver);
1501 for e in &expanded.entries {
1502 assert_eq!(e.rate, 1.0);
1503 assert_eq!(e.duration.as_deref(), Some("2m"));
1504 assert_eq!(e.phase_offset.as_deref(), Some("5s"));
1505 assert_eq!(e.clock_group.as_deref(), Some("uplink"));
1506 assert_eq!(e.jitter, Some(0.2));
1507 assert_eq!(e.jitter_seed, Some(42));
1508 assert!(e.gaps.is_some());
1509 assert!(e.bursts.is_some());
1510 }
1511 }
1512
1513 #[test]
1518 fn expanded_entries_have_no_pack_field() {
1519 let yaml = r#"
1523version: 2
1524defaults: { rate: 1 }
1525scenarios:
1526 - signal_type: metrics
1527 pack: telegraf_snmp_interface
1528"#;
1529 let mut resolver = InMemoryPackResolver::new();
1530 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1531 let expanded = expand_yaml(yaml, &resolver);
1532 assert!(expanded.entries.iter().all(|e| e.generator.is_some()));
1536 }
1537
1538 #[test]
1543 fn unknown_override_key_is_an_error() {
1544 let yaml = r#"
1545version: 2
1546defaults: { rate: 1 }
1547scenarios:
1548 - signal_type: metrics
1549 pack: telegraf_snmp_interface
1550 overrides:
1551 not_a_metric:
1552 generator:
1553 type: constant
1554 value: 0
1555"#;
1556 let mut resolver = InMemoryPackResolver::new();
1557 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1558 let parsed = parse(yaml).expect("parse");
1559 let normalized = normalize(parsed).expect("normalize");
1560 let err = expand(normalized, &resolver).expect_err("must fail");
1561 match err {
1562 ExpandError::UnknownOverrideKey {
1563 key,
1564 pack_name,
1565 available,
1566 } => {
1567 assert_eq!(key, "not_a_metric");
1568 assert_eq!(pack_name, "telegraf_snmp_interface");
1569 assert!(available.contains("ifOperStatus"));
1570 }
1571 other => panic!("wrong error variant: {other:?}"),
1572 }
1573 }
1574
1575 #[test]
1576 fn unresolvable_pack_is_an_error() {
1577 let yaml = r#"
1578version: 2
1579defaults: { rate: 1 }
1580scenarios:
1581 - signal_type: metrics
1582 pack: nonexistent
1583"#;
1584 let resolver = InMemoryPackResolver::new();
1585 let parsed = parse(yaml).expect("parse");
1586 let normalized = normalize(parsed).expect("normalize");
1587 let err = expand(normalized, &resolver).expect_err("must fail");
1588 match err {
1589 ExpandError::ResolveFailed { reference, message } => {
1590 assert_eq!(reference, "nonexistent");
1591 assert!(message.contains("nonexistent"));
1592 }
1593 other => panic!("wrong error variant: {other:?}"),
1594 }
1595 }
1596
/// A pack that resolves successfully but defines zero metrics must be
/// rejected with `ExpandError::EmptyPack` naming that pack.
#[test]
fn empty_pack_is_an_error() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: empty
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert(
        "empty",
        MetricPackDef {
            name: String::from("empty"),
            description: String::from("t"),
            category: String::from("c"),
            shared_labels: None,
            // The absence of metrics is the condition under test.
            metrics: Vec::new(),
        },
    );

    let normalized = normalize(parse(source).expect("parse")).expect("normalize");
    let failure = expand(normalized, &resolver).expect_err("must fail");
    let reports_empty_pack = match &failure {
        ExpandError::EmptyPack { pack_name } => pack_name == "empty",
        _ => false,
    };
    assert!(reports_empty_pack);
}
1620
/// A scenario that does not reference a pack must come out of expansion as
/// a single entry with its id, name, rate, duration and labels intact.
#[test]
fn inline_entries_pass_through_untouched() {
    let source = r#"
version: 2
scenarios:
  - id: cpu
    signal_type: metrics
    name: cpu_usage
    rate: 2
    duration: 60s
    generator: { type: constant, value: 1 }
    labels: { instance: web-01 }
"#;
    let resolver = InMemoryPackResolver::new();
    let expanded = expand_yaml(source, &resolver);

    let entries = &expanded.entries;
    assert_eq!(entries.len(), 1);

    let entry = &entries[0];
    assert_eq!(entry.id.as_deref(), Some("cpu"));
    assert_eq!(entry.name, "cpu_usage");
    assert_eq!(entry.rate, 2.0);
    assert_eq!(entry.duration.as_deref(), Some("60s"));

    let labels = entry.labels.as_ref().unwrap();
    assert_eq!(labels.get("instance").unwrap(), "web-01");
}
1651
/// An inline entry followed by a pack entry must expand in declaration
/// order: the inline entry first, then the pack's sub-signals whose ids
/// are prefixed with the pack entry's own id.
#[test]
fn mixed_inline_and_pack_entries_interleave_correctly() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    name: cpu_usage
    generator: { type: constant, value: 1 }
  - id: net
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("telegraf_snmp_interface", telegraf_pack());
    let expanded = expand_yaml(source, &resolver);

    let expected_ids = ["cpu", "net.ifOperStatus", "net.ifHCInOctets"];
    assert_eq!(expanded.entries.len(), 3);
    // The length check above guarantees the zip visits every entry.
    for (entry, expected) in expanded.entries.iter().zip(expected_ids.iter()) {
        assert_eq!(entry.id.as_deref(), Some(*expected));
    }
}
1675
/// When the pack returned by `node_cpu_pack()` is expanded, each spec
/// instance must yield its own entry: two entries sharing the metric name
/// `node_cpu_seconds_total`, distinguished by their `mode` label.
#[test]
fn repeated_metric_names_produce_one_entry_per_spec_instance() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    pack: node_exporter_cpu
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("node_exporter_cpu", node_cpu_pack());
    let expanded = expand_yaml(source, &resolver);

    assert_eq!(expanded.entries.len(), 2);

    // Every entry carries the same metric name...
    for entry in expanded.entries.iter() {
        assert_eq!(entry.name, "node_cpu_seconds_total");
    }

    // ...while the `mode` label tells the instances apart, in pack order.
    let expected_modes = ["user", "system"];
    for (entry, expected_mode) in expanded.entries.iter().zip(expected_modes.iter()) {
        let labels = entry.labels.as_ref().unwrap();
        assert_eq!(labels.get("mode").unwrap(), *expected_mode);
    }
}
1716
/// Sub-signals that share a metric name must still receive distinct ids;
/// the expected ids carry a `#<index>` suffix after the metric name.
#[test]
fn repeated_metric_names_produce_unique_sub_signal_ids() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    pack: node_exporter_cpu
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("node_exporter_cpu", node_cpu_pack());
    let expanded = expand_yaml(source, &resolver);

    let mut ids: Vec<&str> = Vec::with_capacity(expanded.entries.len());
    for entry in expanded.entries.iter() {
        let id = entry
            .id
            .as_deref()
            .expect("pack-expanded entries always carry an id");
        ids.push(id);
    }

    // A set drops duplicates, so equal sizes mean every id is distinct.
    let distinct: std::collections::BTreeSet<&str> = ids.iter().copied().collect();
    assert_eq!(
        distinct.len(),
        ids.len(),
        "sub-signal ids must be unique; saw {ids:?}"
    );

    assert_eq!(ids[0], "cpu.node_cpu_seconds_total#0");
    assert_eq!(ids[1], "cpu.node_cpu_seconds_total#1");
}
1757
/// When every metric name in a pack is distinct, the sub-signal ids must
/// be the plain `<entry id>.<metric name>` form with no extra suffix.
#[test]
fn unique_metric_names_keep_clean_sub_signal_ids() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: net
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("telegraf_snmp_interface", telegraf_pack());
    let expanded = expand_yaml(source, &resolver);

    // Entries without an id are skipped rather than treated as a failure.
    let mut ids: Vec<&str> = Vec::new();
    for entry in expanded.entries.iter() {
        if let Some(id) = entry.id.as_deref() {
            ids.push(id);
        }
    }
    assert_eq!(ids, vec!["net.ifOperStatus", "net.ifHCInOctets"]);
}
1784
1785 #[rustfmt::skip]
1790 #[rstest::rstest]
1791 #[case::inline_first_then_auto(r#"
1796version: 2
1797defaults: { rate: 1 }
1798scenarios:
1799 - id: telegraf_snmp_interface_1
1800 signal_type: metrics
1801 name: cpu
1802 generator: { type: constant, value: 1 }
1803 - signal_type: metrics
1804 pack: telegraf_snmp_interface
1805"#, "telegraf_snmp_interface_1", "inline entry", "auto-generated")]
1806 #[case::auto_first_then_inline(r#"
1810version: 2
1811defaults: { rate: 1 }
1812scenarios:
1813 - signal_type: metrics
1814 pack: telegraf_snmp_interface
1815 - id: telegraf_snmp_interface_0
1816 signal_type: metrics
1817 name: cpu
1818 generator: { type: constant, value: 1 }
1819"#, "telegraf_snmp_interface_0", "auto-generated", "inline entry")]
1820 fn duplicate_entry_id_detected_regardless_of_source_order(
1821 #[case] yaml: &str,
1822 #[case] expected_id: &str,
1823 #[case] expected_first_substr: &str,
1824 #[case] expected_second_substr: &str,
1825 ) {
1826 let mut resolver = InMemoryPackResolver::new();
1827 resolver.insert("telegraf_snmp_interface", telegraf_pack());
1828 let parsed = parse(yaml).expect("parse");
1829 let normalized = normalize(parsed).expect("normalize");
1830 let err = expand(normalized, &resolver).expect_err("must fail");
1831 match err {
1832 ExpandError::DuplicateEntryId {
1833 id,
1834 first_source,
1835 second_source,
1836 } => {
1837 assert_eq!(id, expected_id);
1838 assert!(
1839 first_source.contains(expected_first_substr),
1840 "unexpected first source: {first_source}"
1841 );
1842 assert!(
1843 second_source.contains(expected_second_substr),
1844 "unexpected second source: {second_source}"
1845 );
1846 }
1847 other => panic!("wrong error variant: {other:?}"),
1848 }
1849 }
1850
/// The rendered (`Display`) form of a duplicate-id error must mention the
/// colliding id in quotes as well as both conflicting sources.
#[test]
fn duplicate_entry_id_error_preserves_both_sources() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: telegraf_snmp_interface_1
    signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1 }
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#;
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("telegraf_snmp_interface", telegraf_pack());

    let normalized = normalize(parse(source).expect("parse")).expect("normalize");
    let rendered = expand(normalized, &resolver)
        .expect_err("must fail")
        .to_string();

    // (required substring, failure explanation), checked in this order.
    let expectations = [
        (
            "'telegraf_snmp_interface_1'",
            "error must name the colliding id",
        ),
        ("inline entry", "error must name the inline source"),
        (
            "auto-generated",
            "error must name the auto-generated source",
        ),
    ];
    for &(needle, requirement) in expectations.iter() {
        assert!(rendered.contains(needle), "{requirement}: {rendered}");
    }
}
1892
/// A path-like `pack` reference is handed to the resolver verbatim: a pack
/// registered under the exact path string must be found and expanded.
#[test]
fn pack_by_file_path_is_resolved_through_trait() {
    let source = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: ./packs/telegraf-snmp-interface.yaml
"#;
    let mut resolver = InMemoryPackResolver::new();
    // Registered under the same string the scenario uses as its reference.
    resolver.insert("./packs/telegraf-snmp-interface.yaml", telegraf_pack());

    let entry_count = expand_yaml(source, &resolver).entries.len();
    assert_eq!(entry_count, 2);
}
1911
/// Compile-time guard: the expansion output and error types must remain
/// `Send + Sync`. The test body does nothing at runtime; it fails to
/// compile if any of the listed types loses either auto trait.
#[test]
fn expanded_file_is_send_and_sync() {
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_sync::<ExpandedFile>();
    require_send_sync::<ExpandedEntry>();
    require_send_sync::<ExpandError>();
}
1923}