1use std::collections::BTreeMap;
88
89use super::{AfterClause, Defaults, DelayClause, Entry, ScenarioFile, WhileClause};
90use crate::config::{
91 BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
92 OnSinkError,
93};
94use crate::encoder::EncoderConfig;
95use crate::generator::{GeneratorConfig, LogGeneratorConfig};
96use crate::packs::MetricOverride;
97use crate::sink::SinkConfig;
98
/// Errors reported while normalizing a parsed scenario file.
///
/// Every variant identifies the offending entry. `MissingRate` carries the
/// entry's index plus a label chosen as `name`, else `id`, else `pack`, else
/// `<unnamed>`. All other variants carry a `source_id` chosen as `id`, else
/// `name`, else `pack`, else `<entry N>`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum NormalizeError {
    /// Neither the entry nor the `defaults` block supplied a `rate`.
    #[error("entry {index} ({label}): missing required field 'rate' (set it on the entry or in defaults:)")]
    MissingRate {
        /// Zero-based position of the entry in the scenarios list.
        index: usize,
        /// Label for the entry: `name`, else `id`, else `pack`, else `<unnamed>`.
        label: String,
    },

    /// A `while:` clause was resolved (entry or defaults) but no `duration`
    /// was resolved for the same entry.
    #[error(
        "entry '{source_id}': scenarios with `while:` must have `duration:` set \
         (either on the entry or via `defaults.duration`).\n\
         Bounds the scenario's lifetime so paused state has a terminal point."
    )]
    WhileWithoutDuration { source_id: String },

    /// A `delay:` clause was resolved (entry or defaults) but no `while:`
    /// clause was resolved for the same entry.
    #[error(
        "entry '{source_id}': `delay:` requires `while:` on the same entry. \
         `delay:` debounces `while:` transitions and has no meaning without it."
    )]
    DelayWithoutWhile { source_id: String },

    /// The delay clause sets `close_snap_to` while also setting
    /// `close_stale_marker` to `false`.
    #[error(
        "entry '{source_id}': delay.close.snap_to already replaces the stale marker; \
         do not also set delay.close.stale_marker: false"
    )]
    CloseEmitConflict { source_id: String },

    /// The `while:` value is not finite. Despite the variant name this covers
    /// NaN as well as positive and negative infinity.
    #[error(
        "entry '{source_id}': while.value must be a finite number; \
         NaN and infinity are rejected because the strict comparison gate \
         would never resolve deterministically"
    )]
    WhileValueIsNan { source_id: String },

    /// The delay clause's `close_snap_to` is not finite. Despite the variant
    /// name this covers NaN as well as positive and negative infinity.
    #[error(
        "entry '{source_id}': delay.close.snap_to must be a finite number; \
         NaN and infinity cannot be emitted as a recovery sample"
    )]
    CloseSnapToIsNan { source_id: String },
}
152
/// Result of [`normalize`]: a scenario file whose entries have had the
/// `defaults` block applied.
///
/// Derives `serde::Serialize` only when the `config` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct NormalizedFile {
    /// Copied verbatim from the parsed file.
    pub version: u32,
    /// Copied verbatim from the parsed file; omitted from serialized output
    /// when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub scenario_name: Option<String>,
    /// Clone of `defaults.labels` when that map is present and non-empty,
    /// otherwise `None`; omitted from serialized output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub defaults_labels: Option<BTreeMap<String, String>>,
    /// One normalized entry per scenario, in source order.
    pub entries: Vec<NormalizedEntry>,
}
206
/// A single scenario entry after the `defaults` block has been applied.
///
/// Fields documented as "carried over" are moved from the source entry without
/// modification; the remaining fields are resolved by `normalize_entry`.
/// Derives `serde::Serialize` only when the `config` feature is enabled.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct NormalizedEntry {
    /// Carried over from the entry.
    pub id: Option<String>,
    /// Carried over from the entry; also selects the built-in encoder when
    /// neither the entry nor the defaults name one.
    pub signal_type: String,
    /// Carried over from the entry.
    pub name: Option<String>,
    /// Entry `rate` if set, otherwise `defaults.rate`. Always present:
    /// normalization fails with `MissingRate` when neither is set.
    pub rate: f64,
    /// Entry `duration` if set, otherwise `defaults.duration`.
    pub duration: Option<String>,
    /// Carried over from the entry.
    pub generator: Option<GeneratorConfig>,
    /// Carried over from the entry.
    pub log_generator: Option<LogGeneratorConfig>,
    /// For entries without `pack`: `defaults.labels` merged with the entry's
    /// labels, the entry winning on key conflicts. For `pack` entries: the
    /// entry's own labels only, with no defaults merged in.
    pub labels: Option<BTreeMap<String, String>>,
    /// Carried over from the entry.
    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
    /// Entry encoder, else `defaults.encoder`, else the built-in choice for
    /// `signal_type` (JSON lines for `"logs"`, Prometheus text otherwise).
    pub encoder: EncoderConfig,
    /// Entry sink, else `defaults.sink`, else stdout.
    pub sink: SinkConfig,
    /// Carried over from the entry.
    pub jitter: Option<f64>,
    /// Carried over from the entry.
    pub jitter_seed: Option<u64>,
    /// Carried over from the entry.
    pub gaps: Option<GapConfig>,
    /// Carried over from the entry.
    pub bursts: Option<BurstConfig>,
    /// Carried over from the entry.
    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
    /// Carried over from the entry.
    pub phase_offset: Option<String>,
    /// Carried over from the entry.
    pub clock_group: Option<String>,
    /// Carried over from the entry.
    pub after: Option<AfterClause>,
    /// Entry `while` clause, else the one from defaults. When present, its
    /// value has been checked to be finite and `duration` is guaranteed set.
    /// Omitted from serialized output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub while_clause: Option<WhileClause>,
    /// Entry `delay` clause, else the one from defaults. Only ever present
    /// together with `while_clause`. Omitted from serialized output when `None`.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub delay_clause: Option<DelayClause>,

    /// Carried over from the entry. When set, defaults labels are not merged
    /// into `labels`.
    pub pack: Option<String>,
    /// Carried over from the entry.
    pub overrides: Option<BTreeMap<String, MetricOverride>>,

    /// Carried over from the entry.
    pub distribution: Option<DistributionConfig>,
    /// Carried over from the entry.
    pub buckets: Option<Vec<f64>>,
    /// Carried over from the entry.
    pub quantiles: Option<Vec<f64>>,
    /// Carried over from the entry.
    pub observations_per_tick: Option<u32>,
    /// Carried over from the entry.
    pub mean_shift_per_sec: Option<f64>,
    /// Carried over from the entry.
    pub seed: Option<u64>,
    /// Entry policy, else the defaults policy, else `OnSinkError::default()`.
    pub on_sink_error: OnSinkError,
}
298
299pub fn normalize(file: ScenarioFile) -> Result<NormalizedFile, NormalizeError> {
333 let defaults = file.defaults;
334 let defaults_labels = defaults
335 .as_ref()
336 .and_then(|d| d.labels.as_ref())
337 .filter(|m| !m.is_empty())
338 .cloned();
339 let mut entries = Vec::with_capacity(file.scenarios.len());
340
341 for (index, entry) in file.scenarios.into_iter().enumerate() {
342 entries.push(normalize_entry(entry, index, defaults.as_ref())?);
343 }
344
345 Ok(NormalizedFile {
346 version: file.version,
347 scenario_name: file.scenario_name,
348 defaults_labels,
349 entries,
350 })
351}
352
353fn normalize_entry(
364 entry: Entry,
365 index: usize,
366 defaults: Option<&Defaults>,
367) -> Result<NormalizedEntry, NormalizeError> {
368 let rate = resolve_rate(&entry, defaults, index)?;
369 let diagnostic_label = entry_label_for_diagnostic(&entry, index);
370 let duration = entry
371 .duration
372 .or_else(|| defaults.and_then(|d| d.duration.clone()));
373 let encoder = entry
374 .encoder
375 .or_else(|| defaults.and_then(|d| d.encoder.clone()))
376 .unwrap_or_else(|| default_encoder_for(&entry.signal_type));
377 let sink = entry
378 .sink
379 .or_else(|| defaults.and_then(|d| d.sink.clone()))
380 .unwrap_or_else(default_sink);
381 let labels = if entry.pack.is_some() {
382 entry.labels
386 } else {
387 merge_labels(defaults.and_then(|d| d.labels.as_ref()), entry.labels)
388 };
389 let on_sink_error = entry
390 .on_sink_error
391 .or_else(|| defaults.and_then(|d| d.on_sink_error))
392 .unwrap_or_default();
393
394 let while_clause = entry
395 .while_clause
396 .or_else(|| defaults.and_then(|d| d.while_clause.clone()));
397 let delay_clause = entry
398 .delay_clause
399 .or_else(|| defaults.and_then(|d| d.delay_clause.clone()));
400
401 if delay_clause.is_some() && while_clause.is_none() {
402 return Err(NormalizeError::DelayWithoutWhile {
403 source_id: diagnostic_label,
404 });
405 }
406 if while_clause.is_some() && duration.is_none() {
407 return Err(NormalizeError::WhileWithoutDuration {
408 source_id: diagnostic_label,
409 });
410 }
411 if let Some(w) = while_clause.as_ref() {
412 if !w.value.is_finite() {
413 return Err(NormalizeError::WhileValueIsNan {
414 source_id: diagnostic_label,
415 });
416 }
417 }
418 if let Some(d) = delay_clause.as_ref() {
419 if d.close_snap_to.is_some() && d.close_stale_marker == Some(false) {
420 return Err(NormalizeError::CloseEmitConflict {
421 source_id: diagnostic_label,
422 });
423 }
424 if let Some(v) = d.close_snap_to {
425 if !v.is_finite() {
426 return Err(NormalizeError::CloseSnapToIsNan {
427 source_id: diagnostic_label,
428 });
429 }
430 }
431 }
432
433 Ok(NormalizedEntry {
434 id: entry.id,
435 signal_type: entry.signal_type,
436 name: entry.name,
437 rate,
438 duration,
439 generator: entry.generator,
440 log_generator: entry.log_generator,
441 labels,
442 dynamic_labels: entry.dynamic_labels,
443 encoder,
444 sink,
445 jitter: entry.jitter,
446 jitter_seed: entry.jitter_seed,
447 gaps: entry.gaps,
448 bursts: entry.bursts,
449 cardinality_spikes: entry.cardinality_spikes,
450 phase_offset: entry.phase_offset,
451 clock_group: entry.clock_group,
452 after: entry.after,
453 while_clause,
454 delay_clause,
455 pack: entry.pack,
456 overrides: entry.overrides,
457 distribution: entry.distribution,
458 buckets: entry.buckets,
459 quantiles: entry.quantiles,
460 observations_per_tick: entry.observations_per_tick,
461 mean_shift_per_sec: entry.mean_shift_per_sec,
462 seed: entry.seed,
463 on_sink_error,
464 })
465}
466
467fn resolve_rate(
470 entry: &Entry,
471 defaults: Option<&Defaults>,
472 index: usize,
473) -> Result<f64, NormalizeError> {
474 if let Some(rate) = entry.rate {
475 return Ok(rate);
476 }
477 if let Some(rate) = defaults.and_then(|d| d.rate) {
478 return Ok(rate);
479 }
480 Err(NormalizeError::MissingRate {
481 index,
482 label: entry_label(entry),
483 })
484}
485
486fn entry_label(entry: &Entry) -> String {
490 entry
491 .name
492 .clone()
493 .or_else(|| entry.id.clone())
494 .or_else(|| entry.pack.clone())
495 .unwrap_or_else(|| "<unnamed>".to_string())
496}
497
498fn entry_label_for_diagnostic(entry: &Entry, index: usize) -> String {
499 entry
500 .id
501 .clone()
502 .or_else(|| entry.name.clone())
503 .or_else(|| entry.pack.clone())
504 .unwrap_or_else(|| format!("<entry {index}>"))
505}
506
507fn default_encoder_for(signal_type: &str) -> EncoderConfig {
514 match signal_type {
515 "logs" => EncoderConfig::JsonLines { precision: None },
516 _ => EncoderConfig::PrometheusText { precision: None },
517 }
518}
519
/// Built-in sink used when neither the entry nor the defaults name one:
/// always `SinkConfig::Stdout`.
fn default_sink() -> SinkConfig {
    SinkConfig::Stdout
}
524
/// Combines the defaults label map with an entry's own labels.
///
/// Returns `None` only when both inputs are `None`. Otherwise the result
/// starts from a clone of the defaults map (or an empty map) and has the
/// entry's pairs inserted on top, so the entry's value wins on key conflicts.
fn merge_labels(
    defaults_labels: Option<&BTreeMap<String, String>>,
    entry_labels: Option<BTreeMap<String, String>>,
) -> Option<BTreeMap<String, String>> {
    if defaults_labels.is_none() && entry_labels.is_none() {
        return None;
    }

    let mut combined = defaults_labels.cloned().unwrap_or_default();
    // `extend` replaces the value of any key already present.
    combined.extend(entry_labels.unwrap_or_default());
    Some(combined)
}
546
// Unit tests for normalization: defaults inheritance, label merging, built-in
// encoder/sink fallbacks, diagnostics, and `while:` / `delay:` validation.
#[cfg(test)]
mod tests {
    use super::super::parse::parse;
    use super::*;

    // Parses `yaml` (panicking if parsing fails) and normalizes the result.
    fn normalize_yaml(yaml: &str) -> Result<NormalizedFile, NormalizeError> {
        let parsed = parse(yaml).expect("parse must succeed in normalization tests");
        normalize(parsed)
    }

    // True when `encoder` is the Prometheus text variant.
    fn is_prometheus_text(encoder: &EncoderConfig) -> bool {
        matches!(encoder, EncoderConfig::PrometheusText { .. })
    }

    // True when `encoder` is the JSON lines variant.
    fn is_json_lines(encoder: &EncoderConfig) -> bool {
        matches!(encoder, EncoderConfig::JsonLines { .. })
    }

    // True when `sink` is stdout.
    fn is_stdout(sink: &SinkConfig) -> bool {
        matches!(sink, SinkConfig::Stdout)
    }

    // An entry with neither rate nor duration takes both from defaults.
    #[test]
    fn entry_inherits_rate_and_duration_from_defaults() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let entry = &file.entries[0];
        assert!((entry.rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(entry.duration.as_deref(), Some("5m"));
    }

    // An entry-level rate takes precedence over defaults.rate.
    #[test]
    fn entry_rate_overrides_defaults_rate() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - signal_type: metrics
    name: cpu
    rate: 10
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        assert!((file.entries[0].rate - 10.0).abs() < f64::EPSILON);
    }

    // An entry-level duration takes precedence over defaults.duration.
    #[test]
    fn entry_duration_overrides_defaults_duration() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - signal_type: metrics
    name: cpu
    duration: 30s
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        assert_eq!(file.entries[0].duration.as_deref(), Some("30s"));
    }

    // Encoder and sink come from defaults when the entry sets neither.
    #[test]
    fn entry_inherits_encoder_and_sink_from_defaults() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  encoder: { type: influx_lp }
  sink: { type: file, path: /tmp/out.txt }
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let entry = &file.entries[0];
        assert!(matches!(
            entry.encoder,
            EncoderConfig::InfluxLineProtocol { .. }
        ));
        assert!(matches!(entry.sink, SinkConfig::File { .. }));
    }

    // An entry-level encoder takes precedence over defaults.encoder.
    #[test]
    fn entry_encoder_overrides_defaults_encoder() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  encoder: { type: influx_lp }
scenarios:
  - signal_type: metrics
    name: cpu
    encoder: { type: prometheus_text }
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        assert!(is_prometheus_text(&file.entries[0].encoder));
    }

    // Expected built-in encoder, used as a parameter in the rstest cases below.
    #[derive(Copy, Clone)]
    enum ExpectedEncoder {
        PrometheusText,
        JsonLines,
    }

    // With no encoder or sink configured anywhere, each signal type gets its
    // built-in encoder and the stdout sink.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::metrics(r#"
version: 2
scenarios:
  - signal_type: metrics
    name: cpu
    rate: 1
    generator: { type: constant, value: 42 }
"#, ExpectedEncoder::PrometheusText)]
    #[case::histogram(r#"
version: 2
scenarios:
  - signal_type: histogram
    name: http_latency
    rate: 1
    distribution: { type: exponential, rate: 10.0 }
    buckets: [0.1, 0.5, 1.0]
    observations_per_tick: 50
    seed: 1
"#, ExpectedEncoder::PrometheusText)]
    #[case::summary(r#"
version: 2
scenarios:
  - signal_type: summary
    name: rpc_latency
    rate: 1
    distribution: { type: normal, mean: 0.1, stddev: 0.02 }
    quantiles: [0.5, 0.9, 0.99]
    observations_per_tick: 50
    seed: 1
"#, ExpectedEncoder::PrometheusText)]
    #[case::logs(r#"
version: 2
scenarios:
  - signal_type: logs
    name: app_logs
    rate: 1
    log_generator:
      type: template
      templates:
        - message: "hello"
"#, ExpectedEncoder::JsonLines)]
    fn signal_type_picks_built_in_encoder_and_stdout_sink(
        #[case] yaml: &str,
        #[case] expected: ExpectedEncoder,
    ) {
        let file = normalize_yaml(yaml).expect("must normalize");
        let entry = &file.entries[0];
        match expected {
            ExpectedEncoder::PrometheusText => assert!(is_prometheus_text(&entry.encoder)),
            ExpectedEncoder::JsonLines => assert!(is_json_lines(&entry.encoder)),
        }
        assert!(is_stdout(&entry.sink));
    }

    // Defaults and entry labels are merged; the entry wins on shared keys.
    #[test]
    fn labels_merge_entry_wins_on_conflict() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    device: rtr-edge-01
    region: us-west-2
scenarios:
  - signal_type: metrics
    name: cpu
    labels:
      region: us-east-1
      interface: Gi0/0/0
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let labels = file.entries[0]
            .labels
            .as_ref()
            .expect("merged labels must exist");
        assert_eq!(
            labels.get("device").map(String::as_str),
            Some("rtr-edge-01")
        );
        assert_eq!(
            labels.get("region").map(String::as_str),
            Some("us-east-1"),
            "entry value must win on conflict"
        );
        assert_eq!(labels.get("interface").map(String::as_str), Some("Gi0/0/0"));
    }

    // An entry without labels receives exactly the defaults labels.
    #[test]
    fn labels_from_defaults_alone_are_preserved() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    env: staging
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let labels = file.entries[0].labels.as_ref().expect("labels must exist");
        assert_eq!(labels.get("env").map(String::as_str), Some("staging"));
        assert_eq!(labels.len(), 1);
    }

    // Entry labels survive unchanged when defaults has no labels.
    #[test]
    fn entry_labels_preserved_when_defaults_has_no_labels() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - signal_type: metrics
    name: cpu
    labels:
      job: api
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let labels = file.entries[0].labels.as_ref().expect("labels must exist");
        assert_eq!(labels.get("job").map(String::as_str), Some("api"));
        assert_eq!(labels.len(), 1);
    }

    // With no labels on either side the normalized entry has `labels: None`.
    #[test]
    fn no_labels_anywhere_produces_none() {
        let yaml = r#"
version: 2
scenarios:
  - signal_type: metrics
    name: cpu
    rate: 1
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        assert!(file.entries[0].labels.is_none());
    }

    // The MissingRate label follows the chain exercised by these cases:
    // `name` when present, otherwise `id`, otherwise the pack name.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::inline_uses_name(r#"
version: 2
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1.0 }
"#, "cpu")]
    #[case::pack_prefers_id(r#"
version: 2
scenarios:
  - id: snmp_iface
    signal_type: metrics
    pack: telegraf_snmp_interface
"#, "snmp_iface")]
    #[case::pack_falls_back_to_pack_name(r#"
version: 2
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#, "telegraf_snmp_interface")]
    fn missing_rate_error_label_follows_priority_chain(
        #[case] yaml: &str,
        #[case] expected_label: &str,
    ) {
        let err = normalize_yaml(yaml).expect_err("missing rate must fail");
        match err {
            NormalizeError::MissingRate { index, label } => {
                assert_eq!(index, 0);
                assert_eq!(label, expected_label);
            }
            other => panic!("expected MissingRate, got {other:?}"),
        }
    }

    // The rendered MissingRate message names the entry and hints at defaults.
    #[test]
    fn missing_rate_message_mentions_entry_and_hint() {
        let yaml = r#"
version: 2
scenarios:
  - signal_type: metrics
    name: bare
    generator: { type: constant, value: 1.0 }
"#;
        let err = normalize_yaml(yaml).expect_err("missing rate must fail");
        let msg = err.to_string();
        assert!(msg.contains("entry 0"), "error should mention entry index");
        assert!(msg.contains("bare"), "error should mention entry name");
        assert!(msg.contains("rate"), "error should mention rate");
        assert!(
            msg.contains("defaults"),
            "error should hint at defaults block"
        );
    }

    // A top-level single-signal document normalizes to one entry with the
    // built-in encoder and sink.
    #[test]
    fn shorthand_single_signal_normalizes_through_wrapped_form() {
        let yaml = r#"
version: 2
name: cpu_usage
signal_type: metrics
rate: 5
generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize shorthand");
        assert_eq!(file.entries.len(), 1);
        let entry = &file.entries[0];
        assert!((entry.rate - 5.0).abs() < f64::EPSILON);
        assert_eq!(entry.name.as_deref(), Some("cpu_usage"));
        assert!(is_prometheus_text(&entry.encoder));
        assert!(is_stdout(&entry.sink));
    }

    // The shorthand form with `signal_type: logs` gets the JSON lines encoder.
    #[test]
    fn shorthand_logs_signal_picks_json_lines_default() {
        let yaml = r#"
version: 2
name: app_logs
signal_type: logs
rate: 2
log_generator:
  type: template
  templates:
    - message: "hello"
"#;
        let file = normalize_yaml(yaml).expect("must normalize logs shorthand");
        assert!(is_json_lines(&file.entries[0].encoder));
    }

    // Pack entries inherit rate/duration/encoder/sink from defaults, but
    // defaults.labels is not merged into the entry's labels; it is surfaced
    // on the file as `defaults_labels` instead.
    #[test]
    fn pack_entry_inherits_defaults_but_defers_label_merge() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 10m
  encoder: { type: prometheus_text }
  sink: { type: stdout }
  labels:
    job: web
scenarios:
  - id: primary_uplink
    signal_type: metrics
    pack: mypack
    labels:
      device: rtr-01
    overrides:
      ifOperStatus:
        generator: { type: constant, value: 0.0 }
"#;
        let file = normalize_yaml(yaml).expect("must normalize pack entry");
        let entry = &file.entries[0];
        assert_eq!(entry.pack.as_deref(), Some("mypack"));
        assert!(
            entry.overrides.is_some(),
            "overrides must be carried through untouched"
        );
        assert!((entry.rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(entry.duration.as_deref(), Some("10m"));
        assert!(is_prometheus_text(&entry.encoder));
        assert!(is_stdout(&entry.sink));

        let labels = entry.labels.as_ref().expect("entry labels must exist");
        assert_eq!(labels.len(), 1, "only entry labels — defaults not merged");
        assert_eq!(labels.get("device").map(String::as_str), Some("rtr-01"));
        assert!(
            !labels.contains_key("job"),
            "defaults.labels must not leak into pack entry's labels"
        );

        let d = file
            .defaults_labels
            .as_ref()
            .expect("defaults_labels must be surfaced");
        assert_eq!(d.get("job").map(String::as_str), Some("web"));
    }

    // `defaults_labels` mirrors defaults.labels when set, and is `None` when
    // there is no defaults block or the block has no labels.
    #[test]
    fn normalized_file_defaults_labels_matches_source() {
        let yaml_with = r#"
version: 2
defaults:
  rate: 1
  labels:
    env: prod
    region: us-east-1
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml_with).expect("must normalize");
        let d = file
            .defaults_labels
            .as_ref()
            .expect("defaults_labels must be Some when defaults.labels is set");
        assert_eq!(d.len(), 2);
        assert_eq!(d.get("env").map(String::as_str), Some("prod"));
        assert_eq!(d.get("region").map(String::as_str), Some("us-east-1"));

        let yaml_no_defaults = r#"
version: 2
scenarios:
  - signal_type: metrics
    name: cpu
    rate: 1
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml_no_defaults).expect("must normalize");
        assert!(file.defaults_labels.is_none());

        let yaml_no_labels = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 42 }
"#;
        let file = normalize_yaml(yaml_no_labels).expect("must normalize");
        assert!(file.defaults_labels.is_none());
    }

    // In one file, an inline entry gets defaults labels merged in while a pack
    // entry keeps only its own labels.
    #[test]
    fn inline_and_pack_entries_compose_defaults_labels_asymmetrically() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    job: web
    region: us-east-1
scenarios:
  - signal_type: metrics
    name: cpu
    labels:
      host: node-01
    generator: { type: constant, value: 42 }

  - signal_type: metrics
    pack: mypack
    labels:
      device: rtr-01
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        assert_eq!(file.entries.len(), 2);

        let inline = &file.entries[0];
        assert!(inline.pack.is_none());
        let inline_labels = inline.labels.as_ref().expect("inline labels must exist");
        assert_eq!(inline_labels.len(), 3, "defaults + entry merged");
        assert_eq!(inline_labels.get("job").map(String::as_str), Some("web"));
        assert_eq!(
            inline_labels.get("region").map(String::as_str),
            Some("us-east-1")
        );
        assert_eq!(
            inline_labels.get("host").map(String::as_str),
            Some("node-01")
        );

        let pack = &file.entries[1];
        assert_eq!(pack.pack.as_deref(), Some("mypack"));
        let pack_labels = pack.labels.as_ref().expect("pack entry labels must exist");
        assert_eq!(pack_labels.len(), 1, "only entry-level labels, no merge");
        assert_eq!(
            pack_labels.get("device").map(String::as_str),
            Some("rtr-01")
        );
        assert!(!pack_labels.contains_key("job"));
        assert!(!pack_labels.contains_key("region"));

        let d = file
            .defaults_labels
            .as_ref()
            .expect("defaults_labels must be Some");
        assert_eq!(d.len(), 2);
        assert_eq!(d.get("job").map(String::as_str), Some("web"));
        assert_eq!(d.get("region").map(String::as_str), Some("us-east-1"));
    }

    // A file mixing inline metrics, logs and pack entries normalizes each
    // entry according to its own overrides and the shared defaults.
    #[test]
    fn multi_scenario_mixed_entries_all_normalize() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
  encoder: { type: prometheus_text }
  sink: { type: stdout }
  labels:
    region: us-west-2
scenarios:
  - id: link_state
    signal_type: metrics
    name: interface_oper_state
    labels:
      interface: Gi0/0/0
      region: us-east-1
    generator: { type: flap, up_duration: 60s, down_duration: 30s }

  - id: fast_metric
    signal_type: metrics
    name: cpu
    rate: 10
    generator: { type: constant, value: 42 }

  - signal_type: logs
    name: app_logs
    log_generator:
      type: template
      templates:
        - message: "hello"

  - signal_type: metrics
    pack: telegraf_snmp_interface
    labels:
      device: rtr-01
"#;
        let file = normalize_yaml(yaml).expect("must normalize multi-scenario");
        assert_eq!(file.entries.len(), 4);

        // Entry 0: inherits rate/duration/encoder, overrides `region`.
        let e0 = &file.entries[0];
        assert!((e0.rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(e0.duration.as_deref(), Some("5m"));
        assert!(is_prometheus_text(&e0.encoder));
        let labels0 = e0.labels.as_ref().expect("labels must exist");
        assert_eq!(labels0.get("region").map(String::as_str), Some("us-east-1"));
        assert_eq!(
            labels0.get("interface").map(String::as_str),
            Some("Gi0/0/0")
        );

        // Entry 1: overrides rate, inherits duration and labels.
        let e1 = &file.entries[1];
        assert!((e1.rate - 10.0).abs() < f64::EPSILON);
        assert_eq!(e1.duration.as_deref(), Some("5m"));
        let labels1 = e1.labels.as_ref().expect("labels must exist");
        assert_eq!(
            labels1.get("region").map(String::as_str),
            Some("us-west-2"),
            "entry has no labels.region, defaults wins"
        );

        // Entry 2: a logs entry still takes the explicit defaults encoder.
        let e2 = &file.entries[2];
        assert!(
            is_prometheus_text(&e2.encoder),
            "explicit defaults.encoder applies to all entries including logs"
        );

        // Entry 3: pack entry keeps only its own labels.
        let e3 = &file.entries[3];
        assert_eq!(e3.pack.as_deref(), Some("telegraf_snmp_interface"));
        let labels3 = e3.labels.as_ref().expect("labels must exist");
        assert_eq!(labels3.len(), 1, "only entry-level labels on pack entry");
        assert_eq!(labels3.get("device").map(String::as_str), Some("rtr-01"));
        assert!(!labels3.contains_key("region"));

        let d = file
            .defaults_labels
            .as_ref()
            .expect("defaults_labels must be Some");
        assert_eq!(d.get("region").map(String::as_str), Some("us-west-2"));
    }

    // `after`, `phase_offset` and `clock_group` pass through unchanged.
    #[test]
    fn after_clause_and_timing_fields_preserved() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - id: src
    signal_type: metrics
    name: source
    generator: { type: constant, value: 100.0 }

  - signal_type: metrics
    name: dependent
    phase_offset: 5s
    clock_group: group_a
    generator: { type: constant, value: 1.0 }
    after:
      ref: src
      op: ">"
      value: 50.0
      delay: 2s
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let dep = &file.entries[1];
        assert_eq!(dep.phase_offset.as_deref(), Some("5s"));
        assert_eq!(dep.clock_group.as_deref(), Some("group_a"));
        let after = dep.after.as_ref().expect("after must be preserved");
        assert_eq!(after.ref_id, "src");
        assert_eq!(after.delay.as_deref(), Some("2s"));
    }

    // Histogram-specific fields pass through unchanged.
    #[test]
    fn histogram_fields_preserved() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - signal_type: histogram
    name: latency
    distribution: { type: exponential, rate: 10.0 }
    buckets: [0.1, 0.5, 1.0]
    observations_per_tick: 100
    mean_shift_per_sec: 0.01
    seed: 42
"#;
        let file = normalize_yaml(yaml).expect("must normalize");
        let entry = &file.entries[0];
        assert!(entry.distribution.is_some());
        assert_eq!(entry.buckets.as_ref().map(Vec::len), Some(3));
        assert_eq!(entry.observations_per_tick, Some(100));
        assert_eq!(entry.mean_shift_per_sec, Some(0.01));
        assert_eq!(entry.seed, Some(42));
    }

    // Compile-time check that the error type is `Send + Sync`.
    #[test]
    fn normalize_error_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<NormalizeError>();
    }

    // Compile-time check that the normalized types are `Send + Sync`.
    #[test]
    fn normalized_types_are_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<NormalizedFile>();
        assert_send_sync::<NormalizedEntry>();
    }

    // An empty scenarios list is valid and yields no entries.
    #[test]
    fn empty_scenarios_list_normalizes_to_empty_entries() {
        let yaml = r#"
version: 2
scenarios: []
"#;
        let file = normalize_yaml(yaml).expect("must normalize empty list");
        assert_eq!(file.version, 2);
        assert!(file.entries.is_empty());
    }

    // merge_labels: both inputs absent gives `None`.
    #[test]
    fn merge_labels_both_none_returns_none() {
        assert!(merge_labels(None, None).is_none());
    }

    // merge_labels: defaults only gives a copy of the defaults map.
    #[test]
    fn merge_labels_only_defaults_returns_defaults_clone() {
        let mut d = BTreeMap::new();
        d.insert("a".to_string(), "1".to_string());
        let merged = merge_labels(Some(&d), None).expect("must return map");
        assert_eq!(merged.get("a").map(String::as_str), Some("1"));
    }

    // merge_labels: entry only gives the entry map.
    #[test]
    fn merge_labels_only_entry_returns_entry() {
        let mut e = BTreeMap::new();
        e.insert("b".to_string(), "2".to_string());
        let merged = merge_labels(None, Some(e)).expect("must return map");
        assert_eq!(merged.get("b").map(String::as_str), Some("2"));
    }

    // merge_labels: on a shared key the entry value replaces the default.
    #[test]
    fn merge_labels_entry_overrides_defaults_on_conflict() {
        let mut d = BTreeMap::new();
        d.insert("k".to_string(), "from_defaults".to_string());
        let mut e = BTreeMap::new();
        e.insert("k".to_string(), "from_entry".to_string());
        let merged = merge_labels(Some(&d), Some(e)).expect("must return map");
        assert_eq!(merged.get("k").map(String::as_str), Some("from_entry"));
    }

    // default_encoder_for: direct check of the signal-type mapping.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::metrics("metrics", ExpectedEncoder::PrometheusText)]
    #[case::histogram("histogram", ExpectedEncoder::PrometheusText)]
    #[case::summary("summary", ExpectedEncoder::PrometheusText)]
    #[case::logs("logs", ExpectedEncoder::JsonLines)]
    fn default_encoder_per_signal_type(
        #[case] signal_type: &str,
        #[case] expected: ExpectedEncoder,
    ) {
        let encoder = default_encoder_for(signal_type);
        match expected {
            ExpectedEncoder::PrometheusText => {
                assert!(matches!(encoder, EncoderConfig::PrometheusText { .. }))
            }
            ExpectedEncoder::JsonLines => {
                assert!(matches!(encoder, EncoderConfig::JsonLines { .. }))
            }
        }
    }

    // default_sink: the built-in sink is stdout.
    #[test]
    fn default_sink_is_stdout() {
        assert!(matches!(default_sink(), SinkConfig::Stdout));
    }

    // `while:` with no duration on the entry or in defaults is rejected.
    #[test]
    fn while_without_duration_is_rejected() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: { type: constant, value: 1 }
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
    while: { ref: src, op: ">", value: 0 }
"#;
        let err = normalize_yaml(yaml).expect_err("missing duration must fail");
        match err {
            NormalizeError::WhileWithoutDuration { source_id } => {
                assert_eq!(source_id, "gated");
            }
            other => panic!("expected WhileWithoutDuration, got {other:?}"),
        }
    }

    // defaults.duration is enough to satisfy the `while:` duration rule.
    #[test]
    fn defaults_duration_satisfies_while_without_duration() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: { type: constant, value: 1 }
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
    while: { ref: src, op: ">", value: 0 }
"#;
        let file = normalize_yaml(yaml).expect("defaults.duration satisfies the gate");
        assert!(file.entries[1].while_clause.is_some());
        assert_eq!(file.entries[1].duration.as_deref(), Some("5m"));
    }

    // `delay:` with no `while:` on the entry or in defaults is rejected.
    #[test]
    fn delay_without_while_is_rejected() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 1m
scenarios:
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
    delay: { open: "5s", close: "10s" }
"#;
        let err = normalize_yaml(yaml).expect_err("delay without while must fail");
        match err {
            NormalizeError::DelayWithoutWhile { source_id } => {
                assert_eq!(source_id, "gated");
            }
            other => panic!("expected DelayWithoutWhile, got {other:?}"),
        }
    }

    // A zero `open` delay is accepted and preserved as `Duration::ZERO`.
    #[test]
    fn delay_open_zero_is_accepted() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: { type: constant, value: 1 }
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
    while: { ref: src, op: ">", value: 0 }
    delay: { open: "0s", close: "5s" }
"#;
        let file = normalize_yaml(yaml).expect("delay open=0s must parse and normalize");
        let gated = file
            .entries
            .iter()
            .find(|e| e.id.as_deref() == Some("gated"))
            .unwrap();
        let delay = gated.delay_clause.as_ref().expect("delay clause present");
        assert_eq!(delay.open, Some(std::time::Duration::ZERO));
        assert_eq!(delay.close, Some(std::time::Duration::from_secs(5)));
    }

    // A zero `close` delay is accepted and preserved as `Duration::ZERO`.
    #[test]
    fn delay_close_zero_is_accepted() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 5m
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: { type: constant, value: 1 }
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
    while: { ref: src, op: ">", value: 0 }
    delay: { open: "250ms", close: "0ms" }
"#;
        let file = normalize_yaml(yaml).expect("delay close=0ms must parse and normalize");
        let gated = file
            .entries
            .iter()
            .find(|e| e.id.as_deref() == Some("gated"))
            .unwrap();
        let delay = gated.delay_clause.as_ref().expect("delay clause present");
        assert_eq!(delay.open, Some(std::time::Duration::from_millis(250)));
        assert_eq!(delay.close, Some(std::time::Duration::ZERO));
    }

    // NaN and both infinities are rejected as `while.value`.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::nan(".nan")]
    #[case::pos_inf(".inf")]
    #[case::neg_inf("-.inf")]
    fn while_value_non_finite_is_rejected_at_compile(#[case] yaml_value: &str) {
        let yaml = format!(r#"
version: 2
defaults:
  rate: 1
  duration: 1m
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: {{ type: constant, value: 1 }}
  - id: gated
    signal_type: metrics
    name: gated
    generator: {{ type: constant, value: 1 }}
    while: {{ ref: src, op: ">", value: {yaml_value} }}
"#);
        let err = normalize_yaml(&yaml).expect_err("non-finite while.value must fail");
        match err {
            NormalizeError::WhileValueIsNan { source_id } => {
                assert_eq!(source_id, "gated");
            }
            other => panic!("expected WhileValueIsNan, got {other:?}"),
        }
    }

    // NaN and both infinities are rejected as `delay.close.snap_to`.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::nan(".nan")]
    #[case::pos_inf(".inf")]
    #[case::neg_inf("-.inf")]
    fn close_snap_to_non_finite_is_rejected_at_compile(#[case] yaml_value: &str) {
        let yaml = format!(r#"
version: 2
defaults:
  rate: 1
  duration: 1m
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: {{ type: constant, value: 1 }}
  - id: gated
    signal_type: metrics
    name: gated
    generator: {{ type: constant, value: 1 }}
    while: {{ ref: src, op: ">", value: 0 }}
    delay:
      close:
        duration: 5s
        snap_to: {yaml_value}
"#);
        let err = normalize_yaml(&yaml).expect_err("non-finite close.snap_to must fail");
        match err {
            NormalizeError::CloseSnapToIsNan { source_id } => {
                assert_eq!(source_id, "gated");
            }
            other => panic!("expected CloseSnapToIsNan, got {other:?}"),
        }
    }

    // A `while:` clause set in defaults is inherited by entries without one.
    #[test]
    fn while_inherits_from_defaults() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 1m
  while: { ref: src, op: ">", value: 0 }
scenarios:
  - id: src
    signal_type: metrics
    name: src
    generator: { type: constant, value: 1 }
  - id: gated
    signal_type: metrics
    name: gated
    generator: { type: constant, value: 1 }
"#;
        let file = normalize_yaml(yaml).expect("defaults while inherits");
        let gated = file
            .entries
            .iter()
            .find(|e| e.id.as_deref() == Some("gated"))
            .unwrap();
        assert!(gated.while_clause.is_some());
    }
}