// sonda_core/compiler/normalize.rs

//! Defaults resolution and entry normalization for v2 scenario files.
//!
//! This module implements **Phase 2** of the v2 compilation pipeline: it takes
//! a [`ScenarioFile`] produced by [`super::parse::parse`] and flattens the
//! shared `defaults:` block into each entry, applying the documented
//! precedence rules.
//!
//! # Precedence (entry-level fields)
//!
//! For every entry, the resolver picks the first non-`None` value in this
//! order:
//!
//! 1. the value set on the entry,
//! 2. the value set under the file-level `defaults:` block,
//! 3. a built-in fallback for `encoder` and `sink` (see below).
//!
//! The higher-precedence levels (pack `shared_labels`, pack per-metric,
//! override labels, CLI flags) are not applied here; they belong to later
//! compilation phases.
//!
//! # Built-in fallbacks
//!
//! When neither the entry nor `defaults:` sets an encoder, the normalizer
//! picks one based on the entry's `signal_type`:
//!
//! | Signal type | Default encoder  |
//! |-------------|------------------|
//! | `metrics`   | `prometheus_text`|
//! | `histogram` | `prometheus_text`|
//! | `summary`   | `prometheus_text`|
//! | `logs`      | `json_lines`     |
//!
//! The built-in fallback for `sink` is always `stdout`.
//!
//! # Labels merge (inline vs. pack entries)
//!
//! Inline entries (those with their own `generator` / `log_generator`) have
//! no downstream label-composition steps, so their labels are merged eagerly
//! here: `defaults.labels ∪ entry.labels`, entry keys winning on collision.
//! If either side is `None` the merged map equals the other side; if both
//! are `None` the entry keeps `None`.
//!
//! Pack entries (`pack: some_name`) behave differently. Per spec §2.2 the
//! final label map for a pack metric is composed at eight distinct
//! precedence levels, ordered **low → high** (lowest number is applied
//! first, each subsequent level overwrites on key collision):
//!
//! 1. Sonda built-in defaults (no label default today — listed for parity
//!    with the non-label precedence chain)
//! 2. `defaults.labels`
//! 3. pack definition's top-level fields (shared rate/job, etc.)
//! 4. pack `shared_labels`
//! 5. pack per-metric labels
//! 6. pack entry-level labels (the entry under `scenarios:`)
//! 7. override-level labels (`entry.overrides[metric].labels`)
//! 8. CLI flags (applied at runtime, PR 7 scope)
//!
//! Eagerly merging levels 2 and 6 into a single map (as we do for inline
//! entries) would collapse those two layers, making it impossible for pack
//! expansion to interleave levels 3–5 at their correct precedence. A pack
//! `shared_labels: { job: snmp }` (level 4) must be able to override a
//! `defaults.labels: { job: web }` (level 2) while still being overridden
//! by an entry `labels: { job: api }` (level 6) — which requires preserving
//! the boundary.
//!
//! Therefore, for pack entries, [`NormalizedEntry::labels`] carries **only
//! the entry's own labels** (unchanged, including `None`) at level 6. The
//! file-level `defaults.labels` map is surfaced separately on
//! [`NormalizedFile::defaults_labels`] so pack expansion (Phase 3) can
//! apply it at precedence level 2.
//!
//! # Pack entries (other fields)
//!
//! Pack entries still inherit `rate`, `duration`, `encoder`, and `sink`
//! eagerly — those fields do not participate in pack-level composition, so
//! there is no benefit to deferring them.
//!
//! # Validation
//!
//! After merging, every normalized entry must have a concrete `rate`
//! value; missing `rate` is a compile-time error identifying the entry by
//! index plus its `name`, falling back to `id`, then to `pack`, then to
//! `<unnamed>` when none of those are set. Range checks on `rate` (must be
//! `> 0`) are deferred to the existing validator invoked during
//! `prepare_entries` in Phase 5.
//!
//! Entries that resolve a `while:` or `delay:` clause (set inline or
//! inherited from `defaults:`) are also checked here: `delay:` requires
//! `while:`, `while:` requires a `duration`, `while.value` and
//! `delay.close.snap_to` must be finite, and `delay.close.snap_to` may not
//! be combined with `delay.close.stale_marker: false`.
86
87use std::collections::BTreeMap;
88
89use super::{AfterClause, Defaults, DelayClause, Entry, ScenarioFile, WhileClause};
90use crate::config::{
91    BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
92    OnSinkError,
93};
94use crate::encoder::EncoderConfig;
95use crate::generator::{GeneratorConfig, LogGeneratorConfig};
96use crate::packs::MetricOverride;
97use crate::sink::SinkConfig;
98
99// ---------------------------------------------------------------------------
100// Error type
101// ---------------------------------------------------------------------------
102
103/// Errors produced during defaults resolution.
104#[derive(Debug, thiserror::Error)]
105#[non_exhaustive]
106pub enum NormalizeError {
107    /// An entry has no `rate` either inline or via the `defaults:` block.
108    ///
109    /// The offending entry is identified by its zero-based index and, when
110    /// available, its `name` or `id` for human-readable diagnostics.
111    #[error("entry {index} ({label}): missing required field 'rate' (set it on the entry or in defaults:)")]
112    MissingRate {
113        /// Zero-based index of the entry in the parsed `scenarios` list.
114        index: usize,
115        /// Human-readable label: the entry's `name`, falling back to `id`,
116        /// falling back to `pack`, falling back to `<unnamed>`.
117        label: String,
118    },
119
120    #[error(
121        "entry '{source_id}': scenarios with `while:` must have `duration:` set \
122         (either on the entry or via `defaults.duration`).\n\
123         Bounds the scenario's lifetime so paused state has a terminal point."
124    )]
125    WhileWithoutDuration { source_id: String },
126
127    #[error(
128        "entry '{source_id}': `delay:` requires `while:` on the same entry. \
129         `delay:` debounces `while:` transitions and has no meaning without it."
130    )]
131    DelayWithoutWhile { source_id: String },
132
133    #[error(
134        "entry '{source_id}': delay.close.snap_to already replaces the stale marker; \
135         do not also set delay.close.stale_marker: false"
136    )]
137    CloseEmitConflict { source_id: String },
138
139    #[error(
140        "entry '{source_id}': while.value must be a finite number; \
141         NaN and infinity are rejected because the strict comparison gate \
142         would never resolve deterministically"
143    )]
144    WhileValueIsNan { source_id: String },
145
146    #[error(
147        "entry '{source_id}': delay.close.snap_to must be a finite number; \
148         NaN and infinity cannot be emitted as a recovery sample"
149    )]
150    CloseSnapToIsNan { source_id: String },
151}
152
153// ---------------------------------------------------------------------------
154// Normalized representation
155// ---------------------------------------------------------------------------
156
157/// A v2 scenario file with all defaults applied.
158///
159/// This is the output of [`normalize`]. The `defaults:` block has been
160/// flattened into each [`NormalizedEntry`] for fields that do not participate
161/// in pack-level composition (`rate`, `duration`, `encoder`, `sink`). The
162/// `defaults.labels` map is handled specially: see the module docs for the
163/// full precedence chain.
164///
165/// # Invariants
166///
167/// - Every entry has a concrete `rate`, `encoder`, and `sink`.
168/// - For **inline** entries, [`NormalizedEntry::labels`] contains the merged
169///   result of `defaults.labels` and the entry's own labels (entry wins on
170///   conflict).
171/// - For **pack** entries, [`NormalizedEntry::labels`] contains only the
172///   entry's own labels (unchanged, possibly `None`). The file-level
173///   `defaults.labels` is carried on [`Self::defaults_labels`] for Phase 3
174///   pack expansion to apply at the correct precedence slot.
175/// - Pack entries retain their `pack` and `overrides` fields untouched —
176///   pack expansion is performed in a later phase.
177/// - `after` clauses, `phase_offset`, and `clock_group` are carried through
178///   unchanged.
179#[derive(Debug, Clone)]
180#[cfg_attr(feature = "config", derive(serde::Serialize))]
181pub struct NormalizedFile {
182    /// Schema version. Always `2` after normalization.
183    pub version: u32,
184    /// File-level `scenario_name` carried verbatim. Pure metadata —
185    /// ignored by every compiler phase, surfaced for runtime conflict checks.
186    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
187    pub scenario_name: Option<String>,
188    /// The file-level `defaults.labels` map, carried forward verbatim for
189    /// later compilation phases to apply at the correct precedence slot.
190    ///
191    /// For pack entries this is the level-2 label layer (per spec §2.2) that
192    /// pack expansion must interleave with pack `shared_labels` (level 4),
193    /// pack per-metric labels (level 5), and entry-level labels (level 6).
194    /// For inline entries the merge is already baked into
195    /// [`NormalizedEntry::labels`] so this map is redundant — but carrying
196    /// it here for both cases keeps the type uniform.
197    ///
198    /// `None` when the source file had no `defaults:` block or when
199    /// `defaults.labels` was omitted or empty.
200    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
201    pub defaults_labels: Option<BTreeMap<String, String>>,
202    /// All entries with defaults applied, in the order they appeared in
203    /// the source file.
204    pub entries: Vec<NormalizedEntry>,
205}
206
207/// A single scenario entry with all defaults resolved.
208///
209/// Fields that could inherit from `defaults:` are now guaranteed to hold a
210/// concrete value (`rate`, `encoder`, `sink`). Fields that do not inherit
211/// (pack references, histogram/summary configuration, `after` clauses)
212/// are carried through unchanged.
213///
214/// This type is deliberately close in shape to [`Entry`] so that later
215/// compilation phases can walk the same field set without a translation
216/// step. The invariants above make the "missing rate/encoder/sink" states
217/// unrepresentable after normalization.
218#[derive(Debug, Clone)]
219#[cfg_attr(feature = "config", derive(serde::Serialize))]
220pub struct NormalizedEntry {
221    /// Unique identifier for causal dependency references (`after.ref`).
222    pub id: Option<String>,
223    /// Signal type: `"metrics"`, `"logs"`, `"histogram"`, or `"summary"`.
224    pub signal_type: String,
225    /// Metric or scenario name. `None` for pack-backed entries.
226    pub name: Option<String>,
227    /// Event rate in events per second. Always set after normalization.
228    pub rate: f64,
229    /// Total run duration (e.g. `"30s"`, `"5m"`). `None` means "run until
230    /// stopped" and is preserved through normalization.
231    pub duration: Option<String>,
232    /// Value generator configuration (metrics signals only).
233    pub generator: Option<GeneratorConfig>,
234    /// Log generator configuration (logs signals only).
235    pub log_generator: Option<LogGeneratorConfig>,
236    /// Static labels attached to every emitted event.
237    ///
238    /// For **inline** entries this is the merged map of `defaults.labels`
239    /// and the entry's own labels, with entry keys winning on conflict.
240    ///
241    /// For **pack** entries this is the entry's own labels **unchanged**
242    /// (possibly `None`). The file-level `defaults.labels` is NOT merged in
243    /// — it is carried separately on [`NormalizedFile::defaults_labels`] so
244    /// pack expansion can apply it at the correct precedence level. See the
245    /// module docs for the full rationale.
246    pub labels: Option<BTreeMap<String, String>>,
247    /// Dynamic (rotating) label configurations.
248    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
249    /// Encoder configuration. Always set after normalization.
250    pub encoder: EncoderConfig,
251    /// Sink configuration. Always set after normalization.
252    pub sink: SinkConfig,
253    /// Jitter amplitude applied to generated values.
254    pub jitter: Option<f64>,
255    /// Deterministic seed for jitter RNG.
256    pub jitter_seed: Option<u64>,
257    /// Recurring silent-period configuration.
258    pub gaps: Option<GapConfig>,
259    /// Recurring high-rate burst configuration.
260    pub bursts: Option<BurstConfig>,
261    /// Cardinality spike configurations.
262    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
263    /// Phase offset for staggered start within a clock group.
264    pub phase_offset: Option<String>,
265    /// Clock group for coordinated timing across entries.
266    pub clock_group: Option<String>,
267    /// Causal dependency on another signal's value.
268    pub after: Option<AfterClause>,
269    /// Continuous lifecycle gate on another signal's value.
270    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
271    pub while_clause: Option<WhileClause>,
272    /// Open / close debounce windows for `while_clause` transitions.
273    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
274    pub delay_clause: Option<DelayClause>,
275
276    // -- Pack-backed entry fields (carried through untouched) --
277    /// Pack name or file path. Mutually exclusive with `generator`.
278    pub pack: Option<String>,
279    /// Per-metric overrides within the referenced pack.
280    pub overrides: Option<BTreeMap<String, MetricOverride>>,
281
282    // -- Histogram / summary fields (carried through untouched) --
283    /// Distribution model for histogram or summary observations.
284    pub distribution: Option<DistributionConfig>,
285    /// Histogram bucket boundaries (histogram only).
286    pub buckets: Option<Vec<f64>>,
287    /// Summary quantile boundaries (summary only).
288    pub quantiles: Option<Vec<f64>>,
289    /// Number of observations sampled per tick.
290    pub observations_per_tick: Option<u32>,
291    /// Linear drift applied to the distribution mean each second.
292    pub mean_shift_per_sec: Option<f64>,
293    /// Deterministic seed for histogram/summary sampling.
294    pub seed: Option<u64>,
295    /// Resolved sink-error policy.
296    pub on_sink_error: OnSinkError,
297}
298
299// ---------------------------------------------------------------------------
300// Public API
301// ---------------------------------------------------------------------------
302
303/// Resolve the `defaults:` block into every entry of a parsed v2 scenario
304/// file.
305///
306/// The returned [`NormalizedFile`] contains a [`NormalizedEntry`] per input
307/// entry with the following fields materialized:
308///
309/// - `rate` inherits from `defaults.rate` when the entry omits it; missing
310///   on both is an error (see [`NormalizeError::MissingRate`]).
311/// - `duration` inherits from `defaults.duration`; absence is preserved as
312///   "run until stopped".
313/// - `encoder` inherits from `defaults.encoder`, otherwise defaults to
314///   `prometheus_text` for metrics/histogram/summary and `json_lines` for
315///   logs.
316/// - `sink` inherits from `defaults.sink`, otherwise defaults to `stdout`.
317/// - `labels` — **inline entries only**: the union of `defaults.labels` and
318///   the entry's labels (entry wins on conflict). **Pack entries** keep
319///   their own labels unchanged; `defaults.labels` is surfaced on
320///   [`NormalizedFile::defaults_labels`] for Phase 3 pack expansion.
321///
322/// All other fields (pack info, histogram parameters, `after` clause,
323/// `phase_offset`, `clock_group`, jitter, gaps, bursts, cardinality spikes,
324/// dynamic labels, etc.) are carried through untouched.
325///
326/// # Errors
327///
328/// Returns [`NormalizeError::MissingRate`] when an entry has no `rate`
329/// defined inline and the `defaults:` block does not supply one either.
330/// The error message identifies the entry by index and, when available,
331/// its `name`, `id`, or `pack` reference.
332pub fn normalize(file: ScenarioFile) -> Result<NormalizedFile, NormalizeError> {
333    let defaults = file.defaults;
334    let defaults_labels = defaults
335        .as_ref()
336        .and_then(|d| d.labels.as_ref())
337        .filter(|m| !m.is_empty())
338        .cloned();
339    let mut entries = Vec::with_capacity(file.scenarios.len());
340
341    for (index, entry) in file.scenarios.into_iter().enumerate() {
342        entries.push(normalize_entry(entry, index, defaults.as_ref())?);
343    }
344
345    Ok(NormalizedFile {
346        version: file.version,
347        scenario_name: file.scenario_name,
348        defaults_labels,
349        entries,
350    })
351}
352
353// ---------------------------------------------------------------------------
354// Internals
355// ---------------------------------------------------------------------------
356
357/// Apply defaults to a single entry and validate required fields.
358///
359/// For inline entries, `defaults.labels` is merged into the entry's labels
360/// eagerly. For pack entries, the merge is deferred to Phase 3 pack
361/// expansion so the correct §2.2 precedence chain can be applied; see the
362/// module docs for the full rationale.
363fn normalize_entry(
364    entry: Entry,
365    index: usize,
366    defaults: Option<&Defaults>,
367) -> Result<NormalizedEntry, NormalizeError> {
368    let rate = resolve_rate(&entry, defaults, index)?;
369    let diagnostic_label = entry_label_for_diagnostic(&entry, index);
370    let duration = entry
371        .duration
372        .or_else(|| defaults.and_then(|d| d.duration.clone()));
373    let encoder = entry
374        .encoder
375        .or_else(|| defaults.and_then(|d| d.encoder.clone()))
376        .unwrap_or_else(|| default_encoder_for(&entry.signal_type));
377    let sink = entry
378        .sink
379        .or_else(|| defaults.and_then(|d| d.sink.clone()))
380        .unwrap_or_else(default_sink);
381    let labels = if entry.pack.is_some() {
382        // Pack entries defer label composition to Phase 3 expansion; keep
383        // only the entry's own labels here so pack shared/per-metric labels
384        // can be inserted between defaults and entry levels (spec §2.2).
385        entry.labels
386    } else {
387        merge_labels(defaults.and_then(|d| d.labels.as_ref()), entry.labels)
388    };
389    let on_sink_error = entry
390        .on_sink_error
391        .or_else(|| defaults.and_then(|d| d.on_sink_error))
392        .unwrap_or_default();
393
394    let while_clause = entry
395        .while_clause
396        .or_else(|| defaults.and_then(|d| d.while_clause.clone()));
397    let delay_clause = entry
398        .delay_clause
399        .or_else(|| defaults.and_then(|d| d.delay_clause.clone()));
400
401    if delay_clause.is_some() && while_clause.is_none() {
402        return Err(NormalizeError::DelayWithoutWhile {
403            source_id: diagnostic_label,
404        });
405    }
406    if while_clause.is_some() && duration.is_none() {
407        return Err(NormalizeError::WhileWithoutDuration {
408            source_id: diagnostic_label,
409        });
410    }
411    if let Some(w) = while_clause.as_ref() {
412        if !w.value.is_finite() {
413            return Err(NormalizeError::WhileValueIsNan {
414                source_id: diagnostic_label,
415            });
416        }
417    }
418    if let Some(d) = delay_clause.as_ref() {
419        if d.close_snap_to.is_some() && d.close_stale_marker == Some(false) {
420            return Err(NormalizeError::CloseEmitConflict {
421                source_id: diagnostic_label,
422            });
423        }
424        if let Some(v) = d.close_snap_to {
425            if !v.is_finite() {
426                return Err(NormalizeError::CloseSnapToIsNan {
427                    source_id: diagnostic_label,
428                });
429            }
430        }
431    }
432
433    Ok(NormalizedEntry {
434        id: entry.id,
435        signal_type: entry.signal_type,
436        name: entry.name,
437        rate,
438        duration,
439        generator: entry.generator,
440        log_generator: entry.log_generator,
441        labels,
442        dynamic_labels: entry.dynamic_labels,
443        encoder,
444        sink,
445        jitter: entry.jitter,
446        jitter_seed: entry.jitter_seed,
447        gaps: entry.gaps,
448        bursts: entry.bursts,
449        cardinality_spikes: entry.cardinality_spikes,
450        phase_offset: entry.phase_offset,
451        clock_group: entry.clock_group,
452        after: entry.after,
453        while_clause,
454        delay_clause,
455        pack: entry.pack,
456        overrides: entry.overrides,
457        distribution: entry.distribution,
458        buckets: entry.buckets,
459        quantiles: entry.quantiles,
460        observations_per_tick: entry.observations_per_tick,
461        mean_shift_per_sec: entry.mean_shift_per_sec,
462        seed: entry.seed,
463        on_sink_error,
464    })
465}
466
467/// Resolve `rate` from the entry or defaults, producing a diagnostic error
468/// when neither is set.
469fn resolve_rate(
470    entry: &Entry,
471    defaults: Option<&Defaults>,
472    index: usize,
473) -> Result<f64, NormalizeError> {
474    if let Some(rate) = entry.rate {
475        return Ok(rate);
476    }
477    if let Some(rate) = defaults.and_then(|d| d.rate) {
478        return Ok(rate);
479    }
480    Err(NormalizeError::MissingRate {
481        index,
482        label: entry_label(entry),
483    })
484}
485
486/// Pick a human-readable label for an entry for use in error messages.
487///
488/// Preference order: `name` → `id` → `pack` → `<unnamed>`.
489fn entry_label(entry: &Entry) -> String {
490    entry
491        .name
492        .clone()
493        .or_else(|| entry.id.clone())
494        .or_else(|| entry.pack.clone())
495        .unwrap_or_else(|| "<unnamed>".to_string())
496}
497
498fn entry_label_for_diagnostic(entry: &Entry, index: usize) -> String {
499    entry
500        .id
501        .clone()
502        .or_else(|| entry.name.clone())
503        .or_else(|| entry.pack.clone())
504        .unwrap_or_else(|| format!("<entry {index}>"))
505}
506
507/// Return the built-in encoder default for a given signal type.
508///
509/// Unknown signal types fall through to `prometheus_text` as a neutral
510/// default. Parse-time validation rejects unknown signal types, so this
511/// branch is unreachable in practice; we still return a value rather than
512/// panic to keep the function total.
513fn default_encoder_for(signal_type: &str) -> EncoderConfig {
514    match signal_type {
515        "logs" => EncoderConfig::JsonLines { precision: None },
516        _ => EncoderConfig::PrometheusText { precision: None },
517    }
518}
519
520/// Return the built-in sink default (`stdout`).
521fn default_sink() -> SinkConfig {
522    SinkConfig::Stdout
523}
524
/// Merge a file-level labels map with an entry-level labels map.
///
/// Entry-level keys win on conflict. If either side is `None`, the other
/// side is returned unchanged. If both sides are `None`, returns `None`.
/// The defaults map is only borrowed and is never modified.
fn merge_labels(
    defaults_labels: Option<&BTreeMap<String, String>>,
    entry_labels: Option<BTreeMap<String, String>>,
) -> Option<BTreeMap<String, String>> {
    match (defaults_labels, entry_labels) {
        // Without defaults the entry side (possibly `None`) is the result.
        (None, entry_only) => entry_only,
        (Some(base), None) => Some(base.clone()),
        (Some(base), Some(overrides)) => {
            let mut merged = base.clone();
            // `extend` replaces the value of any key already present, so
            // entry labels override defaults.
            merged.extend(overrides);
            Some(merged)
        }
    }
}
546
547// ---------------------------------------------------------------------------
548// Tests
549// ---------------------------------------------------------------------------
550
551#[cfg(test)]
552mod tests {
553    use super::super::parse::parse;
554    use super::*;
555
556    // ======================================================================
557    // Helpers
558    // ======================================================================
559
560    fn normalize_yaml(yaml: &str) -> Result<NormalizedFile, NormalizeError> {
561        let parsed = parse(yaml).expect("parse must succeed in normalization tests");
562        normalize(parsed)
563    }
564
565    fn is_prometheus_text(encoder: &EncoderConfig) -> bool {
566        matches!(encoder, EncoderConfig::PrometheusText { .. })
567    }
568
569    fn is_json_lines(encoder: &EncoderConfig) -> bool {
570        matches!(encoder, EncoderConfig::JsonLines { .. })
571    }
572
573    fn is_stdout(sink: &SinkConfig) -> bool {
574        matches!(sink, SinkConfig::Stdout)
575    }
576
577    // ======================================================================
578    // Defaults inheritance
579    // ======================================================================
580
581    #[test]
582    fn entry_inherits_rate_and_duration_from_defaults() {
583        let yaml = r#"
584version: 2
585defaults:
586  rate: 1
587  duration: 5m
588scenarios:
589  - signal_type: metrics
590    name: cpu
591    generator: { type: constant, value: 42 }
592"#;
593        let file = normalize_yaml(yaml).expect("must normalize");
594        let entry = &file.entries[0];
595        assert!((entry.rate - 1.0).abs() < f64::EPSILON);
596        assert_eq!(entry.duration.as_deref(), Some("5m"));
597    }
598
599    #[test]
600    fn entry_rate_overrides_defaults_rate() {
601        let yaml = r#"
602version: 2
603defaults:
604  rate: 1
605scenarios:
606  - signal_type: metrics
607    name: cpu
608    rate: 10
609    generator: { type: constant, value: 42 }
610"#;
611        let file = normalize_yaml(yaml).expect("must normalize");
612        assert!((file.entries[0].rate - 10.0).abs() < f64::EPSILON);
613    }
614
615    #[test]
616    fn entry_duration_overrides_defaults_duration() {
617        let yaml = r#"
618version: 2
619defaults:
620  rate: 1
621  duration: 5m
622scenarios:
623  - signal_type: metrics
624    name: cpu
625    duration: 30s
626    generator: { type: constant, value: 42 }
627"#;
628        let file = normalize_yaml(yaml).expect("must normalize");
629        assert_eq!(file.entries[0].duration.as_deref(), Some("30s"));
630    }
631
632    #[test]
633    fn entry_inherits_encoder_and_sink_from_defaults() {
634        let yaml = r#"
635version: 2
636defaults:
637  rate: 1
638  encoder: { type: influx_lp }
639  sink: { type: file, path: /tmp/out.txt }
640scenarios:
641  - signal_type: metrics
642    name: cpu
643    generator: { type: constant, value: 42 }
644"#;
645        let file = normalize_yaml(yaml).expect("must normalize");
646        let entry = &file.entries[0];
647        assert!(matches!(
648            entry.encoder,
649            EncoderConfig::InfluxLineProtocol { .. }
650        ));
651        assert!(matches!(entry.sink, SinkConfig::File { .. }));
652    }
653
654    #[test]
655    fn entry_encoder_overrides_defaults_encoder() {
656        let yaml = r#"
657version: 2
658defaults:
659  rate: 1
660  encoder: { type: influx_lp }
661scenarios:
662  - signal_type: metrics
663    name: cpu
664    encoder: { type: prometheus_text }
665    generator: { type: constant, value: 42 }
666"#;
667        let file = normalize_yaml(yaml).expect("must normalize");
668        assert!(is_prometheus_text(&file.entries[0].encoder));
669    }
670
671    // ======================================================================
672    // Built-in defaults
673    // ======================================================================
674
675    /// Expected built-in encoder for a signal type when defaults do not
676    /// supply one. Lets the parametrized test below classify the encoder
677    /// without introspecting its internal fields.
678    #[derive(Copy, Clone)]
679    enum ExpectedEncoder {
680        PrometheusText,
681        JsonLines,
682    }
683
684    #[rustfmt::skip]
685    #[rstest::rstest]
686    #[case::metrics(r#"
687version: 2
688scenarios:
689  - signal_type: metrics
690    name: cpu
691    rate: 1
692    generator: { type: constant, value: 42 }
693"#, ExpectedEncoder::PrometheusText)]
694    #[case::histogram(r#"
695version: 2
696scenarios:
697  - signal_type: histogram
698    name: http_latency
699    rate: 1
700    distribution: { type: exponential, rate: 10.0 }
701    buckets: [0.1, 0.5, 1.0]
702    observations_per_tick: 50
703    seed: 1
704"#, ExpectedEncoder::PrometheusText)]
705    #[case::summary(r#"
706version: 2
707scenarios:
708  - signal_type: summary
709    name: rpc_latency
710    rate: 1
711    distribution: { type: normal, mean: 0.1, stddev: 0.02 }
712    quantiles: [0.5, 0.9, 0.99]
713    observations_per_tick: 50
714    seed: 1
715"#, ExpectedEncoder::PrometheusText)]
716    #[case::logs(r#"
717version: 2
718scenarios:
719  - signal_type: logs
720    name: app_logs
721    rate: 1
722    log_generator:
723      type: template
724      templates:
725        - message: "hello"
726"#, ExpectedEncoder::JsonLines)]
727    fn signal_type_picks_built_in_encoder_and_stdout_sink(
728        #[case] yaml: &str,
729        #[case] expected: ExpectedEncoder,
730    ) {
731        let file = normalize_yaml(yaml).expect("must normalize");
732        let entry = &file.entries[0];
733        match expected {
734            ExpectedEncoder::PrometheusText => assert!(is_prometheus_text(&entry.encoder)),
735            ExpectedEncoder::JsonLines => assert!(is_json_lines(&entry.encoder)),
736        }
737        assert!(is_stdout(&entry.sink));
738    }
739
740    // ======================================================================
741    // Labels merge
742    // ======================================================================
743
744    #[test]
745    fn labels_merge_entry_wins_on_conflict() {
746        let yaml = r#"
747version: 2
748defaults:
749  rate: 1
750  labels:
751    device: rtr-edge-01
752    region: us-west-2
753scenarios:
754  - signal_type: metrics
755    name: cpu
756    labels:
757      region: us-east-1
758      interface: Gi0/0/0
759    generator: { type: constant, value: 42 }
760"#;
761        let file = normalize_yaml(yaml).expect("must normalize");
762        let labels = file.entries[0]
763            .labels
764            .as_ref()
765            .expect("merged labels must exist");
766        assert_eq!(
767            labels.get("device").map(String::as_str),
768            Some("rtr-edge-01")
769        );
770        assert_eq!(
771            labels.get("region").map(String::as_str),
772            Some("us-east-1"),
773            "entry value must win on conflict"
774        );
775        assert_eq!(labels.get("interface").map(String::as_str), Some("Gi0/0/0"));
776    }
777
778    #[test]
779    fn labels_from_defaults_alone_are_preserved() {
780        let yaml = r#"
781version: 2
782defaults:
783  rate: 1
784  labels:
785    env: staging
786scenarios:
787  - signal_type: metrics
788    name: cpu
789    generator: { type: constant, value: 42 }
790"#;
791        let file = normalize_yaml(yaml).expect("must normalize");
792        let labels = file.entries[0].labels.as_ref().expect("labels must exist");
793        assert_eq!(labels.get("env").map(String::as_str), Some("staging"));
794        assert_eq!(labels.len(), 1);
795    }
796
797    #[test]
798    fn entry_labels_preserved_when_defaults_has_no_labels() {
799        let yaml = r#"
800version: 2
801defaults:
802  rate: 1
803scenarios:
804  - signal_type: metrics
805    name: cpu
806    labels:
807      job: api
808    generator: { type: constant, value: 42 }
809"#;
810        let file = normalize_yaml(yaml).expect("must normalize");
811        let labels = file.entries[0].labels.as_ref().expect("labels must exist");
812        assert_eq!(labels.get("job").map(String::as_str), Some("api"));
813        assert_eq!(labels.len(), 1);
814    }
815
816    #[test]
817    fn no_labels_anywhere_produces_none() {
818        let yaml = r#"
819version: 2
820scenarios:
821  - signal_type: metrics
822    name: cpu
823    rate: 1
824    generator: { type: constant, value: 42 }
825"#;
826        let file = normalize_yaml(yaml).expect("must normalize");
827        assert!(file.entries[0].labels.is_none());
828    }
829
830    // ======================================================================
831    // Missing rate error
832    //
833    // The `label` field in MissingRate follows a priority chain:
834    //   name > id > pack name.
835    // This table exercises each arm — inline entry with only a name,
836    // pack entry with id (id wins over pack), pack entry with neither
837    // (falls back to pack name).
838    // ======================================================================
839
840    #[rustfmt::skip]
841    #[rstest::rstest]
842    #[case::inline_uses_name(r#"
843version: 2
844scenarios:
845  - signal_type: metrics
846    name: cpu
847    generator: { type: constant, value: 1.0 }
848"#, "cpu")]
849    #[case::pack_prefers_id(r#"
850version: 2
851scenarios:
852  - id: snmp_iface
853    signal_type: metrics
854    pack: telegraf_snmp_interface
855"#, "snmp_iface")]
856    #[case::pack_falls_back_to_pack_name(r#"
857version: 2
858scenarios:
859  - signal_type: metrics
860    pack: telegraf_snmp_interface
861"#, "telegraf_snmp_interface")]
862    fn missing_rate_error_label_follows_priority_chain(
863        #[case] yaml: &str,
864        #[case] expected_label: &str,
865    ) {
866        let err = normalize_yaml(yaml).expect_err("missing rate must fail");
867        match err {
868            NormalizeError::MissingRate { index, label } => {
869                assert_eq!(index, 0);
870                assert_eq!(label, expected_label);
871            }
872            other => panic!("expected MissingRate, got {other:?}"),
873        }
874    }
875
876    #[test]
877    fn missing_rate_message_mentions_entry_and_hint() {
878        let yaml = r#"
879version: 2
880scenarios:
881  - signal_type: metrics
882    name: bare
883    generator: { type: constant, value: 1.0 }
884"#;
885        let err = normalize_yaml(yaml).expect_err("missing rate must fail");
886        let msg = err.to_string();
887        assert!(msg.contains("entry 0"), "error should mention entry index");
888        assert!(msg.contains("bare"), "error should mention entry name");
889        assert!(msg.contains("rate"), "error should mention rate");
890        assert!(
891            msg.contains("defaults"),
892            "error should hint at defaults block"
893        );
894    }
895
896    // ======================================================================
897    // Shorthand normalization
898    // ======================================================================
899
900    #[test]
901    fn shorthand_single_signal_normalizes_through_wrapped_form() {
902        let yaml = r#"
903version: 2
904name: cpu_usage
905signal_type: metrics
906rate: 5
907generator: { type: constant, value: 42 }
908"#;
909        let file = normalize_yaml(yaml).expect("must normalize shorthand");
910        assert_eq!(file.entries.len(), 1);
911        let entry = &file.entries[0];
912        assert!((entry.rate - 5.0).abs() < f64::EPSILON);
913        assert_eq!(entry.name.as_deref(), Some("cpu_usage"));
914        assert!(is_prometheus_text(&entry.encoder));
915        assert!(is_stdout(&entry.sink));
916    }
917
918    #[test]
919    fn shorthand_logs_signal_picks_json_lines_default() {
920        let yaml = r#"
921version: 2
922name: app_logs
923signal_type: logs
924rate: 2
925log_generator:
926  type: template
927  templates:
928    - message: "hello"
929"#;
930        let file = normalize_yaml(yaml).expect("must normalize logs shorthand");
931        assert!(is_json_lines(&file.entries[0].encoder));
932    }
933
934    // ======================================================================
935    // Pack entry normalization
936    // ======================================================================
937
938    #[test]
939    fn pack_entry_inherits_defaults_but_defers_label_merge() {
940        // Spec §2.2 reserves precedence levels 3–5 (pack shared fields,
941        // pack shared_labels, pack per-metric labels) for Phase 3 expansion
942        // between defaults (level 2) and entry labels (level 6). Eagerly
943        // merging here would collapse that chain. This test documents the
944        // asymmetry: pack entries keep their own labels verbatim, while
945        // `defaults.labels` rides on NormalizedFile::defaults_labels.
946        //
947        // Example from the user: defaults.labels = {job: web}, entry =
948        // {labels: {device: rtr-01}, pack: mypack}. Phase 3 will expand the
949        // pack's shared_labels (e.g. {job: snmp}) on top of defaults,
950        // then apply entry labels on top — yielding {job: snmp, device: rtr-01}.
951        // If we merged here the pack's job override would be unreachable.
952        let yaml = r#"
953version: 2
954defaults:
955  rate: 1
956  duration: 10m
957  encoder: { type: prometheus_text }
958  sink: { type: stdout }
959  labels:
960    job: web
961scenarios:
962  - id: primary_uplink
963    signal_type: metrics
964    pack: mypack
965    labels:
966      device: rtr-01
967    overrides:
968      ifOperStatus:
969        generator: { type: constant, value: 0.0 }
970"#;
971        let file = normalize_yaml(yaml).expect("must normalize pack entry");
972        let entry = &file.entries[0];
973        assert_eq!(entry.pack.as_deref(), Some("mypack"));
974        assert!(
975            entry.overrides.is_some(),
976            "overrides must be carried through untouched"
977        );
978        assert!((entry.rate - 1.0).abs() < f64::EPSILON);
979        assert_eq!(entry.duration.as_deref(), Some("10m"));
980        assert!(is_prometheus_text(&entry.encoder));
981        assert!(is_stdout(&entry.sink));
982
983        // Pack entry labels are NOT merged with defaults.labels.
984        let labels = entry.labels.as_ref().expect("entry labels must exist");
985        assert_eq!(labels.len(), 1, "only entry labels — defaults not merged");
986        assert_eq!(labels.get("device").map(String::as_str), Some("rtr-01"));
987        assert!(
988            !labels.contains_key("job"),
989            "defaults.labels must not leak into pack entry's labels"
990        );
991
992        // defaults.labels is preserved verbatim at the file level for
993        // Phase 3 pack expansion to apply.
994        let d = file
995            .defaults_labels
996            .as_ref()
997            .expect("defaults_labels must be surfaced");
998        assert_eq!(d.get("job").map(String::as_str), Some("web"));
999    }
1000
1001    #[test]
1002    fn normalized_file_defaults_labels_matches_source() {
1003        // Present when defaults.labels is set and non-empty.
1004        let yaml_with = r#"
1005version: 2
1006defaults:
1007  rate: 1
1008  labels:
1009    env: prod
1010    region: us-east-1
1011scenarios:
1012  - signal_type: metrics
1013    name: cpu
1014    generator: { type: constant, value: 42 }
1015"#;
1016        let file = normalize_yaml(yaml_with).expect("must normalize");
1017        let d = file
1018            .defaults_labels
1019            .as_ref()
1020            .expect("defaults_labels must be Some when defaults.labels is set");
1021        assert_eq!(d.len(), 2);
1022        assert_eq!(d.get("env").map(String::as_str), Some("prod"));
1023        assert_eq!(d.get("region").map(String::as_str), Some("us-east-1"));
1024
1025        // None when the file has no defaults block at all.
1026        let yaml_no_defaults = r#"
1027version: 2
1028scenarios:
1029  - signal_type: metrics
1030    name: cpu
1031    rate: 1
1032    generator: { type: constant, value: 42 }
1033"#;
1034        let file = normalize_yaml(yaml_no_defaults).expect("must normalize");
1035        assert!(file.defaults_labels.is_none());
1036
1037        // None when defaults exists but has no labels field.
1038        let yaml_no_labels = r#"
1039version: 2
1040defaults:
1041  rate: 1
1042  duration: 5m
1043scenarios:
1044  - signal_type: metrics
1045    name: cpu
1046    generator: { type: constant, value: 42 }
1047"#;
1048        let file = normalize_yaml(yaml_no_labels).expect("must normalize");
1049        assert!(file.defaults_labels.is_none());
1050    }
1051
1052    #[test]
1053    fn inline_and_pack_entries_compose_defaults_labels_asymmetrically() {
1054        // One file, two entries both "inheriting" defaults.labels. The
1055        // inline entry gets the eager merge; the pack entry does not.
1056        // defaults_labels must carry the source map verbatim for Phase 3.
1057        let yaml = r#"
1058version: 2
1059defaults:
1060  rate: 1
1061  labels:
1062    job: web
1063    region: us-east-1
1064scenarios:
1065  - signal_type: metrics
1066    name: cpu
1067    labels:
1068      host: node-01
1069    generator: { type: constant, value: 42 }
1070
1071  - signal_type: metrics
1072    pack: mypack
1073    labels:
1074      device: rtr-01
1075"#;
1076        let file = normalize_yaml(yaml).expect("must normalize");
1077        assert_eq!(file.entries.len(), 2);
1078
1079        // Inline entry: labels = defaults ∪ entry, entry wins on conflict.
1080        let inline = &file.entries[0];
1081        assert!(inline.pack.is_none());
1082        let inline_labels = inline.labels.as_ref().expect("inline labels must exist");
1083        assert_eq!(inline_labels.len(), 3, "defaults + entry merged");
1084        assert_eq!(inline_labels.get("job").map(String::as_str), Some("web"));
1085        assert_eq!(
1086            inline_labels.get("region").map(String::as_str),
1087            Some("us-east-1")
1088        );
1089        assert_eq!(
1090            inline_labels.get("host").map(String::as_str),
1091            Some("node-01")
1092        );
1093
1094        // Pack entry: labels = entry's own labels only. No merge happened.
1095        let pack = &file.entries[1];
1096        assert_eq!(pack.pack.as_deref(), Some("mypack"));
1097        let pack_labels = pack.labels.as_ref().expect("pack entry labels must exist");
1098        assert_eq!(pack_labels.len(), 1, "only entry-level labels, no merge");
1099        assert_eq!(
1100            pack_labels.get("device").map(String::as_str),
1101            Some("rtr-01")
1102        );
1103        assert!(!pack_labels.contains_key("job"));
1104        assert!(!pack_labels.contains_key("region"));
1105
1106        // File-level defaults_labels carries the source map verbatim.
1107        let d = file
1108            .defaults_labels
1109            .as_ref()
1110            .expect("defaults_labels must be Some");
1111        assert_eq!(d.len(), 2);
1112        assert_eq!(d.get("job").map(String::as_str), Some("web"));
1113        assert_eq!(d.get("region").map(String::as_str), Some("us-east-1"));
1114    }
1115
1116    // ======================================================================
1117    // Multi-scenario mixed entries
1118    // ======================================================================
1119
1120    #[test]
1121    fn multi_scenario_mixed_entries_all_normalize() {
1122        let yaml = r#"
1123version: 2
1124defaults:
1125  rate: 1
1126  duration: 5m
1127  encoder: { type: prometheus_text }
1128  sink: { type: stdout }
1129  labels:
1130    region: us-west-2
1131scenarios:
1132  - id: link_state
1133    signal_type: metrics
1134    name: interface_oper_state
1135    labels:
1136      interface: Gi0/0/0
1137      region: us-east-1
1138    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1139
1140  - id: fast_metric
1141    signal_type: metrics
1142    name: cpu
1143    rate: 10
1144    generator: { type: constant, value: 42 }
1145
1146  - signal_type: logs
1147    name: app_logs
1148    log_generator:
1149      type: template
1150      templates:
1151        - message: "hello"
1152
1153  - signal_type: metrics
1154    pack: telegraf_snmp_interface
1155    labels:
1156      device: rtr-01
1157"#;
1158        let file = normalize_yaml(yaml).expect("must normalize multi-scenario");
1159        assert_eq!(file.entries.len(), 4);
1160
1161        // Entry 0: inline metric, inherits rate/duration/encoder/sink,
1162        // labels merged with entry's region winning.
1163        let e0 = &file.entries[0];
1164        assert!((e0.rate - 1.0).abs() < f64::EPSILON);
1165        assert_eq!(e0.duration.as_deref(), Some("5m"));
1166        assert!(is_prometheus_text(&e0.encoder));
1167        let labels0 = e0.labels.as_ref().expect("labels must exist");
1168        assert_eq!(labels0.get("region").map(String::as_str), Some("us-east-1"));
1169        assert_eq!(
1170            labels0.get("interface").map(String::as_str),
1171            Some("Gi0/0/0")
1172        );
1173
1174        // Entry 1: rate override wins, inherits everything else.
1175        let e1 = &file.entries[1];
1176        assert!((e1.rate - 10.0).abs() < f64::EPSILON);
1177        assert_eq!(e1.duration.as_deref(), Some("5m"));
1178        let labels1 = e1.labels.as_ref().expect("labels must exist");
1179        assert_eq!(
1180            labels1.get("region").map(String::as_str),
1181            Some("us-west-2"),
1182            "entry has no labels.region, defaults wins"
1183        );
1184
1185        // Entry 2: logs, picks json_lines default (defaults.encoder is
1186        // prometheus_text but is overridden for logs? No — defaults.encoder
1187        // is explicitly set in this file, so every entry inherits it, even
1188        // logs. This is consistent with precedence: defaults wins over
1189        // built-in, even when the built-in would be signal-type-aware.
1190        let e2 = &file.entries[2];
1191        assert!(
1192            is_prometheus_text(&e2.encoder),
1193            "explicit defaults.encoder applies to all entries including logs"
1194        );
1195
1196        // Entry 3: pack entry, carries through pack field but does NOT
1197        // merge defaults.labels (see module docs on the asymmetry).
1198        let e3 = &file.entries[3];
1199        assert_eq!(e3.pack.as_deref(), Some("telegraf_snmp_interface"));
1200        let labels3 = e3.labels.as_ref().expect("labels must exist");
1201        assert_eq!(labels3.len(), 1, "only entry-level labels on pack entry");
1202        assert_eq!(labels3.get("device").map(String::as_str), Some("rtr-01"));
1203        assert!(!labels3.contains_key("region"));
1204
1205        // defaults.labels still travels with the file for Phase 3 to apply.
1206        let d = file
1207            .defaults_labels
1208            .as_ref()
1209            .expect("defaults_labels must be Some");
1210        assert_eq!(d.get("region").map(String::as_str), Some("us-west-2"));
1211    }
1212
1213    // ======================================================================
1214    // Fields carried through untouched
1215    // ======================================================================
1216
1217    #[test]
1218    fn after_clause_and_timing_fields_preserved() {
1219        let yaml = r#"
1220version: 2
1221defaults:
1222  rate: 1
1223scenarios:
1224  - id: src
1225    signal_type: metrics
1226    name: source
1227    generator: { type: constant, value: 100.0 }
1228
1229  - signal_type: metrics
1230    name: dependent
1231    phase_offset: 5s
1232    clock_group: group_a
1233    generator: { type: constant, value: 1.0 }
1234    after:
1235      ref: src
1236      op: ">"
1237      value: 50.0
1238      delay: 2s
1239"#;
1240        let file = normalize_yaml(yaml).expect("must normalize");
1241        let dep = &file.entries[1];
1242        assert_eq!(dep.phase_offset.as_deref(), Some("5s"));
1243        assert_eq!(dep.clock_group.as_deref(), Some("group_a"));
1244        let after = dep.after.as_ref().expect("after must be preserved");
1245        assert_eq!(after.ref_id, "src");
1246        assert_eq!(after.delay.as_deref(), Some("2s"));
1247    }
1248
1249    #[test]
1250    fn histogram_fields_preserved() {
1251        let yaml = r#"
1252version: 2
1253defaults:
1254  rate: 1
1255scenarios:
1256  - signal_type: histogram
1257    name: latency
1258    distribution: { type: exponential, rate: 10.0 }
1259    buckets: [0.1, 0.5, 1.0]
1260    observations_per_tick: 100
1261    mean_shift_per_sec: 0.01
1262    seed: 42
1263"#;
1264        let file = normalize_yaml(yaml).expect("must normalize");
1265        let entry = &file.entries[0];
1266        assert!(entry.distribution.is_some());
1267        assert_eq!(entry.buckets.as_ref().map(Vec::len), Some(3));
1268        assert_eq!(entry.observations_per_tick, Some(100));
1269        assert_eq!(entry.mean_shift_per_sec, Some(0.01));
1270        assert_eq!(entry.seed, Some(42));
1271    }
1272
1273    // ======================================================================
1274    // Contract tests
1275    // ======================================================================
1276
1277    #[test]
1278    fn normalize_error_is_send_and_sync() {
1279        fn assert_send_sync<T: Send + Sync>() {}
1280        assert_send_sync::<NormalizeError>();
1281    }
1282
1283    #[test]
1284    fn normalized_types_are_send_and_sync() {
1285        fn assert_send_sync<T: Send + Sync>() {}
1286        assert_send_sync::<NormalizedFile>();
1287        assert_send_sync::<NormalizedEntry>();
1288    }
1289
1290    // ======================================================================
1291    // Empty scenarios
1292    // ======================================================================
1293
1294    #[test]
1295    fn empty_scenarios_list_normalizes_to_empty_entries() {
1296        let yaml = r#"
1297version: 2
1298scenarios: []
1299"#;
1300        let file = normalize_yaml(yaml).expect("must normalize empty list");
1301        assert_eq!(file.version, 2);
1302        assert!(file.entries.is_empty());
1303    }
1304
1305    // ======================================================================
1306    // helper unit tests (internal)
1307    // ======================================================================
1308
1309    #[test]
1310    fn merge_labels_both_none_returns_none() {
1311        assert!(merge_labels(None, None).is_none());
1312    }
1313
1314    #[test]
1315    fn merge_labels_only_defaults_returns_defaults_clone() {
1316        let mut d = BTreeMap::new();
1317        d.insert("a".to_string(), "1".to_string());
1318        let merged = merge_labels(Some(&d), None).expect("must return map");
1319        assert_eq!(merged.get("a").map(String::as_str), Some("1"));
1320    }
1321
1322    #[test]
1323    fn merge_labels_only_entry_returns_entry() {
1324        let mut e = BTreeMap::new();
1325        e.insert("b".to_string(), "2".to_string());
1326        let merged = merge_labels(None, Some(e)).expect("must return map");
1327        assert_eq!(merged.get("b").map(String::as_str), Some("2"));
1328    }
1329
1330    #[test]
1331    fn merge_labels_entry_overrides_defaults_on_conflict() {
1332        let mut d = BTreeMap::new();
1333        d.insert("k".to_string(), "from_defaults".to_string());
1334        let mut e = BTreeMap::new();
1335        e.insert("k".to_string(), "from_entry".to_string());
1336        let merged = merge_labels(Some(&d), Some(e)).expect("must return map");
1337        assert_eq!(merged.get("k").map(String::as_str), Some("from_entry"));
1338    }
1339
1340    #[rustfmt::skip]
1341    #[rstest::rstest]
1342    #[case::metrics("metrics",     ExpectedEncoder::PrometheusText)]
1343    #[case::histogram("histogram", ExpectedEncoder::PrometheusText)]
1344    #[case::summary("summary",     ExpectedEncoder::PrometheusText)]
1345    #[case::logs("logs",           ExpectedEncoder::JsonLines)]
1346    fn default_encoder_per_signal_type(
1347        #[case] signal_type: &str,
1348        #[case] expected: ExpectedEncoder,
1349    ) {
1350        let encoder = default_encoder_for(signal_type);
1351        match expected {
1352            ExpectedEncoder::PrometheusText => {
1353                assert!(matches!(encoder, EncoderConfig::PrometheusText { .. }))
1354            }
1355            ExpectedEncoder::JsonLines => {
1356                assert!(matches!(encoder, EncoderConfig::JsonLines { .. }))
1357            }
1358        }
1359    }
1360
1361    #[test]
1362    fn default_sink_is_stdout() {
1363        assert!(matches!(default_sink(), SinkConfig::Stdout));
1364    }
1365
1366    #[test]
1367    fn while_without_duration_is_rejected() {
1368        let yaml = r#"
1369version: 2
1370defaults:
1371  rate: 1
1372scenarios:
1373  - id: src
1374    signal_type: metrics
1375    name: src
1376    generator: { type: constant, value: 1 }
1377  - id: gated
1378    signal_type: metrics
1379    name: gated
1380    generator: { type: constant, value: 1 }
1381    while: { ref: src, op: ">", value: 0 }
1382"#;
1383        let err = normalize_yaml(yaml).expect_err("missing duration must fail");
1384        match err {
1385            NormalizeError::WhileWithoutDuration { source_id } => {
1386                assert_eq!(source_id, "gated");
1387            }
1388            other => panic!("expected WhileWithoutDuration, got {other:?}"),
1389        }
1390    }
1391
1392    #[test]
1393    fn defaults_duration_satisfies_while_without_duration() {
1394        let yaml = r#"
1395version: 2
1396defaults:
1397  rate: 1
1398  duration: 5m
1399scenarios:
1400  - id: src
1401    signal_type: metrics
1402    name: src
1403    generator: { type: constant, value: 1 }
1404  - id: gated
1405    signal_type: metrics
1406    name: gated
1407    generator: { type: constant, value: 1 }
1408    while: { ref: src, op: ">", value: 0 }
1409"#;
1410        let file = normalize_yaml(yaml).expect("defaults.duration satisfies the gate");
1411        assert!(file.entries[1].while_clause.is_some());
1412        assert_eq!(file.entries[1].duration.as_deref(), Some("5m"));
1413    }
1414
1415    #[test]
1416    fn delay_without_while_is_rejected() {
1417        let yaml = r#"
1418version: 2
1419defaults:
1420  rate: 1
1421  duration: 1m
1422scenarios:
1423  - id: gated
1424    signal_type: metrics
1425    name: gated
1426    generator: { type: constant, value: 1 }
1427    delay: { open: "5s", close: "10s" }
1428"#;
1429        let err = normalize_yaml(yaml).expect_err("delay without while must fail");
1430        match err {
1431            NormalizeError::DelayWithoutWhile { source_id } => {
1432                assert_eq!(source_id, "gated");
1433            }
1434            other => panic!("expected DelayWithoutWhile, got {other:?}"),
1435        }
1436    }
1437
1438    #[test]
1439    fn delay_open_zero_is_accepted() {
1440        let yaml = r#"
1441version: 2
1442defaults:
1443  rate: 1
1444  duration: 5m
1445scenarios:
1446  - id: src
1447    signal_type: metrics
1448    name: src
1449    generator: { type: constant, value: 1 }
1450  - id: gated
1451    signal_type: metrics
1452    name: gated
1453    generator: { type: constant, value: 1 }
1454    while: { ref: src, op: ">", value: 0 }
1455    delay: { open: "0s", close: "5s" }
1456"#;
1457        let file = normalize_yaml(yaml).expect("delay open=0s must parse and normalize");
1458        let gated = file
1459            .entries
1460            .iter()
1461            .find(|e| e.id.as_deref() == Some("gated"))
1462            .unwrap();
1463        let delay = gated.delay_clause.as_ref().expect("delay clause present");
1464        assert_eq!(delay.open, Some(std::time::Duration::ZERO));
1465        assert_eq!(delay.close, Some(std::time::Duration::from_secs(5)));
1466    }
1467
1468    #[test]
1469    fn delay_close_zero_is_accepted() {
1470        let yaml = r#"
1471version: 2
1472defaults:
1473  rate: 1
1474  duration: 5m
1475scenarios:
1476  - id: src
1477    signal_type: metrics
1478    name: src
1479    generator: { type: constant, value: 1 }
1480  - id: gated
1481    signal_type: metrics
1482    name: gated
1483    generator: { type: constant, value: 1 }
1484    while: { ref: src, op: ">", value: 0 }
1485    delay: { open: "250ms", close: "0ms" }
1486"#;
1487        let file = normalize_yaml(yaml).expect("delay close=0ms must parse and normalize");
1488        let gated = file
1489            .entries
1490            .iter()
1491            .find(|e| e.id.as_deref() == Some("gated"))
1492            .unwrap();
1493        let delay = gated.delay_clause.as_ref().expect("delay clause present");
1494        assert_eq!(delay.open, Some(std::time::Duration::from_millis(250)));
1495        assert_eq!(delay.close, Some(std::time::Duration::ZERO));
1496    }
1497
1498    #[rustfmt::skip]
1499    #[rstest::rstest]
1500    #[case::nan(".nan")]
1501    #[case::pos_inf(".inf")]
1502    #[case::neg_inf("-.inf")]
1503    fn while_value_non_finite_is_rejected_at_compile(#[case] yaml_value: &str) {
1504        let yaml = format!(r#"
1505version: 2
1506defaults:
1507  rate: 1
1508  duration: 1m
1509scenarios:
1510  - id: src
1511    signal_type: metrics
1512    name: src
1513    generator: {{ type: constant, value: 1 }}
1514  - id: gated
1515    signal_type: metrics
1516    name: gated
1517    generator: {{ type: constant, value: 1 }}
1518    while: {{ ref: src, op: ">", value: {yaml_value} }}
1519"#);
1520        let err = normalize_yaml(&yaml).expect_err("non-finite while.value must fail");
1521        match err {
1522            NormalizeError::WhileValueIsNan { source_id } => {
1523                assert_eq!(source_id, "gated");
1524            }
1525            other => panic!("expected WhileValueIsNan, got {other:?}"),
1526        }
1527    }
1528
1529    #[rustfmt::skip]
1530    #[rstest::rstest]
1531    #[case::nan(".nan")]
1532    #[case::pos_inf(".inf")]
1533    #[case::neg_inf("-.inf")]
1534    fn close_snap_to_non_finite_is_rejected_at_compile(#[case] yaml_value: &str) {
1535        let yaml = format!(r#"
1536version: 2
1537defaults:
1538  rate: 1
1539  duration: 1m
1540scenarios:
1541  - id: src
1542    signal_type: metrics
1543    name: src
1544    generator: {{ type: constant, value: 1 }}
1545  - id: gated
1546    signal_type: metrics
1547    name: gated
1548    generator: {{ type: constant, value: 1 }}
1549    while: {{ ref: src, op: ">", value: 0 }}
1550    delay:
1551      close:
1552        duration: 5s
1553        snap_to: {yaml_value}
1554"#);
1555        let err = normalize_yaml(&yaml).expect_err("non-finite close.snap_to must fail");
1556        match err {
1557            NormalizeError::CloseSnapToIsNan { source_id } => {
1558                assert_eq!(source_id, "gated");
1559            }
1560            other => panic!("expected CloseSnapToIsNan, got {other:?}"),
1561        }
1562    }
1563
1564    #[test]
1565    fn while_inherits_from_defaults() {
1566        let yaml = r#"
1567version: 2
1568defaults:
1569  rate: 1
1570  duration: 1m
1571  while: { ref: src, op: ">", value: 0 }
1572scenarios:
1573  - id: src
1574    signal_type: metrics
1575    name: src
1576    generator: { type: constant, value: 1 }
1577  - id: gated
1578    signal_type: metrics
1579    name: gated
1580    generator: { type: constant, value: 1 }
1581"#;
1582        let file = normalize_yaml(yaml).expect("defaults while inherits");
1583        let gated = file
1584            .entries
1585            .iter()
1586            .find(|e| e.id.as_deref() == Some("gated"))
1587            .unwrap();
1588        assert!(gated.while_clause.is_some());
1589    }
1590}