Skip to main content

sonda_core/compiler/
compile_after.rs

1//! `after` clause compilation and clock-group assignment (Phases 4 & 5).
2//!
3//! This module takes an [`ExpandedFile`] (the output of
4//! [`super::expand::expand`]) and produces a [`CompiledFile`] where every
5//! `after:` clause has been resolved into a concrete `phase_offset` and
6//! every signal participating in a dependency chain has been assigned a
7//! deterministic `clock_group`. The runtime never sees [`AfterClause`]
8//! objects — by the time the runtime receives entries, causal ordering is
9//! expressed purely through `phase_offset` + `clock_group`.
10//!
11//! # Pipeline position
12//!
13//! ```text
14//! parse (§4.1) → normalize (§4.2) → expand (§4.3) → compile_after (§4.4 + §4.5) → runtime
15//! ```
16//!
17//! The type itself witnesses the transition: [`CompiledEntry`] drops
18//! [`AfterClause`] from its shape, exposing only `phase_offset: Option<String>`
19//! and `clock_group: Option<String>`. If you have a `CompiledEntry`, all
20//! `after` clauses in the file were resolvable.
21//!
22//! # Reference resolution (§3.2)
23//!
24//! The pass builds a flat `BTreeMap<String, &ExpandedEntry>` keyed on
25//! [`ExpandedEntry::id`]. Inline entries with `id: None` are still valid
26//! signals — they just cannot be referenced by any `after.ref`. Pack
27//! sub-signals are addressable as `{entry}.{metric}` (for unique-by-name
28//! packs) or `{entry}.{metric}#{spec_index}` for packs that ship multiple
29//! [`MetricSpec`][crate::packs::MetricSpec]s under the same metric name.
30//! When a user writes the bare `{entry}.{metric}` form against a
31//! duplicate-name pack the compiler emits
32//! [`CompileAfterError::AmbiguousSubSignalRef`] with the concrete
33//! candidates so they know which `#N` to pick.
34//!
35//! # Timing computation (§3.3)
36//!
37//! For each signal with `after: Some(_)`:
38//!
39//! 1. Lower any operational alias on the **target's** generator into its
40//!    core [`GeneratorConfig`] variant. The timing math in
41//!    [`super::timing`] operates exclusively on the desugared form, which
42//!    matches the runtime's view of the signal.
43//! 2. Dispatch on the core variant to the matching `*_crossing_secs`
44//!    function. Each generator has its own crossing formula; generators
45//!    with ambiguous or non-deterministic output (`sine`, `uniform`,
46//!    `csv_replay`) are rejected with [`CompileAfterError::UnsupportedGenerator`].
47//! 3. Propagate the computed crossing time to the dependent signal,
48//!    accumulating transitive offsets across the dependency chain.
49//!
50//! # Offset formula (§3.3, matrix 11.14)
51//!
52//! ```text
53//! total_secs = user_phase_offset_secs + Σ crossing_time_secs + Σ delay_secs
54//! ```
55//!
56//! The result is formatted back into a parseable duration string (e.g.
57//! `"162.308s"`) for storage on [`CompiledEntry::phase_offset`]. The string
58//! round-trips through
59//! [`crate::config::validate::parse_duration`] so downstream passes can
60//! treat it the same way they treat a user-supplied `phase_offset`.
61//!
62//! # Clock-group derivation (§4.5)
63//!
64//! The `after` dependency graph partitions signals into connected
65//! components (treating edges as undirected for grouping purposes).
66//! For every component with two or more members the pass assigns one
67//! clock group:
68//!
69//! - if no entry in the component has an explicit `clock_group`, it is
70//!   auto-assigned as `chain_{lowest_lex_entry_id}`;
71//! - if exactly one distinct non-empty value is present, that value
72//!   becomes the group for the whole component;
73//! - if two distinct values are present, the pass emits
74//!   [`CompileAfterError::ConflictingClockGroup`] naming both values and
75//!   the offending entries (matrix row 11.16).
76//!
77//! Single-entry components (signals with no `after` and no dependents)
78//! keep their explicit `clock_group` if set, otherwise stay `None`.
79//!
80//! # Cycle detection (§3.4, matrix row 10.6)
81//!
82//! A Kahn topological sort on the directed dependency graph yields the
83//! resolution order. When the sort's output covers fewer entries than the
84//! graph, a recursive DFS with white/gray/black coloring reconstructs the
85//! cycle path (e.g. `["A", "B", "C", "A"]`) and surfaces it via
86//! [`CompileAfterError::CircularDependency`]. Back-edge detection is
87//! driven by `color[dep] == Gray`; the path-reconstruction vector records
88//! the current ancestor chain so the cycle can be sliced out directly.
89//!
90//! # Cross-signal-type support (§3.5, matrix row 11.11)
91//!
92//! A dependent signal can be any `signal_type` — metrics, logs, histogram,
93//! or summary — but the **target** must be a metrics signal with a
94//! deterministic generator. Crossing math requires inverting a generator's
95//! analytical form, which the non-metric signal types do not have; the
96//! pass rejects such targets with
97//! [`CompileAfterError::NonMetricsTarget`].
98//!
99//! # Pack references
100//!
101//! Pack entries are not themselves referenceable — the expand pass does
102//! not emit an [`ExpandedEntry`] whose `id` matches the bare pack entry
103//! id (e.g. `B`). Only the individual sub-signals materialize as
104//! addressable entries, using the dotted form `{entry}.{metric}` (and
105//! `{entry}.{metric}#{spec_index}` for duplicate-name packs). Writing
106//! `after.ref: B` against a pack entry therefore fails with
107//! [`CompileAfterError::UnknownRef`]; the `available` list in the
108//! diagnostic shows the valid dotted ids. To attach `after:` to the whole
109//! pack, set it on the pack entry itself — the expand pass propagates it
110//! to every sub-signal — or use a specific dotted metric path.
111//!
112//! # Clock-group string equality
113//!
114//! Clock-group comparisons use exact string equality after filtering out
115//! empty strings: `Some("")` is treated as "no explicit value" and
116//! participates in auto-naming, while `Some("x")` and `Some("x ")` are
117//! considered distinct (trailing whitespace is significant). Mixing a
118//! blank and a concrete value inside one component resolves to the
119//! concrete value without error; mixing two different non-empty values
120//! (including whitespace variants) triggers
121//! [`CompileAfterError::ConflictingClockGroup`].
122
123use std::collections::{BTreeMap, VecDeque};
124
125use super::expand::{ExpandedEntry, ExpandedFile};
126use super::timing::{
127    self, constant_crossing_secs, csv_replay_crossing_secs, sawtooth_crossing_secs,
128    sequence_crossing_secs, sine_crossing_secs, spike_crossing_secs, step_crossing_secs,
129    uniform_crossing_secs, Operator, TimingError,
130};
131use super::{AfterOp, ClauseKind, DelayClause, WhileClause};
132use crate::config::validate::parse_duration;
133use crate::config::{
134    BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
135    OnSinkError,
136};
137use crate::encoder::EncoderConfig;
138use crate::generator::{GeneratorConfig, LogGeneratorConfig};
139use crate::sink::SinkConfig;
140
141// ---------------------------------------------------------------------------
142// Error type
143// ---------------------------------------------------------------------------
144
145/// Errors produced by the `after` clause compilation pass.
146///
147/// Every variant captures enough context to identify the offending entry
148/// without re-reading the source YAML. Variants map one-to-one onto the
149/// spec §3.4 validation table so diagnostics stay aligned with the
150/// published error messages.
151#[derive(Debug, thiserror::Error)]
152#[non_exhaustive]
153pub enum CompileAfterError {
154    /// A clause's `ref` pointed to a signal id that does not exist in the
155    /// expanded file.
156    ///
157    /// The `available` list contains every known signal id (sorted) so the
158    /// user can spot the typo or missing entry quickly.
159    #[error(
160        "entry '{source_id}': {clause}.ref '{ref_id}' does not match any signal id in this file. \
161         Available ids: [{available}]"
162    )]
163    UnknownRef {
164        source_id: String,
165        ref_id: String,
166        clause: ClauseKind,
167        available: String,
168    },
169
170    /// An `after.ref` used the bare `{entry}.{metric}` form against a
171    /// duplicate-name pack metric. The user must pick one of the
172    /// `#{spec_index}` variants.
173    #[error(
174        "after.ref '{ref_id}' is ambiguous: pack '{pack_entry_id}' ships multiple specs with \
175         this metric name. Use one of: [{candidates}]"
176    )]
177    AmbiguousSubSignalRef {
178        /// The ambiguous reference as written.
179        ref_id: String,
180        /// The pack entry id that produced the colliding sub-signals.
181        pack_entry_id: String,
182        /// Comma-separated list of disambiguated sub-signal ids.
183        candidates: String,
184    },
185
186    /// An entry's clause `ref` pointed to its own id.
187    #[error("entry '{source_id}': {clause}.ref references itself")]
188    SelfReference {
189        source_id: String,
190        clause: ClauseKind,
191    },
192
193    /// The dependency graph contains a cycle.
194    ///
195    /// `cycle` is a path of `(entry id, edge label)` pairs starting and
196    /// ending at the same vertex; the label on each pair tags the outgoing
197    /// edge from that vertex. The final pair carries the same label as the
198    /// edge that closes the cycle, so `(a, After) -> (b, While) -> (a, After)`
199    /// renders as `a --[after]--> b --[while]--> a`. Pure-`after:` cycles
200    /// preserve the existing `a -> b -> a` short form.
201    #[error("{}", format_cycle(.cycle))]
202    CircularDependency { cycle: Vec<(String, ClauseKind)> },
203
204    /// The target of an `after.ref` uses a generator that does not support
205    /// the requested operator.
206    #[error(
207        "entry '{source_id}': after.ref '{ref_id}' uses generator '{generator}' which does \
208         not support {op} threshold crossings: {reason}"
209    )]
210    UnsupportedGenerator {
211        /// The dependent entry.
212        source_id: String,
213        /// The referenced target.
214        ref_id: String,
215        /// The target's generator type (as the serde tag, e.g. `"sine"`).
216        generator: String,
217        /// The operator from the after clause.
218        op: String,
219        /// Diagnostic detail from the timing-math layer.
220        reason: String,
221    },
222
223    /// The after clause threshold is outside the target signal's output
224    /// range — the crossing will never happen.
225    #[error("entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- {reason}")]
226    OutOfRangeThreshold {
227        /// The dependent entry.
228        source_id: String,
229        /// The referenced target.
230        ref_id: String,
231        /// The operator from the after clause.
232        op: String,
233        /// The threshold value from the after clause.
234        value: f64,
235        /// Diagnostic detail from the timing-math layer.
236        reason: String,
237    },
238
239    /// The crossing condition is already satisfied at `t=0`; the crossing
240    /// time is ambiguous.
241    #[error(
242        "entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- condition is \
243         true at t=0, timing is ambiguous: {reason}"
244    )]
245    AmbiguousAtT0 {
246        /// The dependent entry.
247        source_id: String,
248        /// The referenced target.
249        ref_id: String,
250        /// The operator from the after clause.
251        op: String,
252        /// The threshold value from the after clause.
253        value: f64,
254        /// Diagnostic detail from the timing-math layer.
255        reason: String,
256    },
257
258    /// Two entries in the same dependency chain have different explicit
259    /// `clock_group` values.
260    #[error(
261        "conflicting clock_group in dependency chain: entry '{first_entry}' has \
262         clock_group '{first_group}', entry '{second_entry}' has clock_group '{second_group}'"
263    )]
264    ConflictingClockGroup {
265        /// First entry whose clock_group participates in the conflict.
266        first_entry: String,
267        /// The first clock_group value.
268        first_group: String,
269        /// Second entry whose clock_group differs from the first.
270        second_entry: String,
271        /// The conflicting clock_group value.
272        second_group: String,
273    },
274
275    /// The target of a clause `ref` is not a metrics signal.
276    ///
277    /// Cross-signal-type clauses (spec §3.5) allow the **dependent** to be
278    /// any type, but the **target** must be metrics so the compiler can
279    /// invert its analytical model for crossing math (and so a `while:`
280    /// gate has a continuous numeric value to compare).
281    #[error(
282        "entry '{source_id}': {clause}.ref '{ref_id}' resolves to a {target_signal} signal; \
283         only metrics signals can be `{clause}` targets"
284    )]
285    NonMetricsTarget {
286        source_id: String,
287        ref_id: String,
288        clause: ClauseKind,
289        target_signal: String,
290    },
291
292    /// A duration string on `after.delay`, the entry's `phase_offset`, or
293    /// an alias parameter (e.g. `flap.up_duration`) was not parseable.
294    #[error("entry '{source_id}': invalid duration '{input}' in {field}: {reason}")]
295    InvalidDuration {
296        /// The entry whose duration field failed to parse.
297        source_id: String,
298        /// Which field carried the bad value (`"after.delay"`, `"phase_offset"`, etc.).
299        field: &'static str,
300        /// The offending string as written.
301        input: String,
302        /// The underlying parse error message.
303        reason: String,
304    },
305
306    #[error(
307        "entry '{source_id}': `while:` cannot reference '{ref_id}' — it emits a literal NaN \
308         ({nan}); strict comparisons against NaN never hold and would leave the scenario \
309         permanently paused"
310    )]
311    WhileNanSource {
312        source_id: String,
313        ref_id: String,
314        nan: NanSource,
315    },
316
317    #[error(
318        "entry '{source_id}': `while:` cannot reference '{ref_id}' — generator \
319         '{generator_kind}' is data-dependent; only analytical generators are supported as \
320         `while:` upstreams"
321    )]
322    WhileUnsupportedUpstreamGenerator {
323        source_id: String,
324        ref_id: String,
325        generator_kind: &'static str,
326    },
327}
328
329/// Where in a generator config a literal NaN was found.
330#[derive(Debug, Clone, PartialEq)]
331#[non_exhaustive]
332pub enum NanSource {
333    Constant,
334    SequenceValue {
335        index: usize,
336    },
337    CsvCell {
338        path: String,
339        row: usize,
340        column: usize,
341    },
342}
343
344impl std::fmt::Display for NanSource {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        match self {
347            NanSource::Constant => f.write_str("constant.value: NaN"),
348            NanSource::SequenceValue { index } => {
349                write!(f, "sequence.values[{index}]: NaN")
350            }
351            NanSource::CsvCell { path, row, column } => {
352                write!(f, "csv_replay file '{path}' row {row} column {column}: NaN")
353            }
354        }
355    }
356}
357
358fn format_cycle(cycle: &[(String, ClauseKind)]) -> String {
359    if cycle.is_empty() {
360        return "circular dependency detected: <unknown cycle>".to_string();
361    }
362    let edge_kinds = &cycle[..cycle.len().saturating_sub(1)];
363    let any_while = edge_kinds.iter().any(|(_, k)| *k == ClauseKind::While);
364    if !any_while {
365        let names: Vec<&str> = cycle.iter().map(|(name, _)| name.as_str()).collect();
366        return format!("circular dependency detected: {}", names.join(" -> "));
367    }
368    let mut out = String::from("circular dependency detected: ");
369    for (i, (name, kind)) in cycle.iter().enumerate() {
370        out.push_str(name);
371        if i + 1 < cycle.len() {
372            use std::fmt::Write;
373            let _ = write!(out, " --[{kind}]--> ");
374        }
375    }
376    out
377}
378
379// ---------------------------------------------------------------------------
380// Compiled representation
381// ---------------------------------------------------------------------------
382
383/// A v2 scenario file with every `after:` clause resolved.
384///
385/// Mirrors [`ExpandedFile`] shape-for-shape, replacing [`ExpandedEntry`]
386/// with [`CompiledEntry`]. The type witnesses that all reference
387/// resolution, timing math, and clock-group assignment have completed
388/// successfully.
389#[derive(Debug, Clone)]
390#[cfg_attr(feature = "config", derive(serde::Serialize))]
391pub struct CompiledFile {
392    /// Schema version. Always `2` after compilation.
393    pub version: u32,
394    /// File-level `scenario_name` carried verbatim. Pure metadata —
395    /// ignored by every compiler phase, surfaced for runtime conflict checks.
396    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
397    pub scenario_name: Option<String>,
398    /// Concrete scenario entries, in source order. Pack-expanded sub-signals
399    /// appear consecutively as their parent entry was processed.
400    pub entries: Vec<CompiledEntry>,
401}
402
403/// A single scenario entry with `after:` resolved and `clock_group`
404/// finalized.
405///
406/// The `after: Option<AfterClause>` field from [`ExpandedEntry`] is gone
407/// — the causal information it carried has been folded into
408/// [`Self::phase_offset`] (computed crossing time plus any user-provided
409/// offset plus the optional `delay`) and [`Self::clock_group`] (either the
410/// user's explicit value or an auto-assigned `chain_{lowest_lex_id}`).
411///
412/// All other fields are copied verbatim from [`ExpandedEntry`]; this is a
413/// pure enrichment pass, not a structural rewrite.
414#[derive(Debug, Clone)]
415#[cfg_attr(feature = "config", derive(serde::Serialize))]
416pub struct CompiledEntry {
417    /// Signal identifier, identical to [`ExpandedEntry::id`].
418    pub id: Option<String>,
419    /// Signal type: `"metrics"`, `"logs"`, `"histogram"`, or `"summary"`.
420    pub signal_type: String,
421    /// Metric or scenario name.
422    pub name: String,
423    /// Event rate in events per second.
424    pub rate: f64,
425    /// Total run duration (e.g. `"30s"`, `"5m"`).
426    pub duration: Option<String>,
427    /// Value generator configuration (metrics signals only).
428    pub generator: Option<GeneratorConfig>,
429    /// Log generator configuration (logs signals only).
430    pub log_generator: Option<LogGeneratorConfig>,
431    /// Static labels, already composed through the full precedence chain.
432    pub labels: Option<BTreeMap<String, String>>,
433    /// Dynamic (rotating) label configurations.
434    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
435    /// Encoder configuration.
436    pub encoder: EncoderConfig,
437    /// Sink configuration.
438    pub sink: SinkConfig,
439    /// Jitter amplitude applied to generated values.
440    pub jitter: Option<f64>,
441    /// Deterministic seed for jitter RNG.
442    pub jitter_seed: Option<u64>,
443    /// Recurring silent-period configuration.
444    pub gaps: Option<GapConfig>,
445    /// Recurring high-rate burst configuration.
446    pub bursts: Option<BurstConfig>,
447    /// Cardinality spike configurations.
448    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
449    /// Phase offset. Equals `user_phase_offset + Σ crossing_time + Σ delay`
450    /// when the entry participated in an `after:` chain; otherwise the
451    /// user's original value (or `None`).
452    pub phase_offset: Option<String>,
453    /// Clock group — either the user's explicit value, or an
454    /// auto-assigned `chain_{lowest_lex_id}` for every member of a
455    /// dependency chain with no explicit group.
456    pub clock_group: Option<String>,
457    /// Provenance of [`Self::clock_group`].
458    ///
459    /// `true` exactly when the compiler synthesized the
460    /// `chain_{lowest_lex_id}` name for a multi-node `after:` component
461    /// that had no user-supplied value. `false` when the value was
462    /// adopted from an explicit user assignment (including explicit
463    /// values that happen to start with `chain_`). Always `false` when
464    /// [`Self::clock_group`] is `None`.
465    ///
466    /// Downstream display code uses this to decide whether to suffix the
467    /// rendered value with `(auto)`. The `chain_` prefix alone is not a
468    /// reliable proxy because users are free to write
469    /// `clock_group: chain_alpha` themselves.
470    pub clock_group_is_auto: bool,
471
472    // -- Histogram / summary fields (inline entries only) --
473    /// Distribution model for histogram or summary observations.
474    pub distribution: Option<DistributionConfig>,
475    /// Histogram bucket boundaries (histogram only).
476    pub buckets: Option<Vec<f64>>,
477    /// Summary quantile boundaries (summary only).
478    pub quantiles: Option<Vec<f64>>,
479    /// Number of observations sampled per tick.
480    pub observations_per_tick: Option<u32>,
481    /// Linear drift applied to the distribution mean each second.
482    pub mean_shift_per_sec: Option<f64>,
483    /// Deterministic seed for histogram/summary sampling.
484    pub seed: Option<u64>,
485    /// Resolved sink-error policy.
486    pub on_sink_error: OnSinkError,
487    /// Continuous lifecycle gate. Does not contribute to `phase_offset`.
488    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
489    pub while_clause: Option<WhileClause>,
490    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
491    pub delay_clause: Option<DelayClause>,
492    /// Upstream id this entry's `after:` resolved against, when one was
493    /// present. Folded into `phase_offset` for runtime; preserved here so
494    /// `--dry-run` can label it on entries that also carry a `while:` against
495    /// a different upstream.
496    #[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
497    pub after_ref: Option<String>,
498}
499
500// ---------------------------------------------------------------------------
501// Public API
502// ---------------------------------------------------------------------------
503
504/// Compile every `after:` clause in an expanded v2 scenario file.
505///
506/// The returned [`CompiledFile`] contains one [`CompiledEntry`] per input
507/// entry with `after:` resolved into `phase_offset`, and a deterministic
508/// `clock_group` assigned to every signal participating in a dependency
509/// chain. Entries without `after:` pass through unchanged.
510///
511/// # Behavior
512///
513/// - **Reference index.** A flat map keyed on [`ExpandedEntry::id`] covers
514///   inline entries and pack sub-signals alike. Bare `{entry}.{metric}`
515///   references against duplicate-name packs raise
516///   [`CompileAfterError::AmbiguousSubSignalRef`].
517/// - **Crossing math.** Aliases (`flap`, `saturation`, `leak`,
518///   `degradation`, `spike_event`, `steady`) are desugared on the target
519///   before dispatching to the matching `timing::*_crossing_secs` routine.
520/// - **Transitive accumulation.** Kahn's topological sort orders entries
521///   so that `phase_offset` for a dependent signal includes its target's
522///   already-resolved offset.
523/// - **Clock-group assignment.** Signals linked by `after:` are grouped
524///   into connected components. If any component member carries an
525///   explicit `clock_group`, that value is used; otherwise the group is
526///   named `chain_{lowest_lex_id}`.
527///
528/// # Errors
529///
530/// Every variant of [`CompileAfterError`] is reachable; see the type-level
531/// documentation for the one-to-one mapping onto spec §3.4 validation
532/// conditions.
533pub fn compile_after(file: ExpandedFile) -> Result<CompiledFile, CompileAfterError> {
534    let ExpandedFile {
535        version,
536        scenario_name,
537        entries,
538    } = file;
539
540    let id_to_idx = build_id_index(&entries);
541
542    for entry in &entries {
543        let source_id = source_label(entry);
544        for (ref_id, clause) in outgoing_edges(entry) {
545            resolve_reference(ref_id, &id_to_idx, &source_id, clause)?;
546            if entry.id.as_deref() == Some(ref_id) {
547                return Err(CompileAfterError::SelfReference {
548                    source_id: source_id.clone().into_owned(),
549                    clause,
550                });
551            }
552        }
553    }
554
555    let n = entries.len();
556    let mut in_degree = vec![0u32; n];
557    let mut dependents: Vec<Vec<(usize, ClauseKind)>> = vec![Vec::new(); n];
558    for (i, entry) in entries.iter().enumerate() {
559        for (ref_id, clause) in outgoing_edges(entry) {
560            let dep_idx = id_to_idx[ref_id];
561            in_degree[i] += 1;
562            dependents[dep_idx].push((i, clause));
563        }
564    }
565
566    let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
567    let mut sorted: Vec<usize> = Vec::with_capacity(n);
568    while let Some(idx) = queue.pop_front() {
569        sorted.push(idx);
570        for &(dependent, _) in &dependents[idx] {
571            in_degree[dependent] -= 1;
572            if in_degree[dependent] == 0 {
573                queue.push_back(dependent);
574            }
575        }
576    }
577    if sorted.len() < n {
578        let cycle = find_cycle(&entries, &id_to_idx);
579        return Err(CompileAfterError::CircularDependency { cycle });
580    }
581
582    for entry in &entries {
583        let Some(clause) = &entry.while_clause else {
584            continue;
585        };
586        let source_id = source_label(entry).into_owned();
587        let dep_idx = id_to_idx[clause.ref_id.as_str()];
588        let target = &entries[dep_idx];
589
590        if target.signal_type != "metrics" {
591            return Err(CompileAfterError::NonMetricsTarget {
592                source_id,
593                ref_id: clause.ref_id.clone(),
594                clause: ClauseKind::While,
595                target_signal: target.signal_type.clone(),
596            });
597        }
598        if let Some(generator) = target.generator.as_ref() {
599            if !is_supported_while_upstream(generator) {
600                return Err(CompileAfterError::WhileUnsupportedUpstreamGenerator {
601                    source_id,
602                    ref_id: clause.ref_id.clone(),
603                    generator_kind: generator_kind(generator),
604                });
605            }
606            if let Some(nan) = detect_nan_source(generator) {
607                return Err(CompileAfterError::WhileNanSource {
608                    source_id,
609                    ref_id: clause.ref_id.clone(),
610                    nan,
611                });
612            }
613        }
614    }
615
616    let mut total_offsets = vec![0.0_f64; n];
617    let mut base_offsets = vec![0.0_f64; n];
618
619    for (i, entry) in entries.iter().enumerate() {
620        if let Some(s) = entry.phase_offset.as_deref() {
621            base_offsets[i] = parse_duration_secs(s, &source_label(entry), "phase_offset")?;
622        }
623    }
624
625    for &idx in &sorted {
626        let entry = &entries[idx];
627        let Some(clause) = &entry.after else {
628            total_offsets[idx] = base_offsets[idx];
629            continue;
630        };
631
632        let source_id = source_label(entry).into_owned();
633        let dep_idx = id_to_idx[clause.ref_id.as_str()];
634        let target = &entries[dep_idx];
635
636        if target.signal_type != "metrics" {
637            return Err(CompileAfterError::NonMetricsTarget {
638                source_id,
639                ref_id: clause.ref_id.clone(),
640                clause: ClauseKind::After,
641                target_signal: target.signal_type.clone(),
642            });
643        }
644
645        let generator = target.generator.as_ref().unwrap_or_else(|| {
646            unreachable!(
647                "metrics target '{ref_id}' has no generator — parser and expand \
648                 pass both guarantee metrics entries always carry one",
649                ref_id = clause.ref_id
650            )
651        });
652
653        let op = operator_from(&clause.op);
654        let crossing = crossing_secs(generator, op, clause.value, target.rate).map_err(|err| {
655            timing_to_error(err, &source_id, &clause.ref_id, generator, op, clause.value)
656        })?;
657
658        let delay = match clause.delay.as_deref() {
659            Some(s) => parse_duration_secs(s, &source_id, "after.delay")?,
660            None => 0.0,
661        };
662
663        total_offsets[idx] = base_offsets[idx] + total_offsets[dep_idx] + crossing + delay;
664    }
665
666    let clock_groups = assign_clock_groups(&entries, &id_to_idx)?;
667
668    let mut out: Vec<CompiledEntry> = Vec::with_capacity(n);
669    for (i, entry) in entries.into_iter().enumerate() {
670        let phase_offset = if entry.after.is_some() || total_offsets[i] != 0.0 {
671            Some(format_duration_secs(total_offsets[i]))
672        } else {
673            entry.phase_offset.clone()
674        };
675
676        let (clock_group, clock_group_is_auto) = match &clock_groups[i] {
677            ClockGroupAssignment::Resolved { name, is_auto } => (Some(name.clone()), *is_auto),
678            ClockGroupAssignment::Unassigned => (entry.clock_group.clone(), false),
679        };
680
681        let after_ref = entry.after.as_ref().map(|c| c.ref_id.clone());
682
683        out.push(CompiledEntry {
684            id: entry.id,
685            signal_type: entry.signal_type,
686            name: entry.name,
687            rate: entry.rate,
688            duration: entry.duration,
689            generator: entry.generator,
690            log_generator: entry.log_generator,
691            labels: entry.labels,
692            dynamic_labels: entry.dynamic_labels,
693            encoder: entry.encoder,
694            sink: entry.sink,
695            jitter: entry.jitter,
696            jitter_seed: entry.jitter_seed,
697            gaps: entry.gaps,
698            bursts: entry.bursts,
699            cardinality_spikes: entry.cardinality_spikes,
700            phase_offset,
701            clock_group,
702            clock_group_is_auto,
703            distribution: entry.distribution,
704            buckets: entry.buckets,
705            quantiles: entry.quantiles,
706            observations_per_tick: entry.observations_per_tick,
707            mean_shift_per_sec: entry.mean_shift_per_sec,
708            on_sink_error: entry.on_sink_error,
709            seed: entry.seed,
710            while_clause: entry.while_clause,
711            delay_clause: entry.delay_clause,
712            after_ref,
713        });
714    }
715
716    Ok(CompiledFile {
717        version,
718        scenario_name,
719        entries: out,
720    })
721}
722
723/// Detect a literal NaN in a generator config. Returns `None` for analytical
724/// generators (sine, sawtooth, etc.) — runtime evaluation handles those.
725fn detect_nan_source(generator: &GeneratorConfig) -> Option<NanSource> {
726    match generator {
727        GeneratorConfig::Constant { value } if value.is_nan() => Some(NanSource::Constant),
728        GeneratorConfig::Sequence { values, .. } => values
729            .iter()
730            .position(|v| v.is_nan())
731            .map(|index| NanSource::SequenceValue { index }),
732        GeneratorConfig::CsvReplay { file, column, .. } => {
733            scan_csv_for_nan(file, column.unwrap_or(0))
734        }
735        _ => None,
736    }
737}
738
739fn scan_csv_for_nan(path: &str, column: usize) -> Option<NanSource> {
740    let contents = std::fs::read_to_string(path).ok()?;
741    for (row, line) in contents.lines().enumerate() {
742        let trimmed = line.trim();
743        if trimmed.is_empty() || trimmed.starts_with('#') {
744            continue;
745        }
746        let mut cells = trimmed.split(',');
747        let cell = cells.nth(column)?.trim();
748        if is_literal_nan(cell) {
749            return Some(NanSource::CsvCell {
750                path: path.to_string(),
751                row,
752                column,
753            });
754        }
755    }
756    None
757}
758
759fn is_literal_nan(cell: &str) -> bool {
760    matches!(cell.to_ascii_lowercase().as_str(), "nan" | "+nan" | "-nan")
761}
762
763// ---------------------------------------------------------------------------
764// Reference index + resolution
765// ---------------------------------------------------------------------------
766
767/// Build a map from signal id to its index in the entries list.
768///
769/// Only entries with `Some(id)` are included. [`ExpandedEntry::id`]
770/// uniqueness is enforced upstream by [`super::expand::expand`], so
771/// duplicate inserts cannot occur here.
772fn build_id_index(entries: &[ExpandedEntry]) -> BTreeMap<&str, usize> {
773    let mut idx = BTreeMap::new();
774    for (i, entry) in entries.iter().enumerate() {
775        if let Some(id) = entry.id.as_deref() {
776            idx.insert(id, i);
777        }
778    }
779    idx
780}
781
782/// Resolve a clause `ref` against the reference index, producing a
783/// precise diagnostic for unknown or ambiguous references.
784///
785/// Returns the resolved target index on success.
786fn resolve_reference(
787    ref_id: &str,
788    id_to_idx: &BTreeMap<&str, usize>,
789    source_id: &str,
790    clause: ClauseKind,
791) -> Result<usize, CompileAfterError> {
792    if let Some(&idx) = id_to_idx.get(ref_id) {
793        return Ok(idx);
794    }
795
796    let prefix = format!("{ref_id}#");
797    let candidates: Vec<&str> = id_to_idx
798        .keys()
799        .filter(|k| k.starts_with(&prefix))
800        .copied()
801        .collect();
802    if !candidates.is_empty() {
803        let pack_entry_id = ref_id
804            .rsplit_once('.')
805            .map(|(left, _)| left.to_string())
806            .unwrap_or_default();
807        return Err(CompileAfterError::AmbiguousSubSignalRef {
808            ref_id: ref_id.to_string(),
809            pack_entry_id,
810            candidates: candidates.join(", "),
811        });
812    }
813
814    let available: Vec<&str> = id_to_idx.keys().copied().collect();
815    Err(CompileAfterError::UnknownRef {
816        source_id: source_id.to_string(),
817        ref_id: ref_id.to_string(),
818        clause,
819        available: available.join(", "),
820    })
821}
822
823/// Yield every outgoing edge from `entry` as `(target_ref_id, edge_label)`.
824///
825/// Order is `after:` first, then `while:` — used by index building, cycle
826/// detection, and reference validation. Adding a new clause type (e.g.
827/// v2's `gated_by:`) extends this iterator and reaches every cycle-aware
828/// site automatically.
829fn outgoing_edges(entry: &ExpandedEntry) -> impl Iterator<Item = (&str, ClauseKind)> {
830    entry
831        .after
832        .as_ref()
833        .map(|c| (c.ref_id.as_str(), ClauseKind::After))
834        .into_iter()
835        .chain(
836            entry
837                .while_clause
838                .as_ref()
839                .map(|c| (c.ref_id.as_str(), ClauseKind::While)),
840        )
841}
842
843/// Format an entry into a human-readable label for error messages.
844///
845/// Priority: `id` → `name` → `<anonymous entry>`. Returns `Cow<str>` so
846/// the caller can avoid an allocation when the id is already available.
847fn source_label(entry: &ExpandedEntry) -> std::borrow::Cow<'_, str> {
848    if let Some(id) = entry.id.as_deref() {
849        std::borrow::Cow::Borrowed(id)
850    } else {
851        std::borrow::Cow::Owned(format!("<anonymous:{}>", entry.name))
852    }
853}
854
855// ---------------------------------------------------------------------------
856// Crossing time dispatch
857// ---------------------------------------------------------------------------
858
859/// Dispatch a [`GeneratorConfig`] to the matching `timing::*_crossing_secs`
860/// routine, desugaring operational aliases as needed.
861///
862/// The `rate` parameter is required by generators whose crossing time is
863/// expressed in ticks (`step`, `sequence`, and anything derived from a
864/// `Sequence` via the `flap` alias). Sawtooth / spike / sine variants
865/// already encode their periods in seconds and ignore the rate.
866fn crossing_secs(
867    generator: &GeneratorConfig,
868    op: Operator,
869    threshold: f64,
870    rate: f64,
871) -> Result<f64, TimingError> {
872    match generator {
873        GeneratorConfig::Constant { value } => constant_crossing_secs(op, threshold, *value),
874        GeneratorConfig::Uniform { .. } => uniform_crossing_secs(),
875        GeneratorConfig::Sine { .. } => sine_crossing_secs(),
876        GeneratorConfig::CsvReplay { .. } => csv_replay_crossing_secs(),
877        GeneratorConfig::Sawtooth {
878            min,
879            max,
880            period_secs,
881        } => sawtooth_crossing_secs(op, threshold, *min, *max, *period_secs),
882        GeneratorConfig::Sequence { values, repeat } => {
883            sequence_crossing_secs(op, threshold, values, *repeat, rate)
884        }
885        GeneratorConfig::Step {
886            start,
887            step_size,
888            max,
889        } => step_crossing_secs(op, threshold, start.unwrap_or(0.0), *step_size, *max, rate),
890        GeneratorConfig::Spike {
891            baseline,
892            magnitude,
893            duration_secs,
894            ..
895        } => spike_crossing_secs(op, threshold, *baseline, *magnitude, *duration_secs),
896
897        // --- Operational aliases (desugar before dispatch) ---
898        GeneratorConfig::Flap {
899            up_duration,
900            down_duration,
901            up_value,
902            down_value,
903            enum_kind,
904        } => {
905            let up_secs = duration_or_default(up_duration.as_deref(), 10.0, "flap.up_duration")?;
906            let down_secs =
907                duration_or_default(down_duration.as_deref(), 5.0, "flap.down_duration")?;
908            let (up_default, down_default) = enum_kind.map(|e| e.defaults()).unwrap_or((1.0, 0.0));
909            let up_val = up_value.unwrap_or(up_default);
910            let down_val = down_value.unwrap_or(down_default);
911            timing::flap_crossing_secs(op, threshold, up_secs, down_secs, up_val, down_val)
912        }
913        GeneratorConfig::Saturation {
914            baseline,
915            ceiling,
916            time_to_saturate,
917        } => {
918            let bl = baseline.unwrap_or(0.0);
919            let cl = ceiling.unwrap_or(100.0);
920            let period = duration_or_default(
921                time_to_saturate.as_deref(),
922                5.0 * 60.0,
923                "saturation.time_to_saturate",
924            )?;
925            sawtooth_crossing_secs(op, threshold, bl, cl, period)
926        }
927        GeneratorConfig::Leak {
928            baseline,
929            ceiling,
930            time_to_ceiling,
931        } => {
932            let bl = baseline.unwrap_or(0.0);
933            let cl = ceiling.unwrap_or(100.0);
934            let period = duration_or_default(
935                time_to_ceiling.as_deref(),
936                10.0 * 60.0,
937                "leak.time_to_ceiling",
938            )?;
939            sawtooth_crossing_secs(op, threshold, bl, cl, period)
940        }
941        GeneratorConfig::Degradation {
942            baseline,
943            ceiling,
944            time_to_degrade,
945            ..
946        } => {
947            let bl = baseline.unwrap_or(0.0);
948            let cl = ceiling.unwrap_or(100.0);
949            let period = duration_or_default(
950                time_to_degrade.as_deref(),
951                5.0 * 60.0,
952                "degradation.time_to_degrade",
953            )?;
954            sawtooth_crossing_secs(op, threshold, bl, cl, period)
955        }
956        GeneratorConfig::Steady { .. } => timing::steady_crossing_secs(),
957        GeneratorConfig::SpikeEvent {
958            baseline,
959            spike_height,
960            spike_duration,
961            ..
962        } => {
963            let bl = baseline.unwrap_or(0.0);
964            let height = spike_height.unwrap_or(100.0);
965            let dur = duration_or_default(
966                spike_duration.as_deref(),
967                10.0,
968                "spike_event.spike_duration",
969            )?;
970            spike_crossing_secs(op, threshold, bl, height, dur)
971        }
972    }
973}
974
975/// `while:` upstreams must be analytical; data-driven generators like
976/// `csv_replay` are rejected the same way they are for `after:`.
977fn is_supported_while_upstream(generator: &GeneratorConfig) -> bool {
978    !matches!(generator, GeneratorConfig::CsvReplay { .. })
979}
980
981/// Return the generator's serde tag as a `&'static str` for error messages.
982fn generator_kind(generator: &GeneratorConfig) -> &'static str {
983    match generator {
984        GeneratorConfig::Constant { .. } => "constant",
985        GeneratorConfig::Uniform { .. } => "uniform",
986        GeneratorConfig::Sine { .. } => "sine",
987        GeneratorConfig::Sawtooth { .. } => "sawtooth",
988        GeneratorConfig::Sequence { .. } => "sequence",
989        GeneratorConfig::Spike { .. } => "spike",
990        GeneratorConfig::CsvReplay { .. } => "csv_replay",
991        GeneratorConfig::Step { .. } => "step",
992        GeneratorConfig::Flap { .. } => "flap",
993        GeneratorConfig::Saturation { .. } => "saturation",
994        GeneratorConfig::Leak { .. } => "leak",
995        GeneratorConfig::Degradation { .. } => "degradation",
996        GeneratorConfig::Steady { .. } => "steady",
997        GeneratorConfig::SpikeEvent { .. } => "spike_event",
998    }
999}
1000
1001/// Convert a [`TimingError`] into the appropriate [`CompileAfterError`]
1002/// variant, preserving the underlying reason string.
1003fn timing_to_error(
1004    err: TimingError,
1005    source_id: &str,
1006    ref_id: &str,
1007    generator: &GeneratorConfig,
1008    op: Operator,
1009    value: f64,
1010) -> CompileAfterError {
1011    let op = op.to_string();
1012    match err {
1013        TimingError::Unsupported { message } => CompileAfterError::UnsupportedGenerator {
1014            source_id: source_id.to_string(),
1015            ref_id: ref_id.to_string(),
1016            generator: generator_kind(generator).to_string(),
1017            op,
1018            reason: message,
1019        },
1020        TimingError::OutOfRange { message } => CompileAfterError::OutOfRangeThreshold {
1021            source_id: source_id.to_string(),
1022            ref_id: ref_id.to_string(),
1023            op,
1024            value,
1025            reason: message,
1026        },
1027        TimingError::Ambiguous { message } => CompileAfterError::AmbiguousAtT0 {
1028            source_id: source_id.to_string(),
1029            ref_id: ref_id.to_string(),
1030            op,
1031            value,
1032            reason: message,
1033        },
1034        TimingError::InvalidDuration {
1035            field,
1036            input,
1037            reason,
1038        } => CompileAfterError::InvalidDuration {
1039            source_id: source_id.to_string(),
1040            field,
1041            input,
1042            reason,
1043        },
1044    }
1045}
1046
1047// ---------------------------------------------------------------------------
1048// Clock-group assignment
1049// ---------------------------------------------------------------------------
1050
1051/// Resolved clock-group assignment for one entry.
1052///
1053/// Carries the resolved group name alongside its provenance:
1054///
1055/// - `Resolved { name, is_auto: true }` — auto-named
1056///   `chain_{lowest_lex_id}` for a multi-node component with no
1057///   user-supplied value.
1058/// - `Resolved { name, is_auto: false }` — user-supplied value adopted
1059///   for the component (or, in the single-node fall-through path, the
1060///   entry's own explicit value).
1061/// - `Unassigned` — single-node component with no explicit value.
1062///
1063/// Kept private to the compiler module; downstream consumers see only
1064/// the resulting `(clock_group, clock_group_is_auto)` pair on
1065/// [`CompiledEntry`].
1066#[derive(Debug, Clone, PartialEq, Eq)]
1067enum ClockGroupAssignment {
1068    Resolved { name: String, is_auto: bool },
1069    Unassigned,
1070}
1071
1072/// Assign a clock group to every entry based on the `after:` dependency
1073/// graph (treated as undirected for component detection).
1074///
1075/// The returned vector is indexed in parallel with `entries`. Each slot
1076/// witnesses both the chosen value and whether the compiler auto-named
1077/// it. See [`ClockGroupAssignment`] for the variants.
1078///
1079/// # Errors
1080///
1081/// Returns [`CompileAfterError::ConflictingClockGroup`] when a component
1082/// has two distinct non-empty explicit group values.
1083fn assign_clock_groups(
1084    entries: &[ExpandedEntry],
1085    id_to_idx: &BTreeMap<&str, usize>,
1086) -> Result<Vec<ClockGroupAssignment>, CompileAfterError> {
1087    let n = entries.len();
1088
1089    // Build an undirected adjacency list.
1090    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
1091    for (i, entry) in entries.iter().enumerate() {
1092        if let Some(clause) = &entry.after {
1093            if let Some(&dep_idx) = id_to_idx.get(clause.ref_id.as_str()) {
1094                adj[i].push(dep_idx);
1095                adj[dep_idx].push(i);
1096            }
1097        }
1098    }
1099
1100    let mut component_id = vec![usize::MAX; n];
1101    let mut components: Vec<Vec<usize>> = Vec::new();
1102
1103    for start in 0..n {
1104        if component_id[start] != usize::MAX {
1105            continue;
1106        }
1107        let cid = components.len();
1108        let mut stack = vec![start];
1109        let mut members = Vec::new();
1110        while let Some(node) = stack.pop() {
1111            if component_id[node] != usize::MAX {
1112                continue;
1113            }
1114            component_id[node] = cid;
1115            members.push(node);
1116            for &next in &adj[node] {
1117                if component_id[next] == usize::MAX {
1118                    stack.push(next);
1119                }
1120            }
1121        }
1122        components.push(members);
1123    }
1124
1125    let mut out: Vec<ClockGroupAssignment> =
1126        (0..n).map(|_| ClockGroupAssignment::Unassigned).collect();
1127    for members in &components {
1128        if members.len() < 2 {
1129            continue;
1130        }
1131
1132        // Collect all distinct non-empty explicit clock_group values.
1133        let mut distinct: BTreeMap<&str, usize> = BTreeMap::new();
1134        for &idx in members {
1135            if let Some(cg) = entries[idx].clock_group.as_deref() {
1136                if !cg.is_empty() {
1137                    distinct.entry(cg).or_insert(idx);
1138                }
1139            }
1140        }
1141
1142        let (resolved, is_auto) = match distinct.len() {
1143            0 => (auto_chain_name(members, entries), true),
1144            1 => {
1145                let (&k, _) = distinct.iter().next().expect("len == 1");
1146                (k.to_string(), false)
1147            }
1148            _ => {
1149                let mut iter = distinct.iter();
1150                let (&first_group, &first_idx) = iter.next().expect("len >= 2");
1151                let (&second_group, &second_idx) = iter.next().expect("len >= 2");
1152                return Err(CompileAfterError::ConflictingClockGroup {
1153                    first_entry: source_label(&entries[first_idx]).into_owned(),
1154                    first_group: first_group.to_string(),
1155                    second_entry: source_label(&entries[second_idx]).into_owned(),
1156                    second_group: second_group.to_string(),
1157                });
1158            }
1159        };
1160
1161        for &idx in members {
1162            out[idx] = ClockGroupAssignment::Resolved {
1163                name: resolved.clone(),
1164                is_auto,
1165            };
1166        }
1167    }
1168
1169    Ok(out)
1170}
1171
1172/// Build a deterministic `chain_{lowest_lex_id}` name from a component's
1173/// member indices.
1174///
1175/// Every multi-member component reaches this helper via an `after` edge,
1176/// and `after.ref` can only target an entry that carries an `id` (that's
1177/// how reference resolution works in [`build_id_index`]). Therefore the
1178/// component always has at least one `id`-bearing member, and
1179/// [`Iterator::next`] on the sorted id list is guaranteed to be `Some`.
1180fn auto_chain_name(members: &[usize], entries: &[ExpandedEntry]) -> String {
1181    let mut ids: Vec<&str> = members
1182        .iter()
1183        .filter_map(|&i| entries[i].id.as_deref())
1184        .collect();
1185    ids.sort();
1186    let first = ids.first().unwrap_or_else(|| {
1187        unreachable!(
1188            "multi-entry component has no id-bearing member — `after.ref` \
1189             resolution guarantees every linked entry carries an id"
1190        )
1191    });
1192    format!("chain_{first}")
1193}
1194
1195// ---------------------------------------------------------------------------
1196// Cycle detection
1197// ---------------------------------------------------------------------------
1198
1199/// Find a cycle in the directed dependency graph for error reporting.
1200///
1201/// Walks `outgoing_edges` (both `after:` and `while:`) so cycles closed by
1202/// a `while:` edge surface with the right edge labels. Each pair in the
1203/// returned vector is `(node id, kind of edge OUT of this node toward the
1204/// next pair)`. The first and last node ids match (the cycle close). The
1205/// trailing pair's label echoes the closing edge's kind.
1206fn find_cycle(
1207    entries: &[ExpandedEntry],
1208    id_to_idx: &BTreeMap<&str, usize>,
1209) -> Vec<(String, ClauseKind)> {
1210    #[derive(Clone, Copy, PartialEq, Eq)]
1211    enum Color {
1212        White,
1213        Gray,
1214        Black,
1215    }
1216
1217    let n = entries.len();
1218    let mut color = vec![Color::White; n];
1219    let mut path: Vec<(usize, ClauseKind)> = Vec::new();
1220
1221    fn dfs(
1222        node: usize,
1223        entries: &[ExpandedEntry],
1224        id_to_idx: &BTreeMap<&str, usize>,
1225        color: &mut [Color],
1226        path: &mut Vec<(usize, ClauseKind)>,
1227    ) -> Option<Vec<(usize, ClauseKind)>> {
1228        color[node] = Color::Gray;
1229        path.push((node, ClauseKind::After));
1230
1231        for (ref_id, clause) in outgoing_edges(&entries[node]) {
1232            let Some(&dep) = id_to_idx.get(ref_id) else {
1233                continue;
1234            };
1235            if let Some(last) = path.last_mut() {
1236                last.1 = clause;
1237            }
1238            match color[dep] {
1239                Color::White => {
1240                    if let Some(cycle) = dfs(dep, entries, id_to_idx, color, path) {
1241                        return Some(cycle);
1242                    }
1243                }
1244                Color::Gray => {
1245                    let start = path.iter().position(|&(x, _)| x == dep).unwrap_or(0);
1246                    let mut cycle: Vec<(usize, ClauseKind)> = path[start..].to_vec();
1247                    cycle.push((dep, clause));
1248                    return Some(cycle);
1249                }
1250                Color::Black => {}
1251            }
1252        }
1253
1254        color[node] = Color::Black;
1255        path.pop();
1256        None
1257    }
1258
1259    for start in 0..n {
1260        if color[start] == Color::White {
1261            if let Some(cycle) = dfs(start, entries, id_to_idx, &mut color, &mut path) {
1262                return cycle
1263                    .into_iter()
1264                    .map(|(i, kind)| (source_label(&entries[i]).into_owned(), kind))
1265                    .collect();
1266            }
1267        }
1268    }
1269
1270    vec![("<unknown cycle>".to_string(), ClauseKind::After)]
1271}
1272
1273// ---------------------------------------------------------------------------
1274// Utilities
1275// ---------------------------------------------------------------------------
1276
1277/// Map an [`AfterOp`] (v2 AST) onto the alias-free [`Operator`] used by
1278/// the timing math.
1279fn operator_from(op: &AfterOp) -> Operator {
1280    match op {
1281        AfterOp::LessThan => Operator::LessThan,
1282        AfterOp::GreaterThan => Operator::GreaterThan,
1283    }
1284}
1285
1286/// Parse a duration string into fractional seconds, emitting a compiler
1287/// error with field context on failure.
1288fn parse_duration_secs(
1289    input: &str,
1290    source_id: &str,
1291    field: &'static str,
1292) -> Result<f64, CompileAfterError> {
1293    parse_duration(input)
1294        .map(|d| d.as_secs_f64())
1295        .map_err(|e| CompileAfterError::InvalidDuration {
1296            source_id: source_id.to_string(),
1297            field,
1298            input: input.to_string(),
1299            reason: e.to_string(),
1300        })
1301}
1302
1303/// Resolve an optional duration string to seconds, falling back to
1304/// `default_secs` when `None`.
1305///
1306/// Parse failures surface as [`TimingError::InvalidDuration`] tagged with
1307/// the alias parameter name (e.g. `"flap.up_duration"`). The outer
1308/// [`timing_to_error`] then maps this straight into
1309/// [`CompileAfterError::InvalidDuration`] — the same variant used for
1310/// top-level `after.delay` and `phase_offset` parse failures — so users
1311/// see consistent diagnostics for every duration-shaped input regardless
1312/// of where it appears on the generator config.
1313fn duration_or_default(
1314    input: Option<&str>,
1315    default_secs: f64,
1316    field: &'static str,
1317) -> Result<f64, TimingError> {
1318    match input {
1319        Some(s) => {
1320            parse_duration(s)
1321                .map(|d| d.as_secs_f64())
1322                .map_err(|e| TimingError::InvalidDuration {
1323                    field,
1324                    input: s.to_string(),
1325                    reason: e.to_string(),
1326                })
1327        }
1328        None => Ok(default_secs),
1329    }
1330}
1331
1332/// Format an f64 seconds value as a duration string accepted by
1333/// [`parse_duration`].
1334///
1335/// The output prefers the shortest whole-unit representation (e.g. `"1m"`
1336/// for 60s, `"1h"` for 3600s) and falls back to fractional seconds for
1337/// values that cannot round-trip through a whole-unit form. Zero and
1338/// negative inputs normalize to `"0s"`. In debug builds a
1339/// `debug_assert!` guards against non-finite or negative inputs — these
1340/// can only arise from programmer error; the release fallback preserves
1341/// the `"0s"` normalization so production code never panics.
1342///
1343/// Sub-second values are emitted as fractional seconds (`0.5s` →
1344/// `"0.5s"`) rather than milliseconds; callers that need a specific
1345/// display form should format locally. All emitted strings round-trip
1346/// cleanly through [`parse_duration`] to the same [`std::time::Duration`].
1347pub fn format_duration_secs(secs: f64) -> String {
1348    debug_assert!(
1349        secs.is_finite() && secs >= 0.0,
1350        "format_duration_secs received non-finite or negative value: {secs}"
1351    );
1352    if !secs.is_finite() || secs <= 0.0 {
1353        return "0s".to_string();
1354    }
1355
1356    // Prefer whole-unit forms.
1357    let ms = (secs * 1000.0).round() as u64;
1358    if ms.is_multiple_of(1000) {
1359        let whole_secs = ms / 1000;
1360        if whole_secs > 0 && whole_secs.is_multiple_of(3600) {
1361            return format!("{}h", whole_secs / 3600);
1362        }
1363        if whole_secs > 0 && whole_secs.is_multiple_of(60) {
1364            return format!("{}m", whole_secs / 60);
1365        }
1366        return format!("{whole_secs}s");
1367    }
1368
1369    // Sub-millisecond: fall back to fractional seconds.
1370    if ms < 1 {
1371        return format!("{secs}s");
1372    }
1373
1374    // Fractional seconds expressible in whole milliseconds → use seconds
1375    // with enough precision to round-trip through parse_duration. Three
1376    // decimals is sufficient because `ms` is already whole ms.
1377    let secs_rounded = (ms as f64) / 1000.0;
1378    format!("{secs_rounded}s")
1379}
1380
1381// ===========================================================================
1382// Tests
1383// ===========================================================================
1384
1385#[cfg(test)]
1386mod tests {
1387    use super::*;
1388    use crate::compiler::expand::expand;
1389    use crate::compiler::expand::InMemoryPackResolver;
1390    use crate::compiler::normalize::normalize;
1391    use crate::compiler::parse::parse;
1392
1393    // -----------------------------------------------------------------------
1394    // Helpers
1395    // -----------------------------------------------------------------------
1396
1397    /// Compile a v2 YAML string end-to-end through parse → normalize →
1398    /// expand → compile_after, using the provided pack resolver.
1399    fn compile(yaml: &str) -> Result<CompiledFile, String> {
1400        compile_with_resolver(yaml, &InMemoryPackResolver::new())
1401    }
1402
1403    fn compile_with_resolver(
1404        yaml: &str,
1405        resolver: &InMemoryPackResolver,
1406    ) -> Result<CompiledFile, String> {
1407        let parsed = parse(yaml).map_err(|e| format!("parse: {e}"))?;
1408        let normalized = normalize(parsed).map_err(|e| format!("normalize: {e}"))?;
1409        let expanded = expand(normalized, resolver).map_err(|e| format!("expand: {e}"))?;
1410        compile_after(expanded).map_err(|e| format!("compile_after: {e}"))
1411    }
1412
1413    // -----------------------------------------------------------------------
1414    // Reference resolution
1415    // -----------------------------------------------------------------------
1416
1417    #[test]
1418    fn unknown_ref_surfaces_available_ids() {
1419        let yaml = r#"
1420version: 2
1421scenarios:
1422  - id: cpu
1423    signal_type: metrics
1424    name: cpu_saturation
1425    rate: 1
1426    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1427  - id: log_entry
1428    signal_type: logs
1429    name: errors
1430    rate: 1
1431    log_generator: { type: template, templates: [{ message: "hi" }] }
1432    after: { ref: nonexistent, op: ">", value: 50 }
1433"#;
1434        let err = compile(yaml).expect_err("should fail");
1435        assert!(err.contains("nonexistent"), "got: {err}");
1436        assert!(err.contains("Available"), "got: {err}");
1437    }
1438
1439    #[test]
1440    fn self_reference_is_rejected() {
1441        let yaml = r#"
1442version: 2
1443scenarios:
1444  - id: loop
1445    signal_type: metrics
1446    name: util
1447    rate: 1
1448    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1449    after: { ref: loop, op: ">", value: 50 }
1450"#;
1451        let err = compile(yaml).expect_err("self-ref is rejected");
1452        assert!(err.contains("references itself"), "got: {err}");
1453    }
1454
1455    // -----------------------------------------------------------------------
1456    // Simple crossing per operator/generator
1457    // -----------------------------------------------------------------------
1458
1459    #[test]
1460    fn saturation_greater_than_sets_offset() {
1461        let yaml = r#"
1462version: 2
1463scenarios:
1464  - id: util
1465    signal_type: metrics
1466    name: util
1467    rate: 1
1468    generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
1469  - id: follower
1470    signal_type: metrics
1471    name: latency
1472    rate: 1
1473    generator: { type: constant, value: 1 }
1474    after: { ref: util, op: ">", value: 70 }
1475"#;
1476        let compiled = compile(yaml).expect("should compile");
1477        let follower = &compiled.entries[1];
1478        let expected_secs = (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
1479        let expected_str = format_duration_secs(expected_secs);
1480        assert_eq!(
1481            follower.phase_offset.as_deref(),
1482            Some(expected_str.as_str())
1483        );
1484    }
1485
1486    #[rustfmt::skip]
1487    #[rstest::rstest]
1488    // Flap `<` crosses at the up_duration boundary (60s → "1m").
1489    #[case::flap_less_than(r#"
1490version: 2
1491scenarios:
1492  - id: link
1493    signal_type: metrics
1494    name: oper_state
1495    rate: 1
1496    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1497  - id: follower
1498    signal_type: metrics
1499    name: util
1500    rate: 1
1501    generator: { type: constant, value: 1 }
1502    after: { ref: link, op: "<", value: 1 }
1503"#, "1m")]
1504    // spike_event `<` crosses at the spike_duration boundary (10s).
1505    #[case::spike_event_less_than(r#"
1506version: 2
1507scenarios:
1508  - id: burst
1509    signal_type: metrics
1510    name: errs
1511    rate: 1
1512    generator: { type: spike_event, baseline: 0, spike_height: 100, spike_duration: 10s, spike_interval: 60s }
1513  - id: follower
1514    signal_type: metrics
1515    name: recovery
1516    rate: 1
1517    generator: { type: constant, value: 1 }
1518    after: { ref: burst, op: "<", value: 50 }
1519"#, "10s")]
1520    // Step `>`: ceil((55-0)/10) = 6 ticks, rate=2 -> 3.0s.
1521    #[case::step_greater_than(r#"
1522version: 2
1523scenarios:
1524  - id: counter
1525    signal_type: metrics
1526    name: req_count
1527    rate: 2
1528    generator: { type: step, start: 0, step_size: 10 }
1529  - id: follower
1530    signal_type: metrics
1531    name: alert
1532    rate: 1
1533    generator: { type: constant, value: 1 }
1534    after: { ref: counter, op: ">", value: 55 }
1535"#, "3s")]
1536    // Sequence `<`: index 2 (value 2) is the first < 3; rate=2 -> 1.0s.
1537    #[case::sequence_less_than(r#"
1538version: 2
1539scenarios:
1540  - id: seq
1541    signal_type: metrics
1542    name: values
1543    rate: 2
1544    generator: { type: sequence, values: [10, 5, 2, 1], repeat: false }
1545  - id: follower
1546    signal_type: metrics
1547    name: alert
1548    rate: 1
1549    generator: { type: constant, value: 1 }
1550    after: { ref: seq, op: "<", value: 3 }
1551"#, "1s")]
1552    fn follower_phase_offset_matches_expected_crossing(
1553        #[case] yaml: &str,
1554        #[case] expected_offset: &str,
1555    ) {
1556        let compiled = compile(yaml).expect("should compile");
1557        assert_eq!(
1558            compiled.entries[1].phase_offset.as_deref(),
1559            Some(expected_offset)
1560        );
1561    }
1562
1563    #[test]
1564    fn step_less_than_is_unsupported() {
1565        let yaml = r#"
1566version: 2
1567scenarios:
1568  - id: counter
1569    signal_type: metrics
1570    name: x
1571    rate: 1
1572    generator: { type: step, start: 0, step_size: 1 }
1573  - id: follower
1574    signal_type: metrics
1575    name: y
1576    rate: 1
1577    generator: { type: constant, value: 1 }
1578    after: { ref: counter, op: "<", value: 5 }
1579"#;
1580        let err = compile(yaml).expect_err("step < is unsupported");
1581        assert!(err.contains("step"), "got: {err}");
1582    }
1583
1584    // -----------------------------------------------------------------------
1585    // Targets that cannot be resolved to a crossing time.
1586    //
1587    // `constant` values are out-of-range when the threshold is unreachable;
1588    // `sine`, `steady`, and `uniform` are blanket-unsupported because their
1589    // values never settle into a predictable threshold-crossing schedule.
1590    // Each error message must name the offending generator type so the
1591    // diagnostic points the user at the right signal.
1592    // -----------------------------------------------------------------------
1593
1594    #[rustfmt::skip]
1595    #[rstest::rstest]
1596    #[case::constant(r#"
1597version: 2
1598scenarios:
1599  - id: k
1600    signal_type: metrics
1601    name: k
1602    rate: 1
1603    generator: { type: constant, value: 42 }
1604  - id: follower
1605    signal_type: metrics
1606    name: y
1607    rate: 1
1608    generator: { type: constant, value: 1 }
1609    after: { ref: k, op: ">", value: 100 }
1610"#, "constant")]
1611    #[case::sine(r#"
1612version: 2
1613scenarios:
1614  - id: wave
1615    signal_type: metrics
1616    name: s
1617    rate: 1
1618    generator: { type: sine, amplitude: 10, period_secs: 60, offset: 50 }
1619  - id: follower
1620    signal_type: metrics
1621    name: f
1622    rate: 1
1623    generator: { type: constant, value: 1 }
1624    after: { ref: wave, op: ">", value: 55 }
1625"#, "sine")]
1626    #[case::steady(r#"
1627version: 2
1628scenarios:
1629  - id: base
1630    signal_type: metrics
1631    name: s
1632    rate: 1
1633    generator: { type: steady, center: 50, amplitude: 5, period: 60s }
1634  - id: follower
1635    signal_type: metrics
1636    name: f
1637    rate: 1
1638    generator: { type: constant, value: 1 }
1639    after: { ref: base, op: ">", value: 55 }
1640"#, "steady")]
1641    #[case::uniform(r#"
1642version: 2
1643scenarios:
1644  - id: u
1645    signal_type: metrics
1646    name: u
1647    rate: 1
1648    generator: { type: uniform, min: 0, max: 10, seed: 1 }
1649  - id: follower
1650    signal_type: metrics
1651    name: f
1652    rate: 1
1653    generator: { type: constant, value: 1 }
1654    after: { ref: u, op: ">", value: 5 }
1655"#, "uniform")]
1656    fn unresolvable_target_generator_is_rejected(
1657        #[case] yaml: &str,
1658        #[case] expected_substring: &str,
1659    ) {
1660        let err = compile(yaml).expect_err("target generator must be rejected");
1661        assert!(
1662            err.contains(expected_substring),
1663            "expected error to mention {expected_substring:?}, got: {err}"
1664        );
1665    }
1666
1667    // -----------------------------------------------------------------------
1668    // Transitive chains + delay additivity
1669    // -----------------------------------------------------------------------
1670
1671    #[test]
1672    fn transitive_chain_accumulates() {
1673        let yaml = r#"
1674version: 2
1675scenarios:
1676  - id: a
1677    signal_type: metrics
1678    name: a
1679    rate: 1
1680    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1681  - id: b
1682    signal_type: metrics
1683    name: b
1684    rate: 1
1685    generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
1686    after: { ref: a, op: "<", value: 1 }
1687  - id: c
1688    signal_type: metrics
1689    name: c
1690    rate: 1
1691    generator: { type: constant, value: 1 }
1692    after: { ref: b, op: ">", value: 70 }
1693"#;
1694        let compiled = compile(yaml).expect("chain compiles");
1695        let expected_b_secs = 60.0;
1696        let expected_c_secs = 60.0 + (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
1697        assert_eq!(
1698            compiled.entries[1].phase_offset.as_deref(),
1699            Some(format_duration_secs(expected_b_secs).as_str())
1700        );
1701        assert_eq!(
1702            compiled.entries[2].phase_offset.as_deref(),
1703            Some(format_duration_secs(expected_c_secs).as_str())
1704        );
1705    }
1706
1707    #[test]
1708    fn delay_is_added_to_crossing_time() {
1709        let yaml = r#"
1710version: 2
1711scenarios:
1712  - id: link
1713    signal_type: metrics
1714    name: a
1715    rate: 1
1716    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1717  - id: follower
1718    signal_type: metrics
1719    name: b
1720    rate: 1
1721    generator: { type: constant, value: 1 }
1722    after: { ref: link, op: "<", value: 1, delay: 15s }
1723"#;
1724        let compiled = compile(yaml).expect("compile");
1725        assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
1726    }
1727
1728    #[test]
1729    fn explicit_phase_offset_is_added() {
1730        let yaml = r#"
1731version: 2
1732scenarios:
1733  - id: link
1734    signal_type: metrics
1735    name: a
1736    rate: 1
1737    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1738  - id: follower
1739    signal_type: metrics
1740    name: b
1741    rate: 1
1742    generator: { type: constant, value: 1 }
1743    phase_offset: 10s
1744    after: { ref: link, op: "<", value: 1 }
1745"#;
1746        let compiled = compile(yaml).expect("compile");
1747        assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("70s"));
1748    }
1749
1750    #[test]
1751    fn phase_offset_delay_and_crossing_sum() {
1752        let yaml = r#"
1753version: 2
1754scenarios:
1755  - id: link
1756    signal_type: metrics
1757    name: a
1758    rate: 1
1759    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1760  - id: follower
1761    signal_type: metrics
1762    name: b
1763    rate: 1
1764    generator: { type: constant, value: 1 }
1765    phase_offset: 10s
1766    after: { ref: link, op: "<", value: 1, delay: 5s }
1767"#;
1768        let compiled = compile(yaml).expect("compile");
1769        // 10s + 60s crossing + 5s delay = 75s.
1770        assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
1771    }
1772
1773    // -----------------------------------------------------------------------
1774    // Cycle detection
1775    // -----------------------------------------------------------------------
1776
1777    #[test]
1778    fn two_entry_cycle_is_detected() {
1779        let yaml = r#"
1780version: 2
1781scenarios:
1782  - id: a
1783    signal_type: metrics
1784    name: a
1785    rate: 1
1786    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1787    after: { ref: b, op: ">", value: 1 }
1788  - id: b
1789    signal_type: metrics
1790    name: b
1791    rate: 1
1792    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1793    after: { ref: a, op: ">", value: 1 }
1794"#;
1795        let err = compile(yaml).expect_err("cycle should fail");
1796        assert!(err.contains("circular"), "got: {err}");
1797        assert!(err.contains("a") && err.contains("b"), "got: {err}");
1798    }
1799
1800    #[test]
1801    fn three_entry_cycle_path_is_returned() {
1802        let yaml = r#"
1803version: 2
1804scenarios:
1805  - id: a
1806    signal_type: metrics
1807    name: a
1808    rate: 1
1809    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1810    after: { ref: c, op: ">", value: 1 }
1811  - id: b
1812    signal_type: metrics
1813    name: b
1814    rate: 1
1815    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1816    after: { ref: a, op: ">", value: 1 }
1817  - id: c
1818    signal_type: metrics
1819    name: c
1820    rate: 1
1821    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1822    after: { ref: b, op: ">", value: 1 }
1823"#;
1824        let err = compile(yaml).expect_err("cycle should fail");
1825        assert!(err.contains("circular"), "got: {err}");
1826        assert!(
1827            err.contains("a -> "),
1828            "cycle path should have an arrow. got: {err}"
1829        );
1830    }
1831
1832    // -----------------------------------------------------------------------
1833    // Clock-group assignment (matrix 11.15 / 11.16)
1834    // -----------------------------------------------------------------------
1835
1836    #[test]
1837    fn clock_group_auto_assigned_as_chain_plus_lowest_id() {
1838        let yaml = r#"
1839version: 2
1840scenarios:
1841  - id: alpha
1842    signal_type: metrics
1843    name: a
1844    rate: 1
1845    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1846  - id: bravo
1847    signal_type: metrics
1848    name: b
1849    rate: 1
1850    generator: { type: constant, value: 1 }
1851    after: { ref: alpha, op: "<", value: 1 }
1852"#;
1853        let compiled = compile(yaml).expect("compile");
1854        assert_eq!(
1855            compiled.entries[0].clock_group.as_deref(),
1856            Some("chain_alpha")
1857        );
1858        assert_eq!(
1859            compiled.entries[1].clock_group.as_deref(),
1860            Some("chain_alpha")
1861        );
1862    }
1863
1864    #[test]
1865    fn explicit_clock_group_propagates_to_chain_members() {
1866        let yaml = r#"
1867version: 2
1868scenarios:
1869  - id: alpha
1870    signal_type: metrics
1871    name: a
1872    rate: 1
1873    clock_group: failover
1874    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1875  - id: bravo
1876    signal_type: metrics
1877    name: b
1878    rate: 1
1879    generator: { type: constant, value: 1 }
1880    after: { ref: alpha, op: "<", value: 1 }
1881"#;
1882        let compiled = compile(yaml).expect("compile");
1883        assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("failover"));
1884        assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("failover"));
1885    }
1886
1887    #[test]
1888    fn conflicting_clock_groups_are_rejected() {
1889        let yaml = r#"
1890version: 2
1891scenarios:
1892  - id: alpha
1893    signal_type: metrics
1894    name: a
1895    rate: 1
1896    clock_group: group_a
1897    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1898  - id: bravo
1899    signal_type: metrics
1900    name: b
1901    rate: 1
1902    clock_group: group_b
1903    generator: { type: constant, value: 1 }
1904    after: { ref: alpha, op: "<", value: 1 }
1905"#;
1906        let err = compile(yaml).expect_err("conflicting groups fail");
1907        assert!(err.contains("conflicting clock_group"), "got: {err}");
1908        assert!(
1909            err.contains("group_a") && err.contains("group_b"),
1910            "got: {err}"
1911        );
1912    }
1913
1914    #[test]
1915    fn independent_signals_keep_no_clock_group() {
1916        let yaml = r#"
1917version: 2
1918scenarios:
1919  - id: independent
1920    signal_type: metrics
1921    name: a
1922    rate: 1
1923    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
1924"#;
1925        let compiled = compile(yaml).expect("compile");
1926        assert!(compiled.entries[0].clock_group.is_none());
1927    }
1928
1929    #[test]
1930    fn clock_group_empty_string_mixed_with_some_x_uses_x() {
1931        // An explicit empty string is filtered out (treated as "no value")
1932        // and the concrete `"x"` wins for the whole component — no conflict.
1933        let yaml = r#"
1934version: 2
1935scenarios:
1936  - id: alpha
1937    signal_type: metrics
1938    name: a
1939    rate: 1
1940    clock_group: ""
1941    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1942  - id: bravo
1943    signal_type: metrics
1944    name: b
1945    rate: 1
1946    clock_group: x
1947    generator: { type: constant, value: 1 }
1948    after: { ref: alpha, op: "<", value: 1 }
1949"#;
1950        let compiled = compile(yaml).expect("compile");
1951        assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("x"));
1952        assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("x"));
1953    }
1954
1955    #[test]
1956    fn clock_group_whitespace_variants_conflict() {
1957        // `"x "` and `"x"` differ under string equality, so the component
1958        // carries two distinct non-empty values -> ConflictingClockGroup.
1959        let yaml = r#"
1960version: 2
1961scenarios:
1962  - id: alpha
1963    signal_type: metrics
1964    name: a
1965    rate: 1
1966    clock_group: "x "
1967    generator: { type: flap, up_duration: 60s, down_duration: 30s }
1968  - id: bravo
1969    signal_type: metrics
1970    name: b
1971    rate: 1
1972    clock_group: x
1973    generator: { type: constant, value: 1 }
1974    after: { ref: alpha, op: "<", value: 1 }
1975"#;
1976        let err = compile(yaml).expect_err("trailing whitespace must conflict");
1977        assert!(err.contains("conflicting clock_group"), "got: {err}");
1978    }
1979
1980    // -----------------------------------------------------------------------
1981    // Cross-signal-type after (spec §3.5, matrix 11.11)
1982    // -----------------------------------------------------------------------
1983
1984    #[test]
1985    fn log_signal_can_depend_on_metrics_target() {
1986        let yaml = r#"
1987version: 2
1988scenarios:
1989  - id: err_rate
1990    signal_type: metrics
1991    name: http_error_rate
1992    rate: 1
1993    generator: { type: saturation, baseline: 1, ceiling: 30, time_to_saturate: 90s }
1994  - id: err_logs
1995    signal_type: logs
1996    name: app_logs
1997    rate: 1
1998    log_generator: { type: template, templates: [{ message: "upstream timeout" }] }
1999    after: { ref: err_rate, op: ">", value: 10 }
2000"#;
2001        let compiled = compile(yaml).expect("cross-signal after compiles");
2002        assert!(compiled.entries[1].phase_offset.is_some());
2003    }
2004
2005    #[test]
2006    fn metrics_entry_cannot_depend_on_logs_target() {
2007        let yaml = r#"
2008version: 2
2009scenarios:
2010  - id: log_src
2011    signal_type: logs
2012    name: lg
2013    rate: 1
2014    log_generator: { type: template, templates: [{ message: "hi" }] }
2015  - id: follower
2016    signal_type: metrics
2017    name: f
2018    rate: 1
2019    generator: { type: constant, value: 1 }
2020    after: { ref: log_src, op: ">", value: 0 }
2021"#;
2022        let err = compile(yaml).expect_err("logs target rejected");
2023        assert!(err.contains("logs signal"), "got: {err}");
2024    }
2025
2026    // -----------------------------------------------------------------------
2027    // Alias desugaring correctness: `flap` after-math matches desugared
2028    // sequence.
2029    // -----------------------------------------------------------------------
2030
2031    #[test]
2032    fn flap_alias_produces_expected_up_duration_offset() {
2033        let yaml_alias = r#"
2034version: 2
2035scenarios:
2036  - id: link
2037    signal_type: metrics
2038    name: s
2039    rate: 1
2040    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2041  - id: follower
2042    signal_type: metrics
2043    name: f
2044    rate: 1
2045    generator: { type: constant, value: 1 }
2046    after: { ref: link, op: "<", value: 1 }
2047"#;
2048        let compiled = compile(yaml_alias).expect("compile");
2049        assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("1m"));
2050    }
2051
2052    // -----------------------------------------------------------------------
2053    // Pack sub-signal refs (matrix 11.7, 11.12, 11.13)
2054    // -----------------------------------------------------------------------
2055
2056    fn resolver_with_test_pack() -> InMemoryPackResolver {
2057        // Simple pack with unique-by-name metrics.
2058        let yaml = r#"
2059name: testpack
2060category: test
2061description: test
2062metrics:
2063  - name: state_flap
2064    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2065  - name: util_sat
2066    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 120s }
2067"#;
2068        let pack =
2069            serde_yaml_ng::from_str::<crate::packs::MetricPackDef>(yaml).expect("pack parses");
2070        let mut r = InMemoryPackResolver::new();
2071        r.insert("testpack", pack);
2072        r
2073    }
2074
2075    #[test]
2076    fn dotted_pack_ref_resolves() {
2077        let yaml = r#"
2078version: 2
2079scenarios:
2080  - id: dev
2081    signal_type: metrics
2082    rate: 1
2083    pack: testpack
2084  - id: follower
2085    signal_type: metrics
2086    name: alert
2087    rate: 1
2088    generator: { type: constant, value: 1 }
2089    after: { ref: dev.state_flap, op: "<", value: 1 }
2090"#;
2091        let compiled = compile_with_resolver(yaml, &resolver_with_test_pack()).expect("compile");
2092        // Look for the follower entry (non-pack).
2093        let follower = compiled
2094            .entries
2095            .iter()
2096            .find(|e| e.id.as_deref() == Some("follower"))
2097            .expect("follower present");
2098        assert_eq!(follower.phase_offset.as_deref(), Some("1m"));
2099    }
2100
2101    #[test]
2102    fn ambiguous_bare_pack_ref_is_rejected() {
2103        // Use a pack with two specs sharing the metric name.
2104        let pack_yaml = r#"
2105name: ambig
2106category: test
2107description: test
2108metrics:
2109  - name: cpu_util
2110    labels: { mode: user }
2111    generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
2112  - name: cpu_util
2113    labels: { mode: system }
2114    generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
2115"#;
2116        let pack =
2117            serde_yaml_ng::from_str::<crate::packs::MetricPackDef>(pack_yaml).expect("pack parses");
2118        let mut r = InMemoryPackResolver::new();
2119        r.insert("ambig", pack);
2120
2121        let yaml = r#"
2122version: 2
2123scenarios:
2124  - id: host
2125    signal_type: metrics
2126    rate: 1
2127    pack: ambig
2128  - id: follower
2129    signal_type: metrics
2130    name: alert
2131    rate: 1
2132    generator: { type: constant, value: 1 }
2133    after: { ref: host.cpu_util, op: ">", value: 50 }
2134"#;
2135        let err = compile_with_resolver(yaml, &r).expect_err("bare ref is ambiguous");
2136        assert!(err.contains("ambiguous"), "got: {err}");
2137        assert!(
2138            err.contains("host.cpu_util#0") && err.contains("host.cpu_util#1"),
2139            "candidates should be listed. got: {err}"
2140        );
2141    }
2142
2143    // -----------------------------------------------------------------------
2144    // InvalidDuration coverage — every code path that can construct this
2145    // variant must have a dedicated regression test.
2146    //
2147    // Each case names the source id of the failing entry, the field that
2148    // flagged the malformed duration, and the literal input string so the
2149    // error round-trip is byte-exact.
2150    // -----------------------------------------------------------------------
2151
2152    #[rustfmt::skip]
2153    #[rstest::rstest]
2154    // `compile_after` is the first validation pass that actually parses
2155    // `after.delay` as a `std::time::Duration` — the parser only checks
2156    // the shape of the YAML. A malformed delay string must surface as
2157    // `CompileAfterError::InvalidDuration` tagged with
2158    // `field == "after.delay"`.
2159    #[case::after_delay(r#"
2160version: 2
2161scenarios:
2162  - id: src
2163    signal_type: metrics
2164    name: a
2165    rate: 1
2166    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2167  - id: follower
2168    signal_type: metrics
2169    name: b
2170    rate: 1
2171    generator: { type: constant, value: 1 }
2172    after: { ref: src, op: "<", value: 1, delay: "10seconds" }
2173"#, "follower", "after.delay", "10seconds")]
2174    // `phase_offset: "0s"` is a well-known `parse_duration` rejection
2175    // (zero durations are invalid). Because the entry's `phase_offset`
2176    // is parsed inside `compile_after`, this must surface as
2177    // `CompileAfterError::InvalidDuration` with `field == "phase_offset"`.
2178    #[case::phase_offset_zero(r#"
2179version: 2
2180scenarios:
2181  - id: src
2182    signal_type: metrics
2183    name: a
2184    rate: 1
2185    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2186  - id: follower
2187    signal_type: metrics
2188    name: b
2189    rate: 1
2190    phase_offset: "0s"
2191    generator: { type: constant, value: 1 }
2192    after: { ref: src, op: "<", value: 1 }
2193"#, "follower", "phase_offset", "0s")]
2194    // Invalid alias duration params (e.g. `flap.up_duration: "oops"`) must
2195    // also route through `InvalidDuration` — historically these were
2196    // folded into `OutOfRangeThreshold` because `duration_or_default`
2197    // wrapped them as `TimingError::OutOfRange`. PR 5 review flagged the
2198    // mis-classification; this regression anchors the fix.
2199    #[case::alias_flap_up_duration(r#"
2200version: 2
2201scenarios:
2202  - id: src
2203    signal_type: metrics
2204    name: a
2205    rate: 1
2206    generator: { type: flap, up_duration: "oops", down_duration: 30s }
2207  - id: follower
2208    signal_type: metrics
2209    name: b
2210    rate: 1
2211    generator: { type: constant, value: 1 }
2212    after: { ref: src, op: "<", value: 1 }
2213"#, "follower", "flap.up_duration", "oops")]
2214    fn invalid_duration_surfaces_invalid_duration(
2215        #[case] yaml: &str,
2216        #[case] expected_source_id: &str,
2217        #[case] expected_field: &str,
2218        #[case] expected_input: &str,
2219    ) {
2220        let err = match compile_after_from_yaml(yaml) {
2221            Err(e) => e,
2222            Ok(_) => panic!("invalid duration must fail"),
2223        };
2224        match err {
2225            CompileAfterError::InvalidDuration {
2226                ref source_id,
2227                field,
2228                ref input,
2229                ..
2230            } => {
2231                assert_eq!(source_id, expected_source_id);
2232                assert_eq!(field, expected_field);
2233                assert_eq!(input, expected_input);
2234            }
2235            other => panic!("expected InvalidDuration, got {other:?}"),
2236        }
2237    }
2238
2239    /// Pipe YAML straight to `compile_after` and return the typed error
2240    /// (rather than the stringified form the other helpers use). Enables
2241    /// the `InvalidDuration` tests above to pattern-match on the variant
2242    /// shape without redundant string assertions.
2243    fn compile_after_from_yaml(yaml: &str) -> Result<CompiledFile, CompileAfterError> {
2244        let parsed = parse(yaml).expect("parse");
2245        let normalized = normalize(parsed).expect("normalize");
2246        let expanded = expand(normalized, &InMemoryPackResolver::new()).expect("expand");
2247        compile_after(expanded)
2248    }
2249
2250    // -----------------------------------------------------------------------
2251    // format_duration_secs round-trip
2252    // -----------------------------------------------------------------------
2253
2254    #[rustfmt::skip]
2255    #[rstest::rstest]
2256    #[case::whole_seconds(30.0,            "30s")]
2257    #[case::whole_minutes(120.0,           "2m")]
2258    #[case::whole_hours(3600.0,            "1h")]
2259    // Exact zero (and the `-0.0` variant, which compares equal to 0.0)
2260    // both route through the `<= 0.0` fallback and emit `"0s"`.
2261    #[case::zero(0.0,                      "0s")]
2262    #[case::negative_zero(-0.0,            "0s")]
2263    fn format_duration_whole_units(#[case] secs: f64, #[case] expected: &str) {
2264        assert_eq!(format_duration_secs(secs), expected);
2265    }
2266
2267    #[test]
2268    fn format_duration_fractional_seconds_round_trip() {
2269        let result = format_duration_secs(92.307);
2270        let dur = parse_duration(&result).expect("round-trip");
2271        assert!(
2272            (dur.as_secs_f64() - 92.307).abs() < 0.01,
2273            "got {}, expected ~92.307",
2274            dur.as_secs_f64()
2275        );
2276    }
2277
2278    #[test]
2279    fn outgoing_edges_yields_after_then_while() {
2280        use crate::compiler::{WhileClause, WhileOp};
2281        let mut e = ExpandedEntry {
2282            id: Some("x".to_string()),
2283            signal_type: "metrics".to_string(),
2284            name: "x".to_string(),
2285            rate: 1.0,
2286            duration: None,
2287            generator: None,
2288            log_generator: None,
2289            labels: None,
2290            dynamic_labels: None,
2291            encoder: crate::encoder::EncoderConfig::PrometheusText { precision: None },
2292            sink: crate::sink::SinkConfig::Stdout,
2293            jitter: None,
2294            jitter_seed: None,
2295            gaps: None,
2296            bursts: None,
2297            cardinality_spikes: None,
2298            phase_offset: None,
2299            clock_group: None,
2300            after: Some(crate::compiler::AfterClause {
2301                ref_id: "a_target".to_string(),
2302                op: AfterOp::GreaterThan,
2303                value: 0.0,
2304                delay: None,
2305            }),
2306            while_clause: Some(WhileClause {
2307                ref_id: "w_target".to_string(),
2308                op: WhileOp::LessThan,
2309                value: 0.0,
2310            }),
2311            delay_clause: None,
2312            distribution: None,
2313            buckets: None,
2314            quantiles: None,
2315            observations_per_tick: None,
2316            mean_shift_per_sec: None,
2317            seed: None,
2318            on_sink_error: crate::OnSinkError::Warn,
2319        };
2320        let edges: Vec<_> = outgoing_edges(&e).collect();
2321        assert_eq!(
2322            edges,
2323            vec![
2324                ("a_target", ClauseKind::After),
2325                ("w_target", ClauseKind::While)
2326            ]
2327        );
2328
2329        e.while_clause = None;
2330        let edges_after_only: Vec<_> = outgoing_edges(&e).collect();
2331        assert_eq!(edges_after_only, vec![("a_target", ClauseKind::After)]);
2332
2333        e.after = None;
2334        let edges_none: Vec<_> = outgoing_edges(&e).collect();
2335        assert!(edges_none.is_empty());
2336    }
2337
2338    #[test]
2339    fn while_yaml_compiles_and_propagates_clause() {
2340        let yaml = r#"
2341version: 2
2342defaults:
2343  rate: 1
2344  duration: 1m
2345scenarios:
2346  - id: link
2347    signal_type: metrics
2348    name: link
2349    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2350  - id: dependent
2351    signal_type: metrics
2352    name: dependent
2353    generator: { type: constant, value: 1 }
2354    while: { ref: link, op: ">", value: 0 }
2355"#;
2356        let compiled = compile_after_from_yaml(yaml).expect("while: must compile");
2357        let dep = compiled
2358            .entries
2359            .iter()
2360            .find(|e| e.id.as_deref() == Some("dependent"))
2361            .expect("dependent entry present");
2362        let w = dep.while_clause.as_ref().expect("while propagates");
2363        assert_eq!(w.ref_id, "link");
2364    }
2365
2366    #[test]
2367    fn defaults_duration_carries_into_while_compiled_entry() {
2368        let yaml = r#"
2369version: 2
2370defaults:
2371  rate: 1
2372  duration: 5m
2373scenarios:
2374  - id: link
2375    signal_type: metrics
2376    name: link
2377    generator: { type: flap, up_duration: 60s, down_duration: 30s }
2378  - id: dependent
2379    signal_type: metrics
2380    name: dependent
2381    generator: { type: constant, value: 1 }
2382    while: { ref: link, op: ">", value: 0 }
2383"#;
2384        let compiled = compile_after_from_yaml(yaml).expect("while: must compile");
2385        let dep = compiled
2386            .entries
2387            .iter()
2388            .find(|e| e.id.as_deref() == Some("dependent"))
2389            .expect("dependent entry present");
2390        assert!(dep.while_clause.is_some());
2391        assert_eq!(dep.duration.as_deref(), Some("5m"));
2392    }
2393
2394    #[test]
2395    fn mixed_after_while_cycle_uses_labeled_format() {
2396        let yaml = r#"
2397version: 2
2398defaults:
2399  rate: 1
2400  duration: 10m
2401scenarios:
2402  - id: a
2403    signal_type: metrics
2404    name: a
2405    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
2406    after: { ref: b, op: ">", value: 1 }
2407  - id: b
2408    signal_type: metrics
2409    name: b
2410    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
2411    while: { ref: a, op: ">", value: 0 }
2412"#;
2413        let err = compile_after_from_yaml(yaml).expect_err("mixed cycle must fail");
2414        match err {
2415            CompileAfterError::CircularDependency { ref cycle } => {
2416                let edge_kinds: Vec<_> = cycle[..cycle.len() - 1].iter().map(|(_, k)| *k).collect();
2417                assert!(
2418                    edge_kinds.contains(&ClauseKind::While),
2419                    "mixed cycle must include a While edge: {cycle:?}"
2420                );
2421                let display = err.to_string();
2422                assert!(
2423                    display.contains("--[after]-->") && display.contains("--[while]-->"),
2424                    "mixed cycle must render labeled arrows. got: {display}"
2425                );
2426            }
2427            other => panic!("expected CircularDependency, got {other:?}"),
2428        }
2429    }
2430
2431    #[test]
2432    fn pure_after_cycle_keeps_short_arrow_format() {
2433        let yaml = r#"
2434version: 2
2435defaults:
2436  rate: 1
2437scenarios:
2438  - id: a
2439    signal_type: metrics
2440    name: a
2441    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
2442    after: { ref: b, op: ">", value: 1 }
2443  - id: b
2444    signal_type: metrics
2445    name: b
2446    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
2447    after: { ref: a, op: ">", value: 1 }
2448"#;
2449        let err = compile_after_from_yaml(yaml).expect_err("pure-after cycle must fail");
2450        let display = err.to_string();
2451        assert!(
2452            display.contains(" -> ") && !display.contains("--["),
2453            "pure-after cycles must use the short arrow form. got: {display}"
2454        );
2455    }
2456
2457    #[test]
2458    fn deep_while_chain_compiles_quickly() {
2459        use std::fmt::Write;
2460        let mut yaml =
2461            String::from("version: 2\ndefaults:\n  rate: 1\n  duration: 1h\nscenarios:\n");
2462        let _ = writeln!(
2463            yaml,
2464            "  - id: n0\n    signal_type: metrics\n    name: n0\n    generator: {{ type: constant, value: 1 }}"
2465        );
2466        for i in 1..200 {
2467            let _ = writeln!(yaml,
2468                "  - id: n{i}\n    signal_type: metrics\n    name: n{i}\n    generator: {{ type: constant, value: 1 }}\n    while: {{ ref: n{prev}, op: \">\", value: 0 }}",
2469                prev = i - 1);
2470        }
2471        let start = std::time::Instant::now();
2472        let compiled = compile_after_from_yaml(&yaml).expect("deep chain must compile");
2473        let elapsed = start.elapsed();
2474        assert_eq!(compiled.entries.len(), 200);
2475        assert!(
2476            elapsed < std::time::Duration::from_secs(1),
2477            "200-node while: chain took {elapsed:?}; compile pipeline regressed"
2478        );
2479    }
2480
2481    #[test]
2482    fn self_while_reference_is_rejected_with_while_kind() {
2483        let yaml = r#"
2484version: 2
2485defaults:
2486  rate: 1
2487  duration: 1m
2488scenarios:
2489  - id: loop_w
2490    signal_type: metrics
2491    name: loop_w
2492    generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
2493    while: { ref: loop_w, op: ">", value: 0 }
2494"#;
2495        let err = compile_after_from_yaml(yaml).expect_err("self-while must fail");
2496        match err {
2497            CompileAfterError::SelfReference { source_id, clause } => {
2498                assert_eq!(source_id, "loop_w");
2499                assert_eq!(clause, ClauseKind::While);
2500            }
2501            other => panic!("expected SelfReference(While), got {other:?}"),
2502        }
2503    }
2504
2505    #[test]
2506    fn while_targeting_logs_signal_is_rejected() {
2507        let yaml = r#"
2508version: 2
2509defaults:
2510  rate: 1
2511  duration: 1m
2512scenarios:
2513  - id: log_src
2514    signal_type: logs
2515    name: lg
2516    log_generator: { type: template, templates: [{ message: "hi" }] }
2517  - id: gated
2518    signal_type: metrics
2519    name: gated
2520    generator: { type: constant, value: 1 }
2521    while: { ref: log_src, op: ">", value: 0 }
2522"#;
2523        let err = compile_after_from_yaml(yaml).expect_err("non-metrics while target must fail");
2524        match err {
2525            CompileAfterError::NonMetricsTarget {
2526                ref_id,
2527                clause,
2528                target_signal,
2529                ..
2530            } => {
2531                assert_eq!(ref_id, "log_src");
2532                assert_eq!(clause, ClauseKind::While);
2533                assert_eq!(target_signal, "logs");
2534            }
2535            other => panic!("expected NonMetricsTarget(While), got {other:?}"),
2536        }
2537    }
2538
2539    #[test]
2540    fn while_against_nan_constant_is_rejected() {
2541        let yaml = r#"
2542version: 2
2543defaults:
2544  rate: 1
2545  duration: 1m
2546scenarios:
2547  - id: src
2548    signal_type: metrics
2549    name: src
2550    generator: { type: constant, value: .nan }
2551  - id: gated
2552    signal_type: metrics
2553    name: gated
2554    generator: { type: constant, value: 1 }
2555    while: { ref: src, op: ">", value: 0 }
2556"#;
2557        let err = compile_after_from_yaml(yaml).expect_err("constant NaN must reject");
2558        match err {
2559            CompileAfterError::WhileNanSource {
2560                ref_id,
2561                nan: NanSource::Constant,
2562                ..
2563            } => {
2564                assert_eq!(ref_id, "src");
2565            }
2566            other => panic!("expected WhileNanSource(Constant), got {other:?}"),
2567        }
2568    }
2569
2570    #[test]
2571    fn while_against_nan_sequence_value_is_rejected() {
2572        let yaml = r#"
2573version: 2
2574defaults:
2575  rate: 1
2576  duration: 1m
2577scenarios:
2578  - id: src
2579    signal_type: metrics
2580    name: src
2581    generator: { type: sequence, values: [1, 2, .nan, 3], repeat: false }
2582  - id: gated
2583    signal_type: metrics
2584    name: gated
2585    generator: { type: constant, value: 1 }
2586    while: { ref: src, op: ">", value: 0 }
2587"#;
2588        let err = compile_after_from_yaml(yaml).expect_err("sequence NaN must reject");
2589        match err {
2590            CompileAfterError::WhileNanSource {
2591                nan: NanSource::SequenceValue { index },
2592                ..
2593            } => {
2594                assert_eq!(index, 2);
2595            }
2596            other => panic!("expected WhileNanSource(SequenceValue), got {other:?}"),
2597        }
2598    }
2599
2600    #[test]
2601    fn while_against_csv_replay_upstream_is_rejected() {
2602        let dir = std::env::temp_dir().join("sonda-while-csv-upstream");
2603        std::fs::create_dir_all(&dir).expect("tempdir");
2604        let path = dir.join("ok.csv");
2605        std::fs::write(&path, "1\n2\n3\n").expect("write csv");
2606        let yaml = format!(
2607            r#"
2608version: 2
2609defaults:
2610  rate: 1
2611  duration: 1m
2612scenarios:
2613  - id: src
2614    signal_type: metrics
2615    name: src
2616    generator: {{ type: csv_replay, file: "{path}" }}
2617  - id: gated
2618    signal_type: metrics
2619    name: gated
2620    generator: {{ type: constant, value: 1 }}
2621    while: {{ ref: src, op: ">", value: 0 }}
2622"#,
2623            path = path.display()
2624        );
2625        let err = compile_after_from_yaml(&yaml).expect_err("csv_replay upstream must reject");
2626        match err {
2627            CompileAfterError::WhileUnsupportedUpstreamGenerator {
2628                ref_id,
2629                generator_kind,
2630                ..
2631            } => {
2632                assert_eq!(ref_id, "src");
2633                assert_eq!(generator_kind, "csv_replay");
2634            }
2635            other => panic!("expected WhileUnsupportedUpstreamGenerator, got {other:?}"),
2636        }
2637        std::fs::remove_file(&path).ok();
2638    }
2639
2640    #[test]
2641    fn while_against_log_template_upstream_is_rejected_as_non_metrics() {
2642        // A log signal cannot be a while: upstream — the existing
2643        // NonMetricsTarget gate already covers this. Locking it in here so
2644        // the rejection path stays observable alongside csv_replay's.
2645        let yaml = r#"
2646version: 2
2647defaults:
2648  rate: 1
2649  duration: 1m
2650scenarios:
2651  - id: src
2652    signal_type: logs
2653    name: lg
2654    log_generator: { type: template, templates: [{ message: "hi" }] }
2655  - id: gated
2656    signal_type: metrics
2657    name: gated
2658    generator: { type: constant, value: 1 }
2659    while: { ref: src, op: ">", value: 0 }
2660"#;
2661        let err = compile_after_from_yaml(yaml).expect_err("log upstream must reject");
2662        assert!(
2663            matches!(err, CompileAfterError::NonMetricsTarget { .. }),
2664            "expected NonMetricsTarget, got {err:?}"
2665        );
2666    }
2667
2668    #[rustfmt::skip]
2669    #[rstest::rstest]
2670    #[case::le("<=")]
2671    #[case::ge(">=")]
2672    #[case::eq("==")]
2673    #[case::ne("!=")]
2674    fn while_strict_operators_reject_non_strict(#[case] op: &str) {
2675        let yaml = format!(r#"
2676version: 2
2677defaults:
2678  rate: 1
2679  duration: 1m
2680scenarios:
2681  - id: src
2682    signal_type: metrics
2683    name: src
2684    generator: {{ type: constant, value: 1 }}
2685  - id: gated
2686    signal_type: metrics
2687    name: gated
2688    generator: {{ type: constant, value: 1 }}
2689    while: {{ ref: src, op: "{op}", value: 1 }}
2690"#);
2691        let err = parse(&yaml).expect_err("non-strict op must fail at parse");
2692        let msg = err.to_string();
2693        assert!(
2694            msg.contains("unsupported operator") && msg.contains("strict"),
2695            "error must use the 'unsupported operator … strict' wording. got: {msg}"
2696        );
2697    }
2698}