// sonda_core/compiler/expand.rs

//! Pack expansion for v2 scenario files.
//!
//! This module implements **Phase 3** of the v2 compilation pipeline. It takes
//! a [`NormalizedFile`] (the output of [`super::normalize::normalize`]) and
//! expands every pack-backed entry into one concrete signal per pack metric,
//! preserving the full label precedence chain from spec §2.2. After expansion,
//! the returned [`ExpandedFile`] contains no unresolved pack references:
//! every entry is a concrete signal that later phases can reason about in
//! isolation.
//!
//! # Label precedence chain (for pack-expanded signals)
//!
//! For each metric produced by a pack expansion the final label map is
//! composed from five layers, applied **low → high** (each subsequent level
//! overwrites on key collision):
//!
//! | Level | Source |
//! |------:|--------|
//! | 2 | [`NormalizedFile::defaults_labels`] |
//! | 4 | pack [`MetricPackDef::shared_labels`] |
//! | 5 | pack per-metric [`MetricSpec::labels`] |
//! | 6 | pack entry [`NormalizedEntry::labels`] |
//! | 7 | override [`MetricOverride::labels`] |
//!
//! Levels 1 (built-in defaults) and 3 (entry non-label fields) do not
//! contribute labels. Level 8 (CLI flags) is applied later and is out of
//! scope here. Phase 2 deliberately left pack entry labels *unmerged* with
//! `defaults.labels` so this pass can interleave levels 4 and 5 between
//! them.
//!
//! Inline entries do **not** re-apply `defaults_labels`: Phase 2 already
//! merged them eagerly and we must not double-apply. Inline entries are
//! copied through with their label map intact.
//!
//! # Auto-generated pack entry IDs
//!
//! Spec §2.4 allows pack entries to omit `id`; spec matrix row 11.8 still
//! requires sub-signal IDs to be addressable. When a pack entry has no `id`
//! set, this pass synthesizes a deterministic identifier of the form
//! `"{pack_def_name}_{entry_index}"` where `entry_index` is the pack entry's
//! zero-based position in [`NormalizedFile::entries`]. The suffix is always
//! appended (even for the first anonymous pack entry) so two anonymous pack
//! entries referencing the same pack never collide.
//!
//! After synthesis, a post-expansion uniqueness check runs over every
//! effective pack-entry id *and* every emitted [`ExpandedEntry::id`]:
//! collisions between user-authored ids and auto-generated ids (or between
//! two pack sub-signals) are rejected via
//! [`ExpandError::DuplicateEntryId`]. The parser's id uniqueness pass only
//! sees user-provided ids, so this pass closes the gap.
//!
//! ## Sub-signal IDs and duplicate metric names
//!
//! When a pack's metrics are unique by name (the common case), the per-metric
//! sub-signal id takes the form `"{effective_entry_id}.{metric_name}"`. For
//! example, a `telegraf_snmp_interface` pack entry with `id: net` produces
//! `net.ifOperStatus`, `net.ifHCInOctets`, etc.
//!
//! When two or more [`MetricSpec`][crate::packs::MetricSpec] entries in a
//! single pack share a `name` (the `node_exporter_cpu` pack ships eight
//! `node_cpu_seconds_total` specs differentiated only by `labels.mode`), the
//! bare `{effective_entry_id}.{metric_name}` id would collide. This pass
//! appends `"#{spec_index}"` **only to the colliding specs**, producing ids
//! such as `cpu.node_cpu_seconds_total#0`, `cpu.node_cpu_seconds_total#1`,
//! etc., where `spec_index` is the metric's zero-based position in
//! [`MetricPackDef::metrics`]. Unique metric names keep their clean form so
//! dotted `after.ref` into a pack sub-signal (matrix row 11.7) is still
//! ergonomic for the majority of packs.
//!
//! ## Worked example
//!
//! Given a pack entry written as:
//!
//! ```yaml
//! scenarios:
//!   - signal_type: metrics      # no `id:`, anonymous entry at index 0
//!     pack: telegraf_snmp_interface
//! ```
//!
//! and assuming `telegraf_snmp_interface` contains four metrics
//! (`ifOperStatus`, `ifHCInOctets`, `ifHCOutOctets`, `ifInErrors`), this pass
//! emits four [`ExpandedEntry`]s with the following ids:
//!
//! | `id` | derivation |
//! |------|------------|
//! | `telegraf_snmp_interface_0.ifOperStatus` | auto pack-entry id + metric name |
//! | `telegraf_snmp_interface_0.ifHCInOctets` | auto pack-entry id + metric name |
//! | `telegraf_snmp_interface_0.ifHCOutOctets` | auto pack-entry id + metric name |
//! | `telegraf_snmp_interface_0.ifInErrors` | auto pack-entry id + metric name |
//!
//! If the same pack entry had a user-provided `id: primary`, the ids above
//! would instead read `primary.ifOperStatus`, `primary.ifHCInOctets`, and so
//! on.
//!
//! # Field propagation (parent pack entry → expanded metric)
//!
//! Spec §4.3 step 7 lists the fields that propagate from a pack entry to
//! each expanded signal. The full set is wider than the spec's illustrative
//! list; this pass copies the following fields from the parent
//! [`NormalizedEntry`] onto every emitted [`ExpandedEntry`]:
//!
//! | Field | Propagation rule |
//! |-------|------------------|
//! | `rate` | copied verbatim (inherited from defaults in Phase 2) |
//! | `duration` | copied verbatim |
//! | `encoder` | copied verbatim |
//! | `sink` | copied verbatim |
//! | `jitter`, `jitter_seed` | copied verbatim |
//! | `gaps` | cloned verbatim |
//! | `bursts` | cloned verbatim |
//! | `cardinality_spikes` | cloned verbatim |
//! | `dynamic_labels` | cloned verbatim |
//! | `phase_offset` | cloned verbatim |
//! | `clock_group` | cloned verbatim |
//! | `after` | per-metric override `after` wins, else parent entry `after` (see below) |
//! | `while_clause`, `delay_clause` | per-metric override wins, else parent entry value |
//!
//! Per-metric override fields in [`MetricOverride`] (`generator`, `labels`,
//! `after`, `while_clause`, `delay_clause`) replace or layer on top of the
//! parent's values as documented above and in the label precedence chain.
//! Adding a new override field requires both extending [`MetricOverride`]
//! and teaching this pass to propagate it.
//!
//! # No pack references survive
//!
//! After [`expand`] returns successfully, none of the entries in
//! [`ExpandedFile::entries`] carry a `pack` reference. Subsequent compilation
//! phases (after-clause resolution, clock group assignment, runtime wiring)
//! can operate on a flat list of concrete signals.
//!
//! # Error surface
//!
//! All failure modes flow through [`ExpandError`]:
//!
//! - unknown override keys in a pack entry,
//! - pack resolver failures (name lookup, file IO, YAML parse),
//! - pack definitions with no metrics,
//! - duplicate entry ids after synthesis (user-authored id colliding with
//!   an auto-generated pack-entry id, or sub-signal ids colliding with one
//!   another).

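/*
The five-layer label merge described in the module docs can be sketched in
isolation. This is a minimal illustration under stated assumptions, not the
crate's implementation: `Labels`, `merge_layers`, and `labels` are
hypothetical names introduced here, and plain `BTreeMap`s stand in for the
real defaults, shared, per-metric, entry, and override label maps.

```rust
use std::collections::BTreeMap;

// Hypothetical alias standing in for the label maps used by the compiler.
type Labels = BTreeMap<String, String>;

// Merge label layers from lowest to highest precedence. A later layer
// overwrites an earlier one on key collision; absent layers are skipped.
// Returns `None` when the merged map ends up empty.
fn merge_layers(layers: &[Option<&Labels>]) -> Option<Labels> {
    let mut merged = Labels::new();
    for layer in layers.iter().flatten() {
        for (key, value) in layer.iter() {
            merged.insert(key.clone(), value.clone());
        }
    }
    if merged.is_empty() {
        None
    } else {
        Some(merged)
    }
}

// Convenience constructor used only to keep the illustration short.
fn labels(pairs: &[(&str, &str)]) -> Labels {
    pairs
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}
```

Passing the layers in level order (2, 4, 5, 6, 7) means an override label
beats an entry label, which in turn beats anything the pack itself defines.
*/
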
use std::collections::{BTreeMap, BTreeSet};

use super::normalize::{NormalizedEntry, NormalizedFile};
use super::{AfterClause, DelayClause, WhileClause};
use crate::config::{
    BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
    OnSinkError,
};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig};
use crate::packs::{MetricOverride, MetricPackDef};
use crate::sink::SinkConfig;

// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------

/// Errors produced during pack expansion.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ExpandError {
    /// The pack reference could not be resolved: either an unknown name or a
    /// file path load failure. The wrapped message includes the pack
    /// reference and an indication of whether the resolver treated it as a
    /// name lookup or a file path load.
    #[error("pack '{reference}' could not be resolved: {message}")]
    ResolveFailed {
        /// The pack reference as written in the scenario file.
        reference: String,
        /// Diagnostic detail from the underlying resolver.
        message: String,
    },

    /// An override in a pack entry referenced a metric name that does not
    /// exist in the resolved pack definition.
    ///
    /// The error lists the pack's available metric names so the user can see
    /// what was expected.
    #[error(
        "override references unknown metric '{key}'; pack '{pack_name}' contains: {available}"
    )]
    UnknownOverrideKey {
        /// The offending override key.
        key: String,
        /// The pack definition name that was being expanded.
        pack_name: String,
        /// Comma-separated list of valid metric names from the pack.
        available: String,
    },

    /// The pack definition has no metrics, so expansion has nothing to emit.
    #[error("pack '{pack_name}' contains no metrics")]
    EmptyPack {
        /// The pack definition name that was being expanded.
        pack_name: String,
    },

    /// Two entries ended up with the same identifier after pack expansion.
    ///
    /// The parser already rejects duplicate **user-provided** ids, but this
    /// pass synthesizes ids for anonymous pack entries (see the module docs'
    /// "Auto-generated pack entry IDs" section) and composes sub-signal ids
    /// of the form `"{effective_entry_id}.{metric_name}"`. Those synthesized
    /// ids can clash with user-authored ids or with one another; such
    /// collisions are detected here so later phases (e.g. the Phase 4
    /// reference index) see a unique id space.
    ///
    /// The `first_source` / `second_source` fields describe where each
    /// collider originated so the diagnostic points at both contributors.
    #[error(
        "duplicate entry id '{id}' after pack expansion: \
         {first_source} conflicts with {second_source}"
    )]
    DuplicateEntryId {
        /// The colliding identifier.
        id: String,
        /// Description of the first source that produced the id.
        first_source: String,
        /// Description of the second source that produced the same id.
        second_source: String,
    },
}

// ---------------------------------------------------------------------------
// Pack resolver trait
// ---------------------------------------------------------------------------

/// Resolves a pack reference into a [`MetricPackDef`].
///
/// The trait is intentionally narrow: implementations receive the raw
/// reference string exactly as it appeared in the scenario file, decide
/// whether to treat it as a pack name (catalog lookup) or a file path (when
/// the string contains `/` or starts with `.`, per spec §2.4), and return
/// the parsed definition.
///
/// Implementations must be pure with respect to the inputs they receive.
/// The compiler does not cache results, so callers that want memoization
/// should wrap their resolver.
///
/// The [`sonda`] CLI crate adapts its filesystem `PackCatalog` to this
/// trait. Tests use [`InMemoryPackResolver`].
pub trait PackResolver {
    /// Resolve `reference` to a pack definition.
    ///
    /// `reference` is the string the user wrote under `pack:`. Per spec
    /// §2.4, values containing `/` or starting with `.` are treated as file
    /// paths; everything else is treated as a pack name and looked up on
    /// the caller's search path.
    ///
    /// Errors must include enough context (path, underlying OS error, YAML
    /// parse diagnostic) for the compiler to surface a useful diagnostic
    /// without further decoration.
    fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError>;
}
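
/*
The trait docs note that the compiler does not cache resolver results. One
way a caller might add memoization is a thin wrapper, sketched below. All
names here (`PackDef`, `Resolver`, `Memoized`, `CountingResolver`) and the
`String` error type are hypothetical stand-ins so the example compiles on
its own; they are not the crate's real types.

```rust
use std::cell::{Cell, RefCell};
use std::collections::BTreeMap;

// Hypothetical, simplified pack definition.
#[derive(Debug, Clone, PartialEq)]
struct PackDef {
    name: String,
}

// Hypothetical, simplified resolver trait with a plain `String` error.
trait Resolver {
    fn resolve(&self, reference: &str) -> Result<PackDef, String>;
}

// Wraps any resolver and remembers successful lookups by reference string.
// `RefCell` keeps `resolve` callable through `&self`; the wrapper is
// therefore single-threaded only.
struct Memoized<R> {
    inner: R,
    cache: RefCell<BTreeMap<String, PackDef>>,
}

impl<R: Resolver> Memoized<R> {
    fn new(inner: R) -> Self {
        Self {
            inner,
            cache: RefCell::new(BTreeMap::new()),
        }
    }
}

impl<R: Resolver> Resolver for Memoized<R> {
    fn resolve(&self, reference: &str) -> Result<PackDef, String> {
        // Clone the hit out so the shared borrow ends before any mutation.
        let cached = self.cache.borrow().get(reference).cloned();
        if let Some(hit) = cached {
            return Ok(hit);
        }
        // Errors are passed through and never cached.
        let pack = self.inner.resolve(reference)?;
        self.cache
            .borrow_mut()
            .insert(reference.to_string(), pack.clone());
        Ok(pack)
    }
}

// Demo resolver that counts how often it is actually consulted.
#[derive(Default)]
struct CountingResolver {
    calls: Cell<usize>,
}

impl Resolver for CountingResolver {
    fn resolve(&self, reference: &str) -> Result<PackDef, String> {
        self.calls.set(self.calls.get() + 1);
        if reference.is_empty() {
            return Err("empty pack reference".to_string());
        }
        Ok(PackDef {
            name: reference.to_string(),
        })
    }
}
```

Caching only successes is a design choice in this sketch: a failed lookup
(for example a pack file that appears later) is retried on the next call.
*/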

/// Error produced by a [`PackResolver`] implementation.
///
/// Carries a human-readable message plus a classification of how the
/// resolver interpreted the reference. The compiler folds this into
/// [`ExpandError::ResolveFailed`] so users see a consistent diagnostic.
#[derive(Debug, thiserror::Error)]
#[error("{message}")]
pub struct PackResolveError {
    /// Diagnostic message describing the failure.
    pub message: String,
    /// Origin kind the resolver decided to use for the reference.
    pub origin: PackResolveOrigin,
}

/// How a resolver interpreted a pack reference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PackResolveOrigin {
    /// Interpreted as a pack name looked up on the catalog search path.
    Name,
    /// Interpreted as a filesystem path to a pack YAML file.
    FilePath,
}

impl PackResolveError {
    /// Construct a resolver error from a message and an origin.
    ///
    /// `origin` should reflect the path the resolver took to interpret the
    /// reference so error messages can disambiguate "unknown pack name"
    /// from "pack file not found".
    pub fn new(message: impl Into<String>, origin: PackResolveOrigin) -> Self {
        Self {
            message: message.into(),
            origin,
        }
    }
}

/// Classify a pack reference as a file path or a catalog name per spec §2.4.
///
/// Returns [`PackResolveOrigin::FilePath`] when `reference` contains a `/`
/// or starts with a `.`; otherwise [`PackResolveOrigin::Name`].
pub fn classify_pack_reference(reference: &str) -> PackResolveOrigin {
    if reference.contains('/') || reference.starts_with('.') {
        PackResolveOrigin::FilePath
    } else {
        PackResolveOrigin::Name
    }
}

/// An in-memory [`PackResolver`] backed by a `BTreeMap`.
///
/// Useful for unit tests, embedded integrations, and any caller that
/// constructs pack definitions in code rather than loading them from disk.
/// Both pack names (catalog lookup) and file-path strings can be inserted;
/// lookup is a direct key match.
#[derive(Debug, Default, Clone)]
pub struct InMemoryPackResolver {
    packs: BTreeMap<String, MetricPackDef>,
}

impl InMemoryPackResolver {
    /// Create an empty resolver.
    pub fn new() -> Self {
        Self::default()
    }

    /// Insert a pack definition keyed by `reference`.
    ///
    /// The key is matched verbatim against the pack reference string in
    /// the scenario file. Callers that need to support both "pack by name"
    /// and "pack by file path" for the same definition should insert it
    /// under both keys.
    pub fn insert(&mut self, reference: impl Into<String>, pack: MetricPackDef) {
        self.packs.insert(reference.into(), pack);
    }
}

impl PackResolver for InMemoryPackResolver {
    fn resolve(&self, reference: &str) -> Result<MetricPackDef, PackResolveError> {
        match self.packs.get(reference) {
            Some(pack) => Ok(pack.clone()),
            None => Err(PackResolveError::new(
                format!("pack reference '{reference}' not found in resolver"),
                classify_pack_reference(reference),
            )),
        }
    }
}

// ---------------------------------------------------------------------------
// Expanded representation
// ---------------------------------------------------------------------------

/// A v2 scenario file whose pack entries have been fully expanded.
///
/// This is the output of [`expand`]. Every entry is a concrete signal;
/// there are no unresolved pack references. Inline entries from the
/// [`NormalizedFile`] pass through verbatim; pack entries are replaced by
/// one [`ExpandedEntry`] per metric in the pack.
///
/// # Invariants
///
/// - No entry has a `pack` or `overrides` field; those have been resolved.
/// - Every entry has a concrete `rate`, `encoder`, and `sink` (inherited
///   from [`NormalizedEntry`]).
/// - Entry IDs remain unique across the file, including auto-generated
///   IDs synthesized for anonymous pack entries.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedFile {
    /// Schema version. Always `2` after expansion.
    pub version: u32,
    /// File-level `scenario_name` carried verbatim. Pure metadata:
    /// ignored by every compiler phase, surfaced for runtime conflict checks.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub scenario_name: Option<String>,
    /// All entries with pack expansion applied, in source order.
    ///
    /// Pack entries contribute one entry per metric, in the order metrics
    /// appear in the resolved pack definition. Inline entries contribute
    /// one entry each, unchanged from the normalized input.
    pub entries: Vec<ExpandedEntry>,
}

/// A single concrete scenario entry after pack expansion.
///
/// This is the fully-resolved form of a signal that later compilation
/// phases (`after` compiler, clock group assignment, runtime launcher)
/// consume. The type deliberately drops pack-related fields
/// (`pack`, `overrides`) because they cannot appear here. Histogram/summary
/// fields are kept only as carry-through for inline entries: spec §2.4
/// forbids pack entries from carrying them, so pack expansion never
/// populates them.
///
/// Sub-signal IDs produced by pack expansion have the form
/// `"{effective_entry_id}.{metric_name}"`, with a `"#{spec_index}"` suffix
/// when the pack repeats a metric name; see the module docs for the
/// auto-ID scheme used when the pack entry lacks an explicit `id`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct ExpandedEntry {
    /// Signal identifier.
    ///
    /// Pack-expanded entries always carry `Some(_)`: a sub-signal id of
    /// the form `"{effective_entry_id}.{metric_name}"`, optionally
    /// suffixed with `"#{spec_index}"`. If the parent pack entry lacked an
    /// id, one was synthesized (see module docs).
    ///
    /// Inline entries carry the user-provided id, or `None` when the source
    /// omitted `id` (the value survives verbatim from the normalized
    /// input).
    pub id: Option<String>,
    /// Signal type: `"metrics"`, `"logs"`, `"histogram"`, or `"summary"`.
    pub signal_type: String,
    /// Metric or scenario name. Always populated after expansion: inline
    /// entries carried their own name through normalization; pack-expanded
    /// entries use the pack metric's name.
    pub name: String,
    /// Event rate in events per second. Inherited from the parent
    /// normalized entry.
    pub rate: f64,
    /// Total run duration (e.g. `"30s"`, `"5m"`).
    pub duration: Option<String>,
    /// Value generator configuration (metrics signals only).
    pub generator: Option<GeneratorConfig>,
    /// Log generator configuration (logs signals only).
    pub log_generator: Option<LogGeneratorConfig>,
    /// Static labels after the full precedence chain has been applied.
    ///
    /// For pack-expanded entries this is the level-2-through-7 merge
    /// described in the module docs. For inline entries it is the
    /// already-merged map produced by Phase 2 normalization (unchanged).
    ///
    /// `None` when no source contributed any labels.
    pub labels: Option<BTreeMap<String, String>>,
    /// Dynamic (rotating) label configurations.
    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
    /// Encoder configuration.
    pub encoder: EncoderConfig,
    /// Sink configuration.
    pub sink: SinkConfig,
    /// Jitter amplitude applied to generated values.
    pub jitter: Option<f64>,
    /// Deterministic seed for jitter RNG.
    pub jitter_seed: Option<u64>,
    /// Recurring silent-period configuration.
    pub gaps: Option<GapConfig>,
    /// Recurring high-rate burst configuration.
    pub bursts: Option<BurstConfig>,
    /// Cardinality spike configurations.
    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
    /// Phase offset for staggered start within a clock group.
    pub phase_offset: Option<String>,
    /// Clock group for coordinated timing across entries.
    pub clock_group: Option<String>,
    /// Causal dependency on another signal's value.
    ///
    /// For pack-expanded signals, an override-level `after` replaces the
    /// parent pack entry's `after`; otherwise the parent's `after` is
    /// propagated verbatim. Resolution into timing offsets is Phase 4's job.
    pub after: Option<AfterClause>,
    /// Continuous lifecycle gate on another signal's value.
    ///
    /// Override-level `while:` replaces entry-level `while:` for that
    /// metric; otherwise the parent's `while:` is propagated verbatim.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub while_clause: Option<WhileClause>,
    /// Open / close debounce windows for `while_clause` transitions.
    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
    pub delay_clause: Option<DelayClause>,

    // -- Histogram / summary fields (inline entries only) --
    //
    // Pack entries cannot carry these (spec §2.4: pack entries must have
    // signal_type: metrics, parse-time validation enforces that). They
    // survive here purely as carry-through for inline histogram/summary
    // signals.
    /// Distribution model for histogram or summary observations.
    pub distribution: Option<DistributionConfig>,
    /// Histogram bucket boundaries (histogram only).
    pub buckets: Option<Vec<f64>>,
    /// Summary quantile boundaries (summary only).
    pub quantiles: Option<Vec<f64>>,
    /// Number of observations sampled per tick.
    pub observations_per_tick: Option<u32>,
    /// Linear drift applied to the distribution mean each second.
    pub mean_shift_per_sec: Option<f64>,
    /// Deterministic seed for histogram/summary sampling.
    pub seed: Option<u64>,
    /// Resolved sink-error policy.
    pub on_sink_error: OnSinkError,
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/// Expand every pack entry in a normalized v2 scenario file.
///
/// Inline entries in [`NormalizedFile::entries`] are copied through
/// verbatim (without re-applying `defaults_labels`; Phase 2 handled that).
/// Pack entries are materialized into one [`ExpandedEntry`] per metric in
/// the resolved pack, with labels composed according to the module-level
/// precedence chain and fields propagated per spec §4.3.
///
/// Id uniqueness, including collisions between user-provided ids and
/// auto-synthesized pack-entry ids, is enforced after expansion; the parser
/// only validates user-provided ids.
///
/// # Errors
///
/// - [`ExpandError::ResolveFailed`] when the resolver cannot produce a
///   [`MetricPackDef`] for a pack reference.
/// - [`ExpandError::UnknownOverrideKey`] when an override targets a metric
///   that is not present in the resolved pack definition.
/// - [`ExpandError::EmptyPack`] when the resolved pack has no metrics.
/// - [`ExpandError::DuplicateEntryId`] when two entries end up with the
///   same identifier after synthesis (e.g. a user-authored inline id
///   colliding with an auto-generated pack-entry id, or two sub-signal
///   ids composing to the same string).
pub fn expand<R: PackResolver>(
    file: NormalizedFile,
    resolver: &R,
) -> Result<ExpandedFile, ExpandError> {
    let defaults_labels = file.defaults_labels;
    let mut entries: Vec<ExpandedEntry> = Vec::with_capacity(file.entries.len());
    // Collects every id that occupies the signal-id namespace so we can
    // catch collisions between user-authored ids, synthesized pack-entry
    // ids, and pack sub-signal ids in a single pass.
    let mut id_registry: BTreeMap<String, String> = BTreeMap::new();

    for (index, entry) in file.entries.into_iter().enumerate() {
        if entry.pack.is_some() {
            expand_pack_entry(
                entry,
                index,
                defaults_labels.as_ref(),
                resolver,
                &mut entries,
                &mut id_registry,
            )?;
        } else {
            let expanded = expand_inline_entry(entry);
            if let Some(id) = expanded.id.as_ref() {
                record_id(&mut id_registry, id, format!("inline entry '{id}'"))?;
            }
            entries.push(expanded);
        }
    }

    Ok(ExpandedFile {
        version: file.version,
        scenario_name: file.scenario_name,
        entries,
    })
}

/// Insert an identifier into the post-expansion uniqueness registry.
///
/// Returns [`ExpandError::DuplicateEntryId`] if `id` was already registered,
/// tagging both the previous and current source so the diagnostic points at
/// both contributors.
fn record_id(
    registry: &mut BTreeMap<String, String>,
    id: &str,
    source: String,
) -> Result<(), ExpandError> {
    if let Some(prior) = registry.get(id) {
        return Err(ExpandError::DuplicateEntryId {
            id: id.to_string(),
            first_source: prior.clone(),
            second_source: source,
        });
    }
    registry.insert(id.to_string(), source);
    Ok(())
}
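
/*
The ids guarded by the registry above are composed as described in the
module docs. The sketch below reproduces that scheme on plain strings so it
can be read in isolation. `duplicate_names` and `sub_signal_ids` are
hypothetical helpers written for this illustration (the module's own
duplicate-name helper is not reproduced here), so treat the code as an
approximation of the documented behavior rather than the implementation.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Collect the metric names that occur more than once in a pack.
fn duplicate_names<'a>(metric_names: &[&'a str]) -> BTreeSet<&'a str> {
    let mut counts: BTreeMap<&'a str, usize> = BTreeMap::new();
    for name in metric_names {
        *counts.entry(*name).or_insert(0) += 1;
    }
    counts
        .into_iter()
        .filter(|(_, count)| *count > 1)
        .map(|(name, _)| name)
        .collect()
}

// Compose one sub-signal id per metric.
//
// - With a user-provided entry id, that id is the prefix.
// - Without one, the prefix is "{pack_name}_{entry_index}".
// - Only metrics whose name repeats within the pack get "#{spec_index}".
fn sub_signal_ids(
    entry_id: Option<&str>,
    pack_name: &str,
    entry_index: usize,
    metric_names: &[&str],
) -> Vec<String> {
    let effective = match entry_id {
        Some(id) => id.to_string(),
        None => format!("{pack_name}_{entry_index}"),
    };
    let duplicates = duplicate_names(metric_names);
    metric_names
        .iter()
        .enumerate()
        .map(|(spec_index, name)| {
            if duplicates.contains(name) {
                format!("{effective}.{name}#{spec_index}")
            } else {
                format!("{effective}.{name}")
            }
        })
        .collect()
}
```

Because the suffix is applied only to repeated names, a pack with unique
metric names keeps ids such as `primary.ifOperStatus` that are easy to
reference from `after.ref`.
*/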

// ---------------------------------------------------------------------------
// Inline pass-through
// ---------------------------------------------------------------------------

/// Convert an inline [`NormalizedEntry`] into an [`ExpandedEntry`].
///
/// Labels are preserved verbatim: Phase 2 normalization already merged
/// `defaults_labels` into inline entries. Re-applying them here would
/// double-merge a map the user already sees in the normalized output.
fn expand_inline_entry(entry: NormalizedEntry) -> ExpandedEntry {
    ExpandedEntry {
        id: entry.id,
        signal_type: entry.signal_type,
        // Inline entries always have `name` by parse-time validation.
        name: entry.name.unwrap_or_default(),
        rate: entry.rate,
        duration: entry.duration,
        generator: entry.generator,
        log_generator: entry.log_generator,
        labels: entry.labels,
        dynamic_labels: entry.dynamic_labels,
        encoder: entry.encoder,
        sink: entry.sink,
        jitter: entry.jitter,
        jitter_seed: entry.jitter_seed,
        gaps: entry.gaps,
        bursts: entry.bursts,
        cardinality_spikes: entry.cardinality_spikes,
        phase_offset: entry.phase_offset,
        clock_group: entry.clock_group,
        after: entry.after,
        while_clause: entry.while_clause,
        delay_clause: entry.delay_clause,
        distribution: entry.distribution,
        buckets: entry.buckets,
        quantiles: entry.quantiles,
        observations_per_tick: entry.observations_per_tick,
        mean_shift_per_sec: entry.mean_shift_per_sec,
        on_sink_error: entry.on_sink_error,
        seed: entry.seed,
    }
}

// ---------------------------------------------------------------------------
// Pack expansion
// ---------------------------------------------------------------------------

/// Expand a single pack-backed [`NormalizedEntry`] into one [`ExpandedEntry`]
/// per metric in the resolved pack, appending to `out` and tracking every
/// produced id in `id_registry` for the post-expansion uniqueness check.
fn expand_pack_entry<R: PackResolver>(
    entry: NormalizedEntry,
    entry_index: usize,
    defaults_labels: Option<&BTreeMap<String, String>>,
    resolver: &R,
    out: &mut Vec<ExpandedEntry>,
    id_registry: &mut BTreeMap<String, String>,
) -> Result<(), ExpandError> {
    // `entry.pack` is `Some` by the caller's check, so this `expect` can
    // only fire on a caller bug.
    let reference = entry
        .pack
        .as_deref()
        .expect("expand_pack_entry called with non-pack entry; caller must check");

    let pack = resolver
        .resolve(reference)
        .map_err(|e| ExpandError::ResolveFailed {
            reference: reference.to_string(),
            message: e.message,
        })?;

    if pack.metrics.is_empty() {
        return Err(ExpandError::EmptyPack {
            pack_name: pack.name,
        });
    }

    validate_override_keys(&pack, entry.overrides.as_ref())?;

    let (effective_entry_id, effective_id_source) = match entry.id.clone() {
        Some(id) => (id.clone(), format!("pack entry '{id}' (user-provided id)")),
        None => {
            let synthesized = format!("{}_{}", pack.name, entry_index);
            (
                synthesized.clone(),
                format!(
                    "pack entry at index {entry_index} (auto-generated id '{synthesized}' \
                     from pack '{}')",
                    pack.name
                ),
            )
        }
    };

    // The effective pack-entry id occupies the signal-id namespace even
    // though no single `ExpandedEntry` carries it verbatim: its sub-signal
    // ids live underneath (e.g. `{effective_entry_id}.{metric_name}`) and a
    // future `after.ref` targeting `effective_entry_id` would resolve into
    // this namespace. Register it so user-authored ids cannot silently
    // shadow an auto-generated pack-entry id and vice versa.
    record_id(id_registry, &effective_entry_id, effective_id_source)?;

    // Per the module docs, sub-signal ids default to
    // `"{effective_entry_id}.{metric_name}"` but metrics whose name collides
    // with another spec in the same pack receive an additional
    // `"#{spec_index}"` suffix. This keeps the common case clean while
    // preventing id collisions for packs like `node_exporter_cpu` where
    // multiple `MetricSpec`s share a metric name.
    let duplicate_metric_names = duplicate_metric_names(&pack);

    for (spec_index, metric) in pack.metrics.iter().enumerate() {
        let override_for_metric = entry
            .overrides
            .as_ref()
            .and_then(|map| map.get(&metric.name));

        let labels = compose_pack_metric_labels(
            defaults_labels,
            pack.shared_labels.as_ref(),
            metric.labels.as_ref(),
            entry.labels.as_ref(),
            override_for_metric.and_then(|o| o.labels.as_ref()),
        );

        let generator = select_pack_metric_generator(metric, override_for_metric);

        // Override-level `after`, `while`, and `delay` each replace the
        // entry-level value for this specific expanded metric; otherwise
        // the parent's value is propagated verbatim. We do NOT resolve
        // `after.ref` here; that is Phase 4's job.
        let after = override_for_metric
            .and_then(|o| o.after.clone())
            .or_else(|| entry.after.clone());
        let while_clause = override_for_metric
            .and_then(|o| o.while_clause.clone())
            .or_else(|| entry.while_clause.clone());
        let delay_clause = override_for_metric
            .and_then(|o| o.delay_clause.clone())
            .or_else(|| entry.delay_clause.clone());

        let sub_signal_id = if duplicate_metric_names.contains(metric.name.as_str()) {
            format!("{}.{}#{}", effective_entry_id, metric.name, spec_index)
        } else {
            format!("{}.{}", effective_entry_id, metric.name)
        };
        record_id(
            id_registry,
            &sub_signal_id,
            format!(
                "pack sub-signal '{sub_signal_id}' (pack '{}', metric '{}' at index {spec_index})",
                pack.name, metric.name
            ),
        )?;

        out.push(ExpandedEntry {
            id: Some(sub_signal_id),
            signal_type: "metrics".to_string(),
            name: metric.name.clone(),
            rate: entry.rate,
            duration: entry.duration.clone(),
            generator: Some(generator),
            log_generator: None,
            labels,
            dynamic_labels: entry.dynamic_labels.clone(),
            encoder: entry.encoder.clone(),
            sink: entry.sink.clone(),
            jitter: entry.jitter,
            jitter_seed: entry.jitter_seed,
            gaps: entry.gaps.clone(),
            bursts: entry.bursts.clone(),
            cardinality_spikes: entry.cardinality_spikes.clone(),
            phase_offset: entry.phase_offset.clone(),
            clock_group: entry.clock_group.clone(),
            after,
            while_clause,
            delay_clause,
            distribution: None,
            buckets: None,
            quantiles: None,
            observations_per_tick: None,
751            mean_shift_per_sec: None,
752            seed: None,
753            on_sink_error: entry.on_sink_error,
754        });
755    }
756
757    Ok(())
758}

/// Return the set of metric names that appear more than once in `pack`.
///
/// Used by [`expand_pack_entry`] to decide which sub-signal ids need a
/// `"#{spec_index}"` disambiguator per the scheme documented in the module
/// docs. Unique metric names stay out of this set and keep their clean
/// `{effective_entry_id}.{metric_name}` form.
fn duplicate_metric_names(pack: &MetricPackDef) -> BTreeSet<&str> {
    let mut seen: BTreeSet<&str> = BTreeSet::new();
    let mut duplicates: BTreeSet<&str> = BTreeSet::new();
    for metric in &pack.metrics {
        if !seen.insert(metric.name.as_str()) {
            duplicates.insert(metric.name.as_str());
        }
    }
    duplicates
}
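
// Illustrative sketch, not part of the original module: a self-contained
// model of the sub-signal id scheme that `duplicate_metric_names` feeds.
// The module name `sub_signal_id_sketch` and the function `sub_signal_ids`
// are hypothetical, and plain `&str` names stand in for `MetricSpec`s.
// Every occurrence of a repeated name gets a `#{spec_index}` suffix, while
// unique names keep the clean `{entry_id}.{metric_name}` form.
#[allow(dead_code)]
mod sub_signal_id_sketch {
    use std::collections::BTreeSet;

    /// Build one sub-signal id per metric name, in pack order.
    pub fn sub_signal_ids(entry_id: &str, metric_names: &[&str]) -> Vec<String> {
        // First pass: collect the names that occur more than once.
        let mut seen: BTreeSet<&str> = BTreeSet::new();
        let mut duplicates: BTreeSet<&str> = BTreeSet::new();
        for name in metric_names {
            if !seen.insert(*name) {
                duplicates.insert(*name);
            }
        }

        // Second pass: suffix only the names found in the duplicate set.
        metric_names
            .iter()
            .enumerate()
            .map(|(spec_index, name)| {
                if duplicates.contains(name) {
                    format!("{entry_id}.{name}#{spec_index}")
                } else {
                    format!("{entry_id}.{name}")
                }
            })
            .collect()
    }
}
// For example, `sub_signal_ids("cpu", &["a", "b", "a"])` yields
// `cpu.a#0`, `cpu.b`, `cpu.a#2`: both occurrences of `a` are suffixed.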

/// Reject overrides whose keys do not match any metric name in the pack.
///
/// Matches the message shape produced by
/// [`crate::packs::expand_pack`] so v1 and v2 surfaces stay consistent.
fn validate_override_keys(
    pack: &MetricPackDef,
    overrides: Option<&BTreeMap<String, MetricOverride>>,
) -> Result<(), ExpandError> {
    let Some(overrides) = overrides else {
        return Ok(());
    };
    if overrides.is_empty() {
        return Ok(());
    }

    let metric_names: BTreeSet<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
    for key in overrides.keys() {
        if !metric_names.contains(key.as_str()) {
            let available: Vec<&str> = pack.metrics.iter().map(|m| m.name.as_str()).collect();
            return Err(ExpandError::UnknownOverrideKey {
                key: key.clone(),
                pack_name: pack.name.clone(),
                available: available.join(", "),
            });
        }
    }
    Ok(())
}

/// Compose the final label map for a single pack-expanded metric.
///
/// Applies the five label layers in the precedence order documented at
/// the module level. `None` maps are skipped. Uses [`BTreeMap`] for
/// deterministic iteration order so snapshot tests are stable.
fn compose_pack_metric_labels(
    defaults_labels: Option<&BTreeMap<String, String>>,
    pack_shared_labels: Option<&std::collections::HashMap<String, String>>,
    pack_metric_labels: Option<&std::collections::HashMap<String, String>>,
    entry_labels: Option<&BTreeMap<String, String>>,
    override_labels: Option<&BTreeMap<String, String>>,
) -> Option<BTreeMap<String, String>> {
    let mut merged: BTreeMap<String, String> = BTreeMap::new();

    // Level 2: file-level defaults labels.
    if let Some(src) = defaults_labels {
        for (k, v) in src {
            merged.insert(k.clone(), v.clone());
        }
    }

    // Level 4: pack shared_labels.
    if let Some(src) = pack_shared_labels {
        for (k, v) in src {
            merged.insert(k.clone(), v.clone());
        }
    }

    // Level 5: pack per-metric labels.
    if let Some(src) = pack_metric_labels {
        for (k, v) in src {
            merged.insert(k.clone(), v.clone());
        }
    }

    // Level 6: entry-level labels on the pack entry.
    if let Some(src) = entry_labels {
        for (k, v) in src {
            merged.insert(k.clone(), v.clone());
        }
    }

    // Level 7: override-level labels.
    if let Some(src) = override_labels {
        for (k, v) in src {
            merged.insert(k.clone(), v.clone());
        }
    }

    if merged.is_empty() {
        None
    } else {
        Some(merged)
    }
}
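
// Illustrative sketch, not part of the original module: the low-to-high
// layering above reduced to a slice of optional maps. The module name
// `label_layering_sketch` and the function `merge_layers` are hypothetical,
// and every layer is assumed to be a `BTreeMap` (the real function also
// accepts `HashMap` layers from the pack definition). Later layers
// overwrite earlier ones on key collision, `None` layers are skipped, and
// an empty result collapses to `None`.
#[allow(dead_code)]
mod label_layering_sketch {
    use std::collections::BTreeMap;

    /// Merge `layers` in order; the last layer that sets a key wins.
    pub fn merge_layers(
        layers: &[Option<&BTreeMap<String, String>>],
    ) -> Option<BTreeMap<String, String>> {
        let mut merged: BTreeMap<String, String> = BTreeMap::new();
        // `flatten` drops the `None` layers and keeps slice order.
        for layer in layers.iter().flatten() {
            // `extend` overwrites existing keys, like repeated `insert`.
            merged.extend(layer.iter().map(|(k, v)| (k.clone(), v.clone())));
        }
        if merged.is_empty() {
            None
        } else {
            Some(merged)
        }
    }
}
// Passing `[defaults, shared, metric, entry, override]` in that order would
// mirror levels 2, 4, 5, 6 and 7 from the module docs.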

/// Choose the generator for a pack-expanded metric.
///
/// Precedence: override generator > pack metric generator > `constant(0.0)`.
/// Matches the fallback used by [`crate::packs::expand_pack`] so v1 and v2
/// behave identically when a pack metric has no generator declared.
fn select_pack_metric_generator(
    metric: &crate::packs::MetricSpec,
    metric_override: Option<&MetricOverride>,
) -> GeneratorConfig {
    if let Some(over) = metric_override {
        if let Some(gen) = over.generator.clone() {
            return gen;
        }
    }
    metric
        .generator
        .clone()
        .unwrap_or(GeneratorConfig::Constant { value: 0.0 })
}
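
// Illustrative sketch, not part of the original module: the three-step
// generator precedence written as one `Option` chain. `SketchGenerator` is a
// hypothetical stand-in for `GeneratorConfig`, and `select_generator` is a
// hypothetical name; only the fallback order (override, then pack metric,
// then constant zero) is taken from the function above.
#[allow(dead_code)]
mod generator_fallback_sketch {
    /// Minimal stand-in for a generator configuration.
    #[derive(Debug, Clone, PartialEq)]
    pub enum SketchGenerator {
        Constant(f64),
        Step(f64),
    }

    /// Pick the override generator if present, else the pack generator,
    /// else a constant zero.
    pub fn select_generator(
        override_generator: Option<&SketchGenerator>,
        pack_generator: Option<&SketchGenerator>,
    ) -> SketchGenerator {
        override_generator
            .or(pack_generator)
            .cloned()
            .unwrap_or(SketchGenerator::Constant(0.0))
    }
}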

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compiler::normalize::normalize;
    use crate::compiler::parse::parse;
    use crate::compiler::AfterOp;
    use crate::packs::MetricSpec;
    use std::collections::HashMap;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    fn telegraf_pack() -> MetricPackDef {
        let mut shared = HashMap::new();
        shared.insert("device".to_string(), String::new());
        shared.insert("job".to_string(), "snmp".to_string());

        MetricPackDef {
            name: "telegraf_snmp_interface".to_string(),
            description: "test".to_string(),
            category: "network".to_string(),
            shared_labels: Some(shared),
            metrics: vec![
                MetricSpec {
                    name: "ifOperStatus".to_string(),
                    labels: None,
                    generator: Some(GeneratorConfig::Constant { value: 1.0 }),
                },
                MetricSpec {
                    name: "ifHCInOctets".to_string(),
                    labels: None,
                    generator: Some(GeneratorConfig::Step {
                        start: Some(0.0),
                        step_size: 125_000.0,
                        max: None,
                    }),
                },
            ],
        }
    }

    fn node_cpu_pack() -> MetricPackDef {
        let mut shared = HashMap::new();
        shared.insert("job".to_string(), "node_exporter".to_string());

        let mut user_labels = HashMap::new();
        user_labels.insert("mode".to_string(), "user".to_string());

        let mut system_labels = HashMap::new();
        system_labels.insert("mode".to_string(), "system".to_string());

        MetricPackDef {
            name: "node_exporter_cpu".to_string(),
            description: "test".to_string(),
            category: "infrastructure".to_string(),
            shared_labels: Some(shared),
            metrics: vec![
                MetricSpec {
                    name: "node_cpu_seconds_total".to_string(),
                    labels: Some(user_labels),
                    generator: Some(GeneratorConfig::Step {
                        start: Some(0.0),
                        step_size: 0.25,
                        max: None,
                    }),
                },
                MetricSpec {
                    name: "node_cpu_seconds_total".to_string(),
                    labels: Some(system_labels),
                    generator: Some(GeneratorConfig::Step {
                        start: Some(0.0),
                        step_size: 0.10,
                        max: None,
                    }),
                },
            ],
        }
    }

    fn expand_yaml(yaml: &str, resolver: &InMemoryPackResolver) -> ExpandedFile {
        let parsed = parse(yaml).expect("parse must succeed");
        let normalized = normalize(parsed).expect("normalize must succeed");
        expand(normalized, resolver).expect("expand must succeed")
    }

    // -----------------------------------------------------------------------
    // Resolver classification & in-memory impl
    // -----------------------------------------------------------------------

    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::plain_name("telegraf_snmp_interface",  PackResolveOrigin::Name)]
    #[case::dot_relative("./packs/custom.yaml",    PackResolveOrigin::FilePath)]
    #[case::absolute_path("/abs/path/pack.yaml",   PackResolveOrigin::FilePath)]
    #[case::plain_relative("rel/pack.yaml",        PackResolveOrigin::FilePath)]
    fn classify_pack_reference_distinguishes_name_and_file_path(
        #[case] reference: &str,
        #[case] expected: PackResolveOrigin,
    ) {
        assert_eq!(classify_pack_reference(reference), expected);
    }

    #[test]
    fn in_memory_resolver_returns_registered_pack() {
        let mut r = InMemoryPackResolver::new();
        r.insert("telegraf_snmp_interface", telegraf_pack());
        let def = r.resolve("telegraf_snmp_interface").expect("must resolve");
        assert_eq!(def.name, "telegraf_snmp_interface");
    }

    #[test]
    fn in_memory_resolver_errors_on_missing_reference() {
        let r = InMemoryPackResolver::new();
        let err = r.resolve("nope").expect_err("must error");
        assert_eq!(err.origin, PackResolveOrigin::Name);
        assert!(err.message.contains("nope"));
    }

    #[test]
    fn in_memory_resolver_classifies_file_paths() {
        let r = InMemoryPackResolver::new();
        let err = r.resolve("./no-such.yaml").expect_err("must error");
        assert_eq!(err.origin, PackResolveOrigin::FilePath);
    }

    // -----------------------------------------------------------------------
    // Happy path: pack expansion produces one entry per metric
    // -----------------------------------------------------------------------

    #[test]
    fn expand_produces_one_entry_per_pack_metric() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
scenarios:
  - id: primary
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        assert_eq!(expanded.entries.len(), 2);
        assert_eq!(expanded.entries[0].name, "ifOperStatus");
        assert_eq!(expanded.entries[1].name, "ifHCInOctets");
    }

    #[test]
    fn expanded_signal_type_is_metrics() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        for e in &expanded.entries {
            assert_eq!(e.signal_type, "metrics");
        }
    }

    // -----------------------------------------------------------------------
    // Sub-signal IDs: user-provided and auto-generated
    // -----------------------------------------------------------------------

    #[rustfmt::skip]
    #[rstest::rstest]
    // User-supplied id becomes the effective entry id; sub-signal ids use
    // the clean `{entry_id}.{metric}` shape.
    #[case::user_supplied_entry_id(r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: primary
    signal_type: metrics
    pack: telegraf_snmp_interface
"#, "primary.ifOperStatus", "primary.ifHCInOctets")]
    // Anonymous pack entries use the auto-id scheme
    // `{pack_def_name}_{entry_index}`, so at index 0 the effective id is
    // `telegraf_snmp_interface_0`.
    #[case::auto_generated_entry_id(r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#, "telegraf_snmp_interface_0.ifOperStatus", "telegraf_snmp_interface_0.ifHCInOctets")]
    fn sub_signal_ids_follow_effective_entry_id(
        #[case] yaml: &str,
        #[case] expected_first: &str,
        #[case] expected_second: &str,
    ) {
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        assert_eq!(expanded.entries[0].id.as_deref(), Some(expected_first));
        assert_eq!(expanded.entries[1].id.as_deref(), Some(expected_second));
    }

    #[test]
    fn two_anonymous_pack_entries_disambiguate_by_index() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        let ids: Vec<_> = expanded
            .entries
            .iter()
            .filter_map(|e| e.id.as_deref())
            .collect();
        assert!(ids.contains(&"telegraf_snmp_interface_0.ifOperStatus"));
        assert!(ids.contains(&"telegraf_snmp_interface_1.ifOperStatus"));
        // All IDs must be unique.
        let mut sorted = ids.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(sorted.len(), ids.len(), "ids must be unique");
    }

    // -----------------------------------------------------------------------
    // Label precedence chain
    // -----------------------------------------------------------------------

    #[test]
    fn label_precedence_chain_applied_in_order() {
        // defaults -> shared -> metric -> entry -> override
        // We test that each layer overrides its predecessor on 'region'.
        let mut shared = HashMap::new();
        shared.insert("region".to_string(), "shared-region".to_string());
        shared.insert("job".to_string(), "snmp".to_string());

        let mut metric_labels = HashMap::new();
        metric_labels.insert("region".to_string(), "metric-region".to_string());

        let pack = MetricPackDef {
            name: "p".to_string(),
            description: "t".to_string(),
            category: "c".to_string(),
            shared_labels: Some(shared),
            metrics: vec![MetricSpec {
                name: "m".to_string(),
                labels: Some(metric_labels),
                generator: Some(GeneratorConfig::Constant { value: 0.0 }),
            }],
        };

        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("p", pack);

        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    region: defaults-region
    env: prod
scenarios:
  - id: e
    signal_type: metrics
    pack: p
    labels:
      region: entry-region
      device: rtr-01
    overrides:
      m:
        labels:
          region: override-region
"#;
        let expanded = expand_yaml(yaml, &resolver);
        let labels = expanded.entries[0].labels.as_ref().unwrap();

        // Highest precedence wins.
        assert_eq!(labels.get("region").unwrap(), "override-region");
        // Lower layers contribute when no higher layer overrides.
        assert_eq!(labels.get("env").unwrap(), "prod");
        assert_eq!(labels.get("job").unwrap(), "snmp");
        assert_eq!(labels.get("device").unwrap(), "rtr-01");
    }

    #[test]
    fn defaults_labels_flow_into_pack_metric_labels() {
        // Spec §2.2: defaults.labels at precedence level 2 must reach the
        // final map for pack-expanded signals.
        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    env: prod
scenarios:
  - id: p
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        let labels = expanded.entries[0].labels.as_ref().unwrap();
        assert_eq!(labels.get("env").unwrap(), "prod");
    }

    #[test]
    fn pack_shared_labels_override_defaults_labels() {
        let mut shared = HashMap::new();
        shared.insert("job".to_string(), "snmp".to_string());
        let pack = MetricPackDef {
            name: "p".to_string(),
            description: "t".to_string(),
            category: "c".to_string(),
            shared_labels: Some(shared),
            metrics: vec![MetricSpec {
                name: "m".to_string(),
                labels: None,
                generator: Some(GeneratorConfig::Constant { value: 0.0 }),
            }],
        };
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("p", pack);

        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    job: web
scenarios:
  - signal_type: metrics
    pack: p
"#;
        let expanded = expand_yaml(yaml, &resolver);
        let labels = expanded.entries[0].labels.as_ref().unwrap();
        assert_eq!(labels.get("job").unwrap(), "snmp");
    }

    #[test]
    fn inline_entry_labels_pass_through_unchanged() {
        // Inline entries must NOT re-apply defaults_labels; Phase 2 already
        // merged them. Verify that exactly the merged set from normalize
        // shows up here: not doubled, not missing a defaults key.
        let yaml = r#"
version: 2
defaults:
  rate: 1
  labels:
    env: prod
scenarios:
  - signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1 }
    labels:
      instance: web-01
"#;
        let resolver = InMemoryPackResolver::new();
        let expanded = expand_yaml(yaml, &resolver);
        let labels = expanded.entries[0].labels.as_ref().unwrap();
        assert_eq!(labels.get("env").unwrap(), "prod");
        assert_eq!(labels.get("instance").unwrap(), "web-01");
        assert_eq!(labels.len(), 2);
    }

    // -----------------------------------------------------------------------
    // Generator precedence: override > spec > constant(0)
    // -----------------------------------------------------------------------

    #[test]
    fn override_generator_replaces_pack_generator() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: e
    signal_type: metrics
    pack: telegraf_snmp_interface
    overrides:
      ifOperStatus:
        generator:
          type: flap
          up_duration: 60s
          down_duration: 30s
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        // ifOperStatus got the flap override
        assert!(matches!(
            expanded.entries[0].generator.as_ref().unwrap(),
            GeneratorConfig::Flap { .. }
        ));
        // ifHCInOctets kept its pack default (step)
        assert!(matches!(
            expanded.entries[1].generator.as_ref().unwrap(),
            GeneratorConfig::Step { .. }
        ));
    }

    #[test]
    fn missing_generator_falls_back_to_constant_zero() {
        let pack = MetricPackDef {
            name: "p".to_string(),
            description: "t".to_string(),
            category: "c".to_string(),
            shared_labels: None,
            metrics: vec![MetricSpec {
                name: "x".to_string(),
                labels: None,
                generator: None,
            }],
        };
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("p", pack);

        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: p
"#;
        let expanded = expand_yaml(yaml, &resolver);
        match expanded.entries[0].generator.as_ref().unwrap() {
            GeneratorConfig::Constant { value } => assert_eq!(*value, 0.0),
            other => panic!("expected constant(0), got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // After-clause propagation
    // -----------------------------------------------------------------------

    #[test]
    fn entry_level_after_propagates_to_every_metric() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: tail
    signal_type: metrics
    pack: telegraf_snmp_interface
    after:
      ref: head
      op: ">"
      value: 5
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        for e in &expanded.entries {
            let after = e.after.as_ref().expect("after must be propagated");
            assert_eq!(after.ref_id, "head");
            assert!(matches!(after.op, AfterOp::GreaterThan));
        }
    }

    #[test]
    fn entry_level_while_propagates_to_every_metric() {
        let yaml = r#"
version: 2
defaults: { rate: 1, duration: 5m }
scenarios:
  - id: head
    signal_type: metrics
    name: head
    generator: { type: constant, value: 1 }
  - id: tail
    signal_type: metrics
    pack: telegraf_snmp_interface
    while:
      ref: head
      op: ">"
      value: 5
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        let pack_subs: Vec<_> = expanded
            .entries
            .iter()
            .filter(|e| {
                e.id.as_deref()
                    .map(|s| s.starts_with("tail."))
                    .unwrap_or(false)
            })
            .collect();
        assert!(!pack_subs.is_empty());
        for e in pack_subs {
            let w = e.while_clause.as_ref().expect("while must be propagated");
            assert_eq!(w.ref_id, "head");
        }
    }

    #[test]
    fn override_while_replaces_entry_while_for_that_metric() {
        let yaml = r#"
version: 2
defaults: { rate: 1, duration: 5m }
scenarios:
  - id: head
    signal_type: metrics
    name: head
    generator: { type: constant, value: 1 }
  - id: other
    signal_type: metrics
    name: other
    generator: { type: constant, value: 1 }
  - id: tail
    signal_type: metrics
    pack: telegraf_snmp_interface
    while:
      ref: head
      op: ">"
      value: 5
    overrides:
      ifOperStatus:
        while:
          ref: other
          op: "<"
          value: 1
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        let oper = expanded
            .entries
            .iter()
            .find(|e| e.name == "ifOperStatus")
            .unwrap();
        assert_eq!(oper.while_clause.as_ref().unwrap().ref_id, "other");
        let in_octets = expanded
            .entries
            .iter()
            .find(|e| e.name == "ifHCInOctets")
            .unwrap();
        assert_eq!(in_octets.while_clause.as_ref().unwrap().ref_id, "head");
    }

    #[test]
    fn override_after_replaces_entry_after_for_that_metric() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: tail
    signal_type: metrics
    pack: telegraf_snmp_interface
    after:
      ref: head
      op: ">"
      value: 5
    overrides:
      ifOperStatus:
        after:
          ref: other
          op: "<"
          value: 1
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        let oper = expanded
            .entries
            .iter()
            .find(|e| e.name == "ifOperStatus")
            .unwrap();
        assert_eq!(oper.after.as_ref().unwrap().ref_id, "other");
        let in_octets = expanded
            .entries
            .iter()
            .find(|e| e.name == "ifHCInOctets")
            .unwrap();
        assert_eq!(in_octets.after.as_ref().unwrap().ref_id, "head");
    }

    // -----------------------------------------------------------------------
    // Field propagation per spec §4.3 step 7
    // -----------------------------------------------------------------------

    #[test]
    fn schedule_delivery_fields_propagate_to_every_metric() {
        let yaml = r#"
version: 2
defaults:
  rate: 1
  duration: 2m
scenarios:
  - id: p
    signal_type: metrics
    pack: telegraf_snmp_interface
    phase_offset: 5s
    clock_group: uplink
    jitter: 0.2
    jitter_seed: 42
    gaps:
      every: 2m
      for: 20s
    bursts:
      every: 5m
      for: 30s
      multiplier: 10
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        for e in &expanded.entries {
            assert_eq!(e.rate, 1.0);
            assert_eq!(e.duration.as_deref(), Some("2m"));
            assert_eq!(e.phase_offset.as_deref(), Some("5s"));
            assert_eq!(e.clock_group.as_deref(), Some("uplink"));
            assert_eq!(e.jitter, Some(0.2));
            assert_eq!(e.jitter_seed, Some(42));
            assert!(e.gaps.is_some());
            assert!(e.bursts.is_some());
        }
    }

    // -----------------------------------------------------------------------
    // No pack references survive
    // -----------------------------------------------------------------------

    #[test]
    fn expanded_entries_have_no_pack_field() {
        // The ExpandedEntry type has no `pack` field by design. This test
        // documents that contract via the public surface: once expansion
        // runs, the output shape cannot carry unresolved pack references.
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        // Compile-time guarantee: no access to `pack` or `overrides` on
        // ExpandedEntry is possible. At runtime we just make sure entries
        // look like concrete signals.
        assert!(expanded.entries.iter().all(|e| e.generator.is_some()));
    }
1537
1538    // -----------------------------------------------------------------------
1539    // Error cases
1540    // -----------------------------------------------------------------------
1541
1542    #[test]
1543    fn unknown_override_key_is_an_error() {
1544        let yaml = r#"
1545version: 2
1546defaults: { rate: 1 }
1547scenarios:
1548  - signal_type: metrics
1549    pack: telegraf_snmp_interface
1550    overrides:
1551      not_a_metric:
1552        generator:
1553          type: constant
1554          value: 0
1555"#;
1556        let mut resolver = InMemoryPackResolver::new();
1557        resolver.insert("telegraf_snmp_interface", telegraf_pack());
1558        let parsed = parse(yaml).expect("parse");
1559        let normalized = normalize(parsed).expect("normalize");
1560        let err = expand(normalized, &resolver).expect_err("must fail");
1561        match err {
1562            ExpandError::UnknownOverrideKey {
1563                key,
1564                pack_name,
1565                available,
1566            } => {
1567                assert_eq!(key, "not_a_metric");
1568                assert_eq!(pack_name, "telegraf_snmp_interface");
1569                assert!(available.contains("ifOperStatus"));
1570            }
1571            other => panic!("wrong error variant: {other:?}"),
1572        }
1573    }
1574
1575    #[test]
    fn unresolvable_pack_is_an_error() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: nonexistent
"#;
        let resolver = InMemoryPackResolver::new();
        let parsed = parse(yaml).expect("parse");
        let normalized = normalize(parsed).expect("normalize");
        let err = expand(normalized, &resolver).expect_err("must fail");
        match err {
            ExpandError::ResolveFailed { reference, message } => {
                assert_eq!(reference, "nonexistent");
                assert!(message.contains("nonexistent"));
            }
            other => panic!("wrong error variant: {other:?}"),
        }
    }

    #[test]
    fn empty_pack_is_an_error() {
        let pack = MetricPackDef {
            name: "empty".to_string(),
            description: "t".to_string(),
            category: "c".to_string(),
            shared_labels: None,
            metrics: vec![],
        };
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("empty", pack);
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: empty
"#;
        let parsed = parse(yaml).expect("parse");
        let normalized = normalize(parsed).expect("normalize");
        let err = expand(normalized, &resolver).expect_err("must fail");
        assert!(matches!(err, ExpandError::EmptyPack { pack_name } if pack_name == "empty"));
    }

    // -----------------------------------------------------------------------
    // Inline entries pass through
    // -----------------------------------------------------------------------

    #[test]
    fn inline_entries_pass_through_untouched() {
        let yaml = r#"
version: 2
scenarios:
  - id: cpu
    signal_type: metrics
    name: cpu_usage
    rate: 2
    duration: 60s
    generator: { type: constant, value: 1 }
    labels: { instance: web-01 }
"#;
        let resolver = InMemoryPackResolver::new();
        let expanded = expand_yaml(yaml, &resolver);
        assert_eq!(expanded.entries.len(), 1);
        let e = &expanded.entries[0];
        assert_eq!(e.id.as_deref(), Some("cpu"));
        assert_eq!(e.name, "cpu_usage");
        assert_eq!(e.rate, 2.0);
        assert_eq!(e.duration.as_deref(), Some("60s"));
        assert_eq!(
            e.labels.as_ref().unwrap().get("instance").unwrap(),
            "web-01"
        );
    }

    #[test]
    fn mixed_inline_and_pack_entries_interleave_correctly() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    name: cpu_usage
    generator: { type: constant, value: 1 }
  - id: net
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        // 1 inline + 2 pack metrics = 3 total
        assert_eq!(expanded.entries.len(), 3);
        assert_eq!(expanded.entries[0].id.as_deref(), Some("cpu"));
        assert_eq!(expanded.entries[1].id.as_deref(), Some("net.ifOperStatus"));
        assert_eq!(expanded.entries[2].id.as_deref(), Some("net.ifHCInOctets"));
    }

    // -----------------------------------------------------------------------
    // Multiple metric instances with same name (node_exporter_cpu case)
    // -----------------------------------------------------------------------

    #[test]
    fn repeated_metric_names_produce_one_entry_per_spec_instance() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    pack: node_exporter_cpu
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("node_exporter_cpu", node_cpu_pack());
        let expanded = expand_yaml(yaml, &resolver);
        assert_eq!(expanded.entries.len(), 2);
        assert_eq!(expanded.entries[0].name, "node_cpu_seconds_total");
        assert_eq!(expanded.entries[1].name, "node_cpu_seconds_total");
        // Distinct label `mode` differentiates them.
        assert_eq!(
            expanded.entries[0]
                .labels
                .as_ref()
                .unwrap()
                .get("mode")
                .unwrap(),
            "user"
        );
        assert_eq!(
            expanded.entries[1]
                .labels
                .as_ref()
                .unwrap()
                .get("mode")
                .unwrap(),
            "system"
        );
    }

    #[test]
    fn repeated_metric_names_produce_unique_sub_signal_ids() {
        // Regression anchor: every ExpandedEntry.id must be unique even
        // when a pack ships multiple MetricSpec entries under one name
        // (e.g. node_exporter_cpu). Duplicate names receive a
        // "#{spec_index}" suffix per the module-level auto-ID docs.
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: cpu
    signal_type: metrics
    pack: node_exporter_cpu
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("node_exporter_cpu", node_cpu_pack());
        let expanded = expand_yaml(yaml, &resolver);

        let ids: Vec<&str> = expanded
            .entries
            .iter()
            .map(|e| {
                e.id.as_deref()
                    .expect("pack-expanded entries always carry an id")
            })
            .collect();
        let mut unique = ids.clone();
        unique.sort();
        unique.dedup();
        assert_eq!(
            unique.len(),
            ids.len(),
            "sub-signal ids must be unique; saw {ids:?}"
        );

        // Exact id shape: first two node_cpu_seconds_total specs live at
        // pack metric indices 0 and 1.
        assert_eq!(ids[0], "cpu.node_cpu_seconds_total#0");
        assert_eq!(ids[1], "cpu.node_cpu_seconds_total#1");
    }

    #[test]
    fn unique_metric_names_keep_clean_sub_signal_ids() {
        // The `#{spec_index}` disambiguator is applied only when a metric
        // name collides with another spec in the same pack. Packs whose
        // metrics are unique by name (like telegraf_snmp_interface) keep
        // the clean `{effective_entry_id}.{metric_name}` form so dotted
        // `after.ref` into a pack sub-signal stays ergonomic.
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: net
    signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);

        let ids: Vec<&str> = expanded
            .entries
            .iter()
            .filter_map(|e| e.id.as_deref())
            .collect();
        assert_eq!(ids, vec!["net.ifOperStatus", "net.ifHCInOctets"]);
    }

    // -----------------------------------------------------------------------
    // Post-expansion id uniqueness (user-provided vs. auto-synthesized)
    // -----------------------------------------------------------------------

    #[rustfmt::skip]
    #[rstest::rstest]
    // Reviewer-described case: the user writes an inline id that equals
    // what the anonymous pack entry at the next position would synthesize.
    // The parser's id uniqueness pass never sees the synthesized id, so
    // this pass must catch the collision.
    #[case::inline_first_then_auto(r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: telegraf_snmp_interface_1
    signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1 }
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#, "telegraf_snmp_interface_1", "inline entry", "auto-generated")]
    // Reverse ordering: anonymous pack entry comes first, user-written id
    // collides with the synthesized name afterward. The registry must flag
    // the collision regardless of source order.
    #[case::auto_first_then_inline(r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: telegraf_snmp_interface
  - id: telegraf_snmp_interface_0
    signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1 }
"#, "telegraf_snmp_interface_0", "auto-generated", "inline entry")]
    fn duplicate_entry_id_detected_regardless_of_source_order(
        #[case] yaml: &str,
        #[case] expected_id: &str,
        #[case] expected_first_substr: &str,
        #[case] expected_second_substr: &str,
    ) {
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let parsed = parse(yaml).expect("parse");
        let normalized = normalize(parsed).expect("normalize");
        let err = expand(normalized, &resolver).expect_err("must fail");
        match err {
            ExpandError::DuplicateEntryId {
                id,
                first_source,
                second_source,
            } => {
                assert_eq!(id, expected_id);
                assert!(
                    first_source.contains(expected_first_substr),
                    "unexpected first source: {first_source}"
                );
                assert!(
                    second_source.contains(expected_second_substr),
                    "unexpected second source: {second_source}"
                );
            }
            other => panic!("wrong error variant: {other:?}"),
        }
    }

    #[test]
    fn duplicate_entry_id_error_preserves_both_sources() {
        // The diagnostic must name both contributors so users can locate
        // each side of the collision. Parser-level id validation rejects
        // `.` and `#` in user ids, so the only reachable collisions occur
        // between inline ids and synthesized pack-entry ids; both sources
        // appear in the error regardless of document order.
        //
        // The pack entry here sits at index 1, so its auto-id is
        // `telegraf_snmp_interface_1`; the inline entry claims that id
        // first.
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - id: telegraf_snmp_interface_1
    signal_type: metrics
    name: cpu
    generator: { type: constant, value: 1 }
  - signal_type: metrics
    pack: telegraf_snmp_interface
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("telegraf_snmp_interface", telegraf_pack());
        let parsed = parse(yaml).expect("parse");
        let normalized = normalize(parsed).expect("normalize");
        let err = expand(normalized, &resolver).expect_err("must fail");
        let rendered = err.to_string();
        assert!(
            rendered.contains("'telegraf_snmp_interface_1'"),
            "error must name the colliding id: {rendered}"
        );
        assert!(
            rendered.contains("inline entry"),
            "error must name the inline source: {rendered}"
        );
        assert!(
            rendered.contains("auto-generated"),
            "error must name the auto-generated source: {rendered}"
        );
    }

    // -----------------------------------------------------------------------
    // Pack by file path
    // -----------------------------------------------------------------------

    #[test]
    fn pack_by_file_path_is_resolved_through_trait() {
        let yaml = r#"
version: 2
defaults: { rate: 1 }
scenarios:
  - signal_type: metrics
    pack: ./packs/telegraf-snmp-interface.yaml
"#;
        let mut resolver = InMemoryPackResolver::new();
        resolver.insert("./packs/telegraf-snmp-interface.yaml", telegraf_pack());
        let expanded = expand_yaml(yaml, &resolver);
        assert_eq!(expanded.entries.len(), 2);
    }

    // -----------------------------------------------------------------------
    // Contract: Send + Sync on types crossing threads
    // -----------------------------------------------------------------------

    #[test]
    fn expanded_file_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<ExpandedFile>();
        assert_send_sync::<ExpandedEntry>();
        assert_send_sync::<ExpandError>();
    }
}