use std::collections::{BTreeMap, VecDeque};
use super::expand::{ExpandedEntry, ExpandedFile};
use super::timing::{
self, constant_crossing_secs, csv_replay_crossing_secs, sawtooth_crossing_secs,
sequence_crossing_secs, sine_crossing_secs, spike_crossing_secs, step_crossing_secs,
uniform_crossing_secs, Operator, TimingError,
};
use super::AfterOp;
use crate::config::validate::parse_duration;
use crate::config::{
BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
OnSinkError,
};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig};
use crate::sink::SinkConfig;
/// Errors produced while resolving `after` clauses into absolute phase offsets.
///
/// Marked `#[non_exhaustive]` so new failure modes can be added without a
/// breaking change for downstream matchers.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CompileAfterError {
    /// `after.ref` names an id that does not exist in this file.
    #[error(
        "entry '{source_id}': after.ref '{ref_id}' does not match any signal id in this file. \
        Available ids: [{available}]"
    )]
    UnknownRef {
        source_id: String,
        ref_id: String,
        available: String,
    },
    /// A bare pack reference matched several expanded sub-signal ids.
    #[error(
        "after.ref '{ref_id}' is ambiguous: pack '{pack_entry_id}' ships multiple specs with \
        this metric name. Use one of: [{candidates}]"
    )]
    AmbiguousSubSignalRef {
        ref_id: String,
        pack_entry_id: String,
        candidates: String,
    },
    /// An entry's `after.ref` points at its own id.
    #[error("entry '{source_id}': after.ref references itself")]
    SelfReference {
        source_id: String,
    },
    /// The dependency graph contains a cycle; `cycle` is one offending path.
    #[error("circular dependency detected: {}", .cycle.join(" -> "))]
    CircularDependency {
        cycle: Vec<String>,
    },
    /// The target generator cannot answer "when does it cross the threshold?".
    #[error(
        "entry '{source_id}': after.ref '{ref_id}' uses generator '{generator}' which does \
        not support {op} threshold crossings: {reason}"
    )]
    UnsupportedGenerator {
        source_id: String,
        ref_id: String,
        generator: String,
        op: String,
        reason: String,
    },
    /// The threshold can never be crossed by the target generator's output.
    #[error("entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- {reason}")]
    OutOfRangeThreshold {
        source_id: String,
        ref_id: String,
        op: String,
        value: f64,
        reason: String,
    },
    /// The condition already holds at t=0, so a crossing time is undefined.
    #[error(
        "entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- condition is \
        true at t=0, timing is ambiguous: {reason}"
    )]
    AmbiguousAtT0 {
        source_id: String,
        ref_id: String,
        op: String,
        value: f64,
        reason: String,
    },
    /// Two entries in the same dependency chain declare different clock groups.
    #[error(
        "conflicting clock_group in dependency chain: entry '{first_entry}' has \
        clock_group '{first_group}', entry '{second_entry}' has clock_group '{second_group}'"
    )]
    ConflictingClockGroup {
        first_entry: String,
        first_group: String,
        second_entry: String,
        second_group: String,
    },
    /// Only metrics signals have a value curve, so only they can be targets.
    #[error(
        "entry '{source_id}': after.ref '{ref_id}' resolves to a {signal_type} signal; \
        only metrics signals can be `after` targets"
    )]
    NonMetricsTarget {
        source_id: String,
        ref_id: String,
        signal_type: String,
    },
    /// A duration string (phase_offset, after.delay, ...) failed to parse.
    #[error("entry '{source_id}': invalid duration '{input}' in {field}: {reason}")]
    InvalidDuration {
        source_id: String,
        field: &'static str,
        input: String,
        reason: String,
    },
}
/// A scenario file after `after` compilation: every dependency has been
/// folded into an absolute `phase_offset` on its entry.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct CompiledFile {
    pub version: u32,
    pub entries: Vec<CompiledEntry>,
}
/// One fully compiled signal entry. Mirrors the expanded entry, except that
/// `after` is gone: its effect lives in `phase_offset` and `clock_group`.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct CompiledEntry {
    pub id: Option<String>,
    pub signal_type: String,
    pub name: String,
    pub rate: f64,
    pub duration: Option<String>,
    pub generator: Option<GeneratorConfig>,
    pub log_generator: Option<LogGeneratorConfig>,
    pub labels: Option<BTreeMap<String, String>>,
    pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
    pub encoder: EncoderConfig,
    pub sink: SinkConfig,
    pub jitter: Option<f64>,
    pub jitter_seed: Option<u64>,
    pub gaps: Option<GapConfig>,
    pub bursts: Option<BurstConfig>,
    pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
    // Absolute start offset as a duration string (e.g. "75s", "1m"); for
    // entries with an `after` clause this is the accumulated chain offset.
    pub phase_offset: Option<String>,
    // Chain-wide clock group; `clock_group_is_auto` marks a generated name
    // (as opposed to one the user declared).
    pub clock_group: Option<String>,
    pub clock_group_is_auto: bool,
    pub distribution: Option<DistributionConfig>,
    pub buckets: Option<Vec<f64>>,
    pub quantiles: Option<Vec<f64>>,
    pub observations_per_tick: Option<u32>,
    pub mean_shift_per_sec: Option<f64>,
    pub seed: Option<u64>,
    pub on_sink_error: OnSinkError,
}
/// Compiles `after` clauses into absolute phase offsets and assigns clock
/// groups to dependency chains.
///
/// Passes, in order:
/// 1. validate every `after.ref` (known id, not a self-reference);
/// 2. Kahn topological sort over dependency edges — a shortfall in the
///    sorted count means a cycle, reported with its path;
/// 3. parse each entry's own `phase_offset` as a base offset, then walk in
///    topological order accumulating base + target's total offset +
///    threshold-crossing time + optional `after.delay`;
/// 4. propagate or auto-assign `clock_group` across each chain;
/// 5. rebuild entries with the final, formatted offsets.
pub fn compile_after(file: ExpandedFile) -> Result<CompiledFile, CompileAfterError> {
    let ExpandedFile { version, entries } = file;
    let id_to_idx = build_id_index(&entries);
    // Pass 1: reference validation. `resolve_reference` errors on unknown or
    // ambiguous refs, so later direct indexing into `id_to_idx` cannot miss.
    for entry in &entries {
        let Some(clause) = &entry.after else { continue };
        let source_id = source_label(entry);
        resolve_reference(&clause.ref_id, &id_to_idx, &source_id)?;
        if let Some(own_id) = entry.id.as_deref() {
            if own_id == clause.ref_id {
                return Err(CompileAfterError::SelfReference {
                    source_id: source_id.into_owned(),
                });
            }
        }
    }
    // Pass 2: Kahn's algorithm. Each entry has at most one dependency (its
    // single `after.ref`), so every in-degree is 0 or 1.
    let n = entries.len();
    let mut in_degree = vec![0u32; n];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); n];
    for (i, entry) in entries.iter().enumerate() {
        if let Some(clause) = &entry.after {
            let dep_idx = id_to_idx[clause.ref_id.as_str()];
            in_degree[i] += 1;
            dependents[dep_idx].push(i);
        }
    }
    let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
    let mut sorted: Vec<usize> = Vec::with_capacity(n);
    while let Some(idx) = queue.pop_front() {
        sorted.push(idx);
        for &dependent in &dependents[idx] {
            in_degree[dependent] -= 1;
            if in_degree[dependent] == 0 {
                queue.push_back(dependent);
            }
        }
    }
    // Anything left unsorted sits on a cycle; recover a concrete path for the
    // error message.
    if sorted.len() < n {
        let cycle = find_cycle(&entries, &id_to_idx);
        return Err(CompileAfterError::CircularDependency { cycle });
    }
    // Pass 3: offsets. `base_offsets` holds each entry's own parsed
    // phase_offset; `total_offsets` accumulates along the dependency chain.
    let mut total_offsets = vec![0.0_f64; n];
    let mut base_offsets = vec![0.0_f64; n];
    for (i, entry) in entries.iter().enumerate() {
        if let Some(s) = entry.phase_offset.as_deref() {
            base_offsets[i] = parse_duration_secs(s, &source_label(entry), "phase_offset")?;
        }
    }
    // Walking in topological order guarantees total_offsets[dep_idx] is final
    // before any dependent reads it.
    for &idx in &sorted {
        let entry = &entries[idx];
        let Some(clause) = &entry.after else {
            total_offsets[idx] = base_offsets[idx];
            continue;
        };
        let source_id = source_label(entry).into_owned();
        let dep_idx = id_to_idx[clause.ref_id.as_str()];
        let target = &entries[dep_idx];
        if target.signal_type != "metrics" {
            return Err(CompileAfterError::NonMetricsTarget {
                source_id,
                ref_id: clause.ref_id.clone(),
                signal_type: target.signal_type.clone(),
            });
        }
        let generator = target.generator.as_ref().unwrap_or_else(|| {
            unreachable!(
                "metrics target '{ref_id}' has no generator — parser and expand \
                pass both guarantee metrics entries always carry one",
                ref_id = clause.ref_id
            )
        });
        let op = operator_from(&clause.op);
        let crossing = crossing_secs(generator, op, clause.value, target.rate).map_err(|err| {
            timing_to_error(err, &source_id, &clause.ref_id, generator, op, clause.value)
        })?;
        let delay = match clause.delay.as_deref() {
            Some(s) => parse_duration_secs(s, &source_id, "after.delay")?,
            None => 0.0,
        };
        total_offsets[idx] = base_offsets[idx] + total_offsets[dep_idx] + crossing + delay;
    }
    // Pass 4: clock groups (see assign_clock_groups for the rules).
    let clock_groups = assign_clock_groups(&entries, &id_to_idx)?;
    // Pass 5: assemble output entries. An entry with an `after` clause (or a
    // nonzero accumulated offset) gets its offset re-formatted; otherwise the
    // user's original phase_offset string is passed through untouched.
    let mut out: Vec<CompiledEntry> = Vec::with_capacity(n);
    for (i, entry) in entries.into_iter().enumerate() {
        let phase_offset = if entry.after.is_some() || total_offsets[i] != 0.0 {
            Some(format_duration_secs(total_offsets[i]))
        } else {
            entry.phase_offset.clone()
        };
        let (clock_group, clock_group_is_auto) = match &clock_groups[i] {
            ClockGroupAssignment::Resolved { name, is_auto } => (Some(name.clone()), *is_auto),
            ClockGroupAssignment::Unassigned => (entry.clock_group.clone(), false),
        };
        out.push(CompiledEntry {
            id: entry.id,
            signal_type: entry.signal_type,
            name: entry.name,
            rate: entry.rate,
            duration: entry.duration,
            generator: entry.generator,
            log_generator: entry.log_generator,
            labels: entry.labels,
            dynamic_labels: entry.dynamic_labels,
            encoder: entry.encoder,
            sink: entry.sink,
            jitter: entry.jitter,
            jitter_seed: entry.jitter_seed,
            gaps: entry.gaps,
            bursts: entry.bursts,
            cardinality_spikes: entry.cardinality_spikes,
            phase_offset,
            clock_group,
            clock_group_is_auto,
            distribution: entry.distribution,
            buckets: entry.buckets,
            quantiles: entry.quantiles,
            observations_per_tick: entry.observations_per_tick,
            mean_shift_per_sec: entry.mean_shift_per_sec,
            on_sink_error: entry.on_sink_error,
            seed: entry.seed,
        });
    }
    Ok(CompiledFile {
        version,
        entries: out,
    })
}
/// Maps every entry id (when one is present) to the entry's index.
/// Entries without an id are simply not indexable as `after` targets.
fn build_id_index(entries: &[ExpandedEntry]) -> BTreeMap<&str, usize> {
    entries
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| entry.id.as_deref().map(|id| (id, i)))
        .collect()
}
/// Resolves `ref_id` against the id index.
///
/// An exact hit wins. Otherwise, if the index contains sub-signal ids of the
/// form `"<ref_id>#..."`, the bare reference is reported as ambiguous with the
/// concrete candidates; a plain miss lists every known id.
fn resolve_reference(
    ref_id: &str,
    id_to_idx: &BTreeMap<&str, usize>,
    source_id: &str,
) -> Result<usize, CompileAfterError> {
    match id_to_idx.get(ref_id) {
        Some(&idx) => Ok(idx),
        None => {
            let prefix = format!("{ref_id}#");
            let candidates: Vec<&str> = id_to_idx
                .keys()
                .copied()
                .filter(|k| k.starts_with(&prefix))
                .collect();
            if candidates.is_empty() {
                let available: Vec<&str> = id_to_idx.keys().copied().collect();
                Err(CompileAfterError::UnknownRef {
                    source_id: source_id.to_string(),
                    ref_id: ref_id.to_string(),
                    available: available.join(", "),
                })
            } else {
                // "<entry>.<metric>" → the pack entry id is everything before
                // the final dot; empty when the ref has no dot at all.
                let pack_entry_id = ref_id
                    .rsplit_once('.')
                    .map_or_else(String::new, |(left, _)| left.to_string());
                Err(CompileAfterError::AmbiguousSubSignalRef {
                    ref_id: ref_id.to_string(),
                    pack_entry_id,
                    candidates: candidates.join(", "),
                })
            }
        }
    }
}
/// Diagnostic label for an entry: its id when present, otherwise an
/// "<anonymous:NAME>" placeholder built from the signal name.
fn source_label(entry: &ExpandedEntry) -> std::borrow::Cow<'_, str> {
    match entry.id.as_deref() {
        Some(id) => std::borrow::Cow::Borrowed(id),
        None => std::borrow::Cow::Owned(format!("<anonymous:{}>", entry.name)),
    }
}
/// Computes how many seconds after t=0 the target generator's output first
/// satisfies `output <op> threshold`, by delegating to the per-generator
/// timing helpers.
///
/// `rate` is only consulted by tick-driven generators (`sequence`, `step`).
/// The literal fallback values below (flap 10s/5s, saturation 5m, leak 10m,
/// degradation 5m, spike_event 10s/height 100, baselines/ceilings 0/100)
/// are presumably mirrors of each generator's runtime defaults —
/// NOTE(review): confirm they stay in sync with the generator implementations.
fn crossing_secs(
    generator: &GeneratorConfig,
    op: Operator,
    threshold: f64,
    rate: f64,
) -> Result<f64, TimingError> {
    match generator {
        GeneratorConfig::Constant { value } => constant_crossing_secs(op, threshold, *value),
        // Random / periodic / data-driven generators: the timing module
        // decides whether a crossing time is answerable at all.
        GeneratorConfig::Uniform { .. } => uniform_crossing_secs(),
        GeneratorConfig::Sine { .. } => sine_crossing_secs(),
        GeneratorConfig::CsvReplay { .. } => csv_replay_crossing_secs(),
        GeneratorConfig::Sawtooth {
            min,
            max,
            period_secs,
        } => sawtooth_crossing_secs(op, threshold, *min, *max, *period_secs),
        GeneratorConfig::Sequence { values, repeat } => {
            sequence_crossing_secs(op, threshold, values, *repeat, rate)
        }
        GeneratorConfig::Step {
            start,
            step_size,
            max,
        } => step_crossing_secs(op, threshold, start.unwrap_or(0.0), *step_size, *max, rate),
        GeneratorConfig::Spike {
            baseline,
            magnitude,
            duration_secs,
            ..
        } => spike_crossing_secs(op, threshold, *baseline, *magnitude, *duration_secs),
        GeneratorConfig::Flap {
            up_duration,
            down_duration,
            up_value,
            down_value,
        } => {
            let up_secs = duration_or_default(up_duration.as_deref(), 10.0, "flap.up_duration")?;
            let down_secs =
                duration_or_default(down_duration.as_deref(), 5.0, "flap.down_duration")?;
            let up_val = up_value.unwrap_or(1.0);
            let down_val = down_value.unwrap_or(0.0);
            timing::flap_crossing_secs(op, threshold, up_secs, down_secs, up_val, down_val)
        }
        // Saturation, leak and degradation all ramp linearly from a baseline
        // to a ceiling, so they reuse the sawtooth crossing math.
        GeneratorConfig::Saturation {
            baseline,
            ceiling,
            time_to_saturate,
        } => {
            let bl = baseline.unwrap_or(0.0);
            let cl = ceiling.unwrap_or(100.0);
            let period = duration_or_default(
                time_to_saturate.as_deref(),
                5.0 * 60.0,
                "saturation.time_to_saturate",
            )?;
            sawtooth_crossing_secs(op, threshold, bl, cl, period)
        }
        GeneratorConfig::Leak {
            baseline,
            ceiling,
            time_to_ceiling,
        } => {
            let bl = baseline.unwrap_or(0.0);
            let cl = ceiling.unwrap_or(100.0);
            let period = duration_or_default(
                time_to_ceiling.as_deref(),
                10.0 * 60.0,
                "leak.time_to_ceiling",
            )?;
            sawtooth_crossing_secs(op, threshold, bl, cl, period)
        }
        GeneratorConfig::Degradation {
            baseline,
            ceiling,
            time_to_degrade,
            ..
        } => {
            let bl = baseline.unwrap_or(0.0);
            let cl = ceiling.unwrap_or(100.0);
            let period = duration_or_default(
                time_to_degrade.as_deref(),
                5.0 * 60.0,
                "degradation.time_to_degrade",
            )?;
            sawtooth_crossing_secs(op, threshold, bl, cl, period)
        }
        GeneratorConfig::Steady { .. } => timing::steady_crossing_secs(),
        // A spike event is shape-compatible with the plain spike generator.
        GeneratorConfig::SpikeEvent {
            baseline,
            spike_height,
            spike_duration,
            ..
        } => {
            let bl = baseline.unwrap_or(0.0);
            let height = spike_height.unwrap_or(100.0);
            let dur = duration_or_default(
                spike_duration.as_deref(),
                10.0,
                "spike_event.spike_duration",
            )?;
            spike_crossing_secs(op, threshold, bl, height, dur)
        }
    }
}
/// Stable lowercase name for a generator variant, used in error messages.
fn generator_kind(generator: &GeneratorConfig) -> &'static str {
    match generator {
        GeneratorConfig::Constant { .. } => "constant",
        GeneratorConfig::Uniform { .. } => "uniform",
        GeneratorConfig::Sine { .. } => "sine",
        GeneratorConfig::Sawtooth { .. } => "sawtooth",
        GeneratorConfig::Sequence { .. } => "sequence",
        GeneratorConfig::Spike { .. } => "spike",
        GeneratorConfig::CsvReplay { .. } => "csv_replay",
        GeneratorConfig::Step { .. } => "step",
        GeneratorConfig::Flap { .. } => "flap",
        GeneratorConfig::Saturation { .. } => "saturation",
        GeneratorConfig::Leak { .. } => "leak",
        GeneratorConfig::Degradation { .. } => "degradation",
        GeneratorConfig::Steady { .. } => "steady",
        GeneratorConfig::SpikeEvent { .. } => "spike_event",
    }
}
/// Wraps a timing-module failure in the matching `CompileAfterError` variant,
/// attaching the source entry, the referenced target, and the condition.
fn timing_to_error(
    err: TimingError,
    source_id: &str,
    ref_id: &str,
    generator: &GeneratorConfig,
    op: Operator,
    value: f64,
) -> CompileAfterError {
    // Convert the shared borrowed context into owned values once; exactly one
    // match arm runs, so each owned value is moved at most once.
    let op = op.to_string();
    let source_id = source_id.to_string();
    let ref_id = ref_id.to_string();
    match err {
        TimingError::Unsupported { message } => CompileAfterError::UnsupportedGenerator {
            source_id,
            ref_id,
            generator: generator_kind(generator).to_string(),
            op,
            reason: message,
        },
        TimingError::OutOfRange { message } => CompileAfterError::OutOfRangeThreshold {
            source_id,
            ref_id,
            op,
            value,
            reason: message,
        },
        TimingError::Ambiguous { message } => CompileAfterError::AmbiguousAtT0 {
            source_id,
            ref_id,
            op,
            value,
            reason: message,
        },
        TimingError::InvalidDuration {
            field,
            input,
            reason,
        } => CompileAfterError::InvalidDuration {
            source_id,
            field,
            input,
            reason,
        },
    }
}
/// Per-entry outcome of clock-group propagation.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ClockGroupAssignment {
    /// Entry belongs to a multi-entry chain; `is_auto` marks a generated name.
    Resolved { name: String, is_auto: bool },
    /// Entry is not part of any chain; keep whatever it already declared.
    Unassigned,
}
/// Propagates `clock_group` across dependency chains.
///
/// Entries linked by `after` (in either direction) form undirected connected
/// components. For each component with two or more members:
/// - no explicit non-empty group anywhere → auto-name `chain_<lowest id>`;
/// - exactly one distinct non-empty group → every member gets it;
/// - two or more distinct groups → `ConflictingClockGroup`.
/// Singleton components stay `Unassigned` (the caller keeps the entry's own
/// setting). Group names are compared byte-for-byte, so whitespace variants
/// count as distinct groups.
fn assign_clock_groups(
    entries: &[ExpandedEntry],
    id_to_idx: &BTreeMap<&str, usize>,
) -> Result<Vec<ClockGroupAssignment>, CompileAfterError> {
    let n = entries.len();
    // Undirected adjacency: dependency edges are symmetric for grouping.
    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
    for (i, entry) in entries.iter().enumerate() {
        if let Some(clause) = &entry.after {
            if let Some(&dep_idx) = id_to_idx.get(clause.ref_id.as_str()) {
                adj[i].push(dep_idx);
                adj[dep_idx].push(i);
            }
        }
    }
    // Iterative DFS flood-fill to label connected components.
    // usize::MAX marks "not yet visited".
    let mut component_id = vec![usize::MAX; n];
    let mut components: Vec<Vec<usize>> = Vec::new();
    for start in 0..n {
        if component_id[start] != usize::MAX {
            continue;
        }
        let cid = components.len();
        let mut stack = vec![start];
        let mut members = Vec::new();
        while let Some(node) = stack.pop() {
            if component_id[node] != usize::MAX {
                continue;
            }
            component_id[node] = cid;
            members.push(node);
            for &next in &adj[node] {
                if component_id[next] == usize::MAX {
                    stack.push(next);
                }
            }
        }
        components.push(members);
    }
    let mut out: Vec<ClockGroupAssignment> =
        (0..n).map(|_| ClockGroupAssignment::Unassigned).collect();
    for members in &components {
        if members.len() < 2 {
            continue;
        }
        // Collect distinct, non-empty group names; BTreeMap keeps them sorted
        // for deterministic conflict reporting. `or_insert` remembers the
        // first entry seen with each name, for use in the error message.
        let mut distinct: BTreeMap<&str, usize> = BTreeMap::new();
        for &idx in members {
            if let Some(cg) = entries[idx].clock_group.as_deref() {
                if !cg.is_empty() {
                    distinct.entry(cg).or_insert(idx);
                }
            }
        }
        let (resolved, is_auto) = match distinct.len() {
            0 => (auto_chain_name(members, entries), true),
            1 => {
                let (&k, _) = distinct.iter().next().expect("len == 1");
                (k.to_string(), false)
            }
            _ => {
                let mut iter = distinct.iter();
                let (&first_group, &first_idx) = iter.next().expect("len >= 2");
                let (&second_group, &second_idx) = iter.next().expect("len >= 2");
                return Err(CompileAfterError::ConflictingClockGroup {
                    first_entry: source_label(&entries[first_idx]).into_owned(),
                    first_group: first_group.to_string(),
                    second_entry: source_label(&entries[second_idx]).into_owned(),
                    second_group: second_group.to_string(),
                });
            }
        };
        for &idx in members {
            out[idx] = ClockGroupAssignment::Resolved {
                name: resolved.clone(),
                is_auto,
            };
        }
    }
    Ok(out)
}
/// Derives the automatic clock-group name for a dependency chain:
/// `chain_` + the lexicographically smallest member id.
fn auto_chain_name(members: &[usize], entries: &[ExpandedEntry]) -> String {
    let lowest = members
        .iter()
        .filter_map(|&i| entries[i].id.as_deref())
        .min()
        .unwrap_or_else(|| {
            unreachable!(
                "multi-entry component has no id-bearing member — `after.ref` \
                resolution guarantees every linked entry carries an id"
            )
        });
    format!("chain_{lowest}")
}
/// Reconstructs one concrete dependency cycle for error reporting, using a
/// three-color DFS (White = unvisited, Gray = on the current path,
/// Black = fully explored). Only called after Kahn's sort proved a cycle
/// exists, so the fallback value should never be reached in practice.
fn find_cycle(entries: &[ExpandedEntry], id_to_idx: &BTreeMap<&str, usize>) -> Vec<String> {
    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Color {
        White,
        Gray,
        Black,
    }
    let n = entries.len();
    let mut color = vec![Color::White; n];
    let mut stack: Vec<usize> = Vec::new();
    // Recursive DFS; `stack` holds the current path so a back-edge can be
    // turned into the cycle slice. Each node has at most one outgoing edge
    // (its single `after.ref`).
    fn dfs(
        node: usize,
        entries: &[ExpandedEntry],
        id_to_idx: &BTreeMap<&str, usize>,
        color: &mut [Color],
        stack: &mut Vec<usize>,
    ) -> Option<Vec<usize>> {
        color[node] = Color::Gray;
        stack.push(node);
        if let Some(clause) = &entries[node].after {
            if let Some(&dep) = id_to_idx.get(clause.ref_id.as_str()) {
                match color[dep] {
                    Color::White => {
                        if let Some(cycle) = dfs(dep, entries, id_to_idx, color, stack) {
                            return Some(cycle);
                        }
                    }
                    Color::Gray => {
                        // Back-edge: the path from `dep`'s first occurrence to
                        // the top of the stack is the cycle; repeat `dep` at
                        // the end so the rendered path closes the loop.
                        let start = stack.iter().position(|&x| x == dep).unwrap_or(0);
                        let mut cycle: Vec<usize> = stack[start..].to_vec();
                        cycle.push(dep);
                        return Some(cycle);
                    }
                    Color::Black => {}
                }
            }
        }
        color[node] = Color::Black;
        stack.pop();
        None
    }
    for start in 0..n {
        if color[start] == Color::White {
            if let Some(cycle) = dfs(start, entries, id_to_idx, &mut color, &mut stack) {
                return cycle
                    .into_iter()
                    .map(|i| source_label(&entries[i]).into_owned())
                    .collect();
            }
        }
    }
    // Defensive: should be unreachable when the caller already detected a
    // cycle via topological sort.
    vec!["<unknown cycle>".to_string()]
}
/// Maps the config-level comparison op onto the timing module's operator.
fn operator_from(op: &AfterOp) -> Operator {
    match op {
        AfterOp::LessThan => Operator::LessThan,
        AfterOp::GreaterThan => Operator::GreaterThan,
    }
}
/// Parses a duration string (e.g. "90s", "5m") into fractional seconds,
/// wrapping failures in `CompileAfterError::InvalidDuration` with the entry
/// and field that supplied the bad value.
fn parse_duration_secs(
    input: &str,
    source_id: &str,
    field: &'static str,
) -> Result<f64, CompileAfterError> {
    match parse_duration(input) {
        Ok(d) => Ok(d.as_secs_f64()),
        Err(e) => Err(CompileAfterError::InvalidDuration {
            source_id: source_id.to_string(),
            field,
            input: input.to_string(),
            reason: e.to_string(),
        }),
    }
}
/// Parses an optional duration string, falling back to `default_secs` when
/// absent; parse failures become `TimingError::InvalidDuration`.
fn duration_or_default(
    input: Option<&str>,
    default_secs: f64,
    field: &'static str,
) -> Result<f64, TimingError> {
    let Some(s) = input else {
        return Ok(default_secs);
    };
    parse_duration(s)
        .map(|d| d.as_secs_f64())
        .map_err(|e| TimingError::InvalidDuration {
            field,
            input: s.to_string(),
            reason: e.to_string(),
        })
}
/// Formats a non-negative duration in seconds as a compact duration string
/// ("90s", "1m", "2h", "1.5s"), rounding to millisecond precision.
///
/// Sub-millisecond positive values are emitted with full `f64` precision
/// (e.g. "0.0001s") rather than collapsing to "0s", so tiny offsets survive
/// a round-trip through the printer.
pub fn format_duration_secs(secs: f64) -> String {
    debug_assert!(
        secs.is_finite() && secs >= 0.0,
        "format_duration_secs received non-finite or negative value: {secs}"
    );
    // Defensive fallback for release builds, where the debug_assert above is
    // compiled out.
    if !secs.is_finite() || secs <= 0.0 {
        return "0s".to_string();
    }
    let ms = (secs * 1000.0).round() as u64;
    // Bug fix: a positive value below 0.5 ms rounds to ms == 0, and 0 is a
    // multiple of 1000, so the old code fell into the whole-second branch and
    // printed "0s" — the `ms < 1` branch meant to preserve the raw value was
    // dead. Handle the sub-millisecond case first.
    if ms == 0 {
        return format!("{secs}s");
    }
    if ms % 1000 == 0 {
        let whole_secs = ms / 1000;
        // ms != 0 here, so whole_secs >= 1; prefer the coarsest exact unit.
        if whole_secs % 3600 == 0 {
            return format!("{}h", whole_secs / 3600);
        }
        if whole_secs % 60 == 0 {
            return format!("{}m", whole_secs / 60);
        }
        return format!("{whole_secs}s");
    }
    // Fractional seconds, rounded to the nearest millisecond.
    let secs_rounded = (ms as f64) / 1000.0;
    format!("{secs_rounded}s")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compiler::expand::expand;
use crate::compiler::expand::InMemoryPackResolver;
use crate::compiler::normalize::normalize;
use crate::compiler::parse::parse;
    // Runs the full pipeline with an empty pack resolver.
    fn compile(yaml: &str) -> Result<CompiledFile, String> {
        compile_with_resolver(yaml, &InMemoryPackResolver::new())
    }

    // parse → normalize → expand → compile_after; each stage's error is
    // stringified with a stage prefix so tests can assert on substrings.
    fn compile_with_resolver(
        yaml: &str,
        resolver: &InMemoryPackResolver,
    ) -> Result<CompiledFile, String> {
        let parsed = parse(yaml).map_err(|e| format!("parse: {e}"))?;
        let normalized = normalize(parsed).map_err(|e| format!("normalize: {e}"))?;
        let expanded = expand(normalized, resolver).map_err(|e| format!("expand: {e}"))?;
        compile_after(expanded).map_err(|e| format!("compile_after: {e}"))
    }
#[test]
fn unknown_ref_surfaces_available_ids() {
let yaml = r#"
version: 2
scenarios:
- id: cpu
signal_type: metrics
name: cpu_saturation
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
- id: log_entry
signal_type: logs
name: errors
rate: 1
log_generator: { type: template, templates: [{ message: "hi" }] }
after: { ref: nonexistent, op: ">", value: 50 }
"#;
let err = compile(yaml).expect_err("should fail");
assert!(err.contains("nonexistent"), "got: {err}");
assert!(err.contains("Available"), "got: {err}");
}
#[test]
fn self_reference_is_rejected() {
let yaml = r#"
version: 2
scenarios:
- id: loop
signal_type: metrics
name: util
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: loop, op: ">", value: 50 }
"#;
let err = compile(yaml).expect_err("self-ref is rejected");
assert!(err.contains("references itself"), "got: {err}");
}
#[test]
fn saturation_greater_than_sets_offset() {
let yaml = r#"
version: 2
scenarios:
- id: util
signal_type: metrics
name: util
rate: 1
generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
- id: follower
signal_type: metrics
name: latency
rate: 1
generator: { type: constant, value: 1 }
after: { ref: util, op: ">", value: 70 }
"#;
let compiled = compile(yaml).expect("should compile");
let follower = &compiled.entries[1];
let expected_secs = (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
let expected_str = format_duration_secs(expected_secs);
assert_eq!(
follower.phase_offset.as_deref(),
Some(expected_str.as_str())
);
}
    // Each case pairs a scenario file with the follower's expected compiled
    // phase_offset string for that generator/op combination.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::flap_less_than(r#"
version: 2
scenarios:
  - id: link
    signal_type: metrics
    name: oper_state
    rate: 1
    generator: { type: flap, up_duration: 60s, down_duration: 30s }
  - id: follower
    signal_type: metrics
    name: util
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: link, op: "<", value: 1 }
"#, "1m")]
    #[case::spike_event_less_than(r#"
version: 2
scenarios:
  - id: burst
    signal_type: metrics
    name: errs
    rate: 1
    generator: { type: spike_event, baseline: 0, spike_height: 100, spike_duration: 10s, spike_interval: 60s }
  - id: follower
    signal_type: metrics
    name: recovery
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: burst, op: "<", value: 50 }
"#, "10s")]
    #[case::step_greater_than(r#"
version: 2
scenarios:
  - id: counter
    signal_type: metrics
    name: req_count
    rate: 2
    generator: { type: step, start: 0, step_size: 10 }
  - id: follower
    signal_type: metrics
    name: alert
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: counter, op: ">", value: 55 }
"#, "3s")]
    #[case::sequence_less_than(r#"
version: 2
scenarios:
  - id: seq
    signal_type: metrics
    name: values
    rate: 2
    generator: { type: sequence, values: [10, 5, 2, 1], repeat: false }
  - id: follower
    signal_type: metrics
    name: alert
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: seq, op: "<", value: 3 }
"#, "1s")]
    fn follower_phase_offset_matches_expected_crossing(
        #[case] yaml: &str,
        #[case] expected_offset: &str,
    ) {
        let compiled = compile(yaml).expect("should compile");
        assert_eq!(
            compiled.entries[1].phase_offset.as_deref(),
            Some(expected_offset)
        );
    }
    // A monotonically increasing step generator can never go below its start,
    // so "<" is rejected with an unsupported-generator error.
    #[test]
    fn step_less_than_is_unsupported() {
        let yaml = r#"
version: 2
scenarios:
  - id: counter
    signal_type: metrics
    name: x
    rate: 1
    generator: { type: step, start: 0, step_size: 1 }
  - id: follower
    signal_type: metrics
    name: y
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: counter, op: "<", value: 5 }
"#;
        let err = compile(yaml).expect_err("step < is unsupported");
        assert!(err.contains("step"), "got: {err}");
    }

    // Generators whose crossing time is undefined or unanswerable
    // (never-crossing constants, random, periodic) are rejected; the error
    // must mention the generator kind.
    #[rustfmt::skip]
    #[rstest::rstest]
    #[case::constant(r#"
version: 2
scenarios:
  - id: k
    signal_type: metrics
    name: k
    rate: 1
    generator: { type: constant, value: 42 }
  - id: follower
    signal_type: metrics
    name: y
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: k, op: ">", value: 100 }
"#, "constant")]
    #[case::sine(r#"
version: 2
scenarios:
  - id: wave
    signal_type: metrics
    name: s
    rate: 1
    generator: { type: sine, amplitude: 10, period_secs: 60, offset: 50 }
  - id: follower
    signal_type: metrics
    name: f
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: wave, op: ">", value: 55 }
"#, "sine")]
    #[case::steady(r#"
version: 2
scenarios:
  - id: base
    signal_type: metrics
    name: s
    rate: 1
    generator: { type: steady, center: 50, amplitude: 5, period: 60s }
  - id: follower
    signal_type: metrics
    name: f
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: base, op: ">", value: 55 }
"#, "steady")]
    #[case::uniform(r#"
version: 2
scenarios:
  - id: u
    signal_type: metrics
    name: u
    rate: 1
    generator: { type: uniform, min: 0, max: 10, seed: 1 }
  - id: follower
    signal_type: metrics
    name: f
    rate: 1
    generator: { type: constant, value: 1 }
    after: { ref: u, op: ">", value: 5 }
"#, "uniform")]
    fn unresolvable_target_generator_is_rejected(
        #[case] yaml: &str,
        #[case] expected_substring: &str,
    ) {
        let err = compile(yaml).expect_err("target generator must be rejected");
        assert!(
            err.contains(expected_substring),
            "expected error to mention {expected_substring:?}, got: {err}"
        );
    }
#[test]
fn transitive_chain_accumulates() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
after: { ref: a, op: "<", value: 1 }
- id: c
signal_type: metrics
name: c
rate: 1
generator: { type: constant, value: 1 }
after: { ref: b, op: ">", value: 70 }
"#;
let compiled = compile(yaml).expect("chain compiles");
let expected_b_secs = 60.0;
let expected_c_secs = 60.0 + (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
assert_eq!(
compiled.entries[1].phase_offset.as_deref(),
Some(format_duration_secs(expected_b_secs).as_str())
);
assert_eq!(
compiled.entries[2].phase_offset.as_deref(),
Some(format_duration_secs(expected_c_secs).as_str())
);
}
#[test]
fn delay_is_added_to_crossing_time() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: link, op: "<", value: 1, delay: 15s }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
}
#[test]
fn explicit_phase_offset_is_added() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
phase_offset: 10s
after: { ref: link, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("70s"));
}
#[test]
fn phase_offset_delay_and_crossing_sum() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
phase_offset: 10s
after: { ref: link, op: "<", value: 1, delay: 5s }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
}
#[test]
fn two_entry_cycle_is_detected() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: a, op: ">", value: 1 }
"#;
let err = compile(yaml).expect_err("cycle should fail");
assert!(err.contains("circular"), "got: {err}");
assert!(err.contains("a") && err.contains("b"), "got: {err}");
}
#[test]
fn three_entry_cycle_path_is_returned() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: c, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: a, op: ">", value: 1 }
- id: c
signal_type: metrics
name: c
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
"#;
let err = compile(yaml).expect_err("cycle should fail");
assert!(err.contains("circular"), "got: {err}");
assert!(
err.contains("a -> "),
"cycle path should have an arrow. got: {err}"
);
}
#[test]
fn clock_group_auto_assigned_as_chain_plus_lowest_id() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(
compiled.entries[0].clock_group.as_deref(),
Some("chain_alpha")
);
assert_eq!(
compiled.entries[1].clock_group.as_deref(),
Some("chain_alpha")
);
}
#[test]
fn explicit_clock_group_propagates_to_chain_members() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: failover
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("failover"));
assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("failover"));
}
#[test]
fn conflicting_clock_groups_are_rejected() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: group_a
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: group_b
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let err = compile(yaml).expect_err("conflicting groups fail");
assert!(err.contains("conflicting clock_group"), "got: {err}");
assert!(
err.contains("group_a") && err.contains("group_b"),
"got: {err}"
);
}
#[test]
fn independent_signals_keep_no_clock_group() {
let yaml = r#"
version: 2
scenarios:
- id: independent
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
"#;
let compiled = compile(yaml).expect("compile");
assert!(compiled.entries[0].clock_group.is_none());
}
#[test]
fn clock_group_empty_string_mixed_with_some_x_uses_x() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: ""
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: x
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("x"));
assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("x"));
}
#[test]
fn clock_group_whitespace_variants_conflict() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: "x "
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: x
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let err = compile(yaml).expect_err("trailing whitespace must conflict");
assert!(err.contains("conflicting clock_group"), "got: {err}");
}
#[test]
fn log_signal_can_depend_on_metrics_target() {
let yaml = r#"
version: 2
scenarios:
- id: err_rate
signal_type: metrics
name: http_error_rate
rate: 1
generator: { type: saturation, baseline: 1, ceiling: 30, time_to_saturate: 90s }
- id: err_logs
signal_type: logs
name: app_logs
rate: 1
log_generator: { type: template, templates: [{ message: "upstream timeout" }] }
after: { ref: err_rate, op: ">", value: 10 }
"#;
let compiled = compile(yaml).expect("cross-signal after compiles");
assert!(compiled.entries[1].phase_offset.is_some());
}
#[test]
fn metrics_entry_cannot_depend_on_logs_target() {
    // Threshold crossings are only defined for numeric metric streams, so an
    // `after.ref` pointing at a logs entry must be rejected with an error
    // that names the offending signal type.
    let spec = r#"
version: 2
scenarios:
- id: log_src
  signal_type: logs
  name: lg
  rate: 1
  log_generator: { type: template, templates: [{ message: "hi" }] }
- id: follower
  signal_type: metrics
  name: f
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: log_src, op: ">", value: 0 }
"#;
    let message = compile(spec).expect_err("logs target rejected");
    assert!(message.contains("logs signal"), "got: {message}");
}
#[test]
fn flap_alias_produces_expected_up_duration_offset() {
    // With a flap source (60s up / 30s down) and op "<", the first crossing
    // is when the signal drops, i.e. after the up phase: offset "1m".
    let spec = r#"
version: 2
scenarios:
- id: link
  signal_type: metrics
  name: s
  rate: 1
  generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
  signal_type: metrics
  name: f
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: link, op: "<", value: 1 }
"#;
    let compiled = compile(spec).expect("compile");
    let follower = &compiled.entries[1];
    assert_eq!(follower.phase_offset.as_deref(), Some("1m"));
}
/// Builds an in-memory resolver exposing a single pack ("testpack") with two
/// metric specs, used by the pack-reference tests below.
fn resolver_with_test_pack() -> InMemoryPackResolver {
    let pack_yaml = r#"
name: testpack
category: test
description: test
metrics:
- name: state_flap
  generator: { type: flap, up_duration: 60s, down_duration: 30s }
- name: util_sat
  generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 120s }
"#;
    let def: crate::packs::MetricPackDef =
        serde_yaml_ng::from_str(pack_yaml).expect("pack parses");
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("testpack", def);
    resolver
}
#[test]
fn dotted_pack_ref_resolves() {
    // `after.ref: dev.state_flap` (entry id + pack metric name) should
    // resolve against the expanded pack and yield the flap's 1m offset.
    let spec = r#"
version: 2
scenarios:
- id: dev
  signal_type: metrics
  rate: 1
  pack: testpack
- id: follower
  signal_type: metrics
  name: alert
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: dev.state_flap, op: "<", value: 1 }
"#;
    let resolver = resolver_with_test_pack();
    let compiled = compile_with_resolver(spec, &resolver).expect("compile");
    let follower = compiled
        .entries
        .iter()
        .find(|entry| entry.id.as_deref() == Some("follower"))
        .expect("follower present");
    assert_eq!(follower.phase_offset.as_deref(), Some("1m"));
}
#[test]
fn ambiguous_bare_pack_ref_is_rejected() {
    // The pack ships two specs named `cpu_util` (differing only in labels),
    // so a bare `host.cpu_util` ref is ambiguous; the error must list the
    // disambiguated candidate ids.
    let pack_yaml = r#"
name: ambig
category: test
description: test
metrics:
- name: cpu_util
  labels: { mode: user }
  generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
- name: cpu_util
  labels: { mode: system }
  generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
"#;
    let def: crate::packs::MetricPackDef =
        serde_yaml_ng::from_str(pack_yaml).expect("pack parses");
    let mut resolver = InMemoryPackResolver::new();
    resolver.insert("ambig", def);
    let spec = r#"
version: 2
scenarios:
- id: host
  signal_type: metrics
  rate: 1
  pack: ambig
- id: follower
  signal_type: metrics
  name: alert
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: host.cpu_util, op: ">", value: 50 }
"#;
    let message = compile_with_resolver(spec, &resolver).expect_err("bare ref is ambiguous");
    assert!(message.contains("ambiguous"), "got: {message}");
    let lists_candidates =
        message.contains("host.cpu_util#0") && message.contains("host.cpu_util#1");
    assert!(lists_candidates, "candidates should be listed. got: {message}");
}
#[rustfmt::skip]
#[rstest::rstest]
// A bad duration string anywhere in the after-compilation path must surface
// as InvalidDuration carrying the entry id, the offending field, and the
// raw input — whether it appears in after.delay, an explicit phase_offset,
// or inside the referenced generator's own config.
#[case::after_delay(r#"
version: 2
scenarios:
- id: src
  signal_type: metrics
  name: a
  rate: 1
  generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
  signal_type: metrics
  name: b
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: src, op: "<", value: 1, delay: "10seconds" }
"#, "follower", "after.delay", "10seconds")]
#[case::phase_offset_zero(r#"
version: 2
scenarios:
- id: src
  signal_type: metrics
  name: a
  rate: 1
  generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
  signal_type: metrics
  name: b
  rate: 1
  phase_offset: "0s"
  generator: { type: constant, value: 1 }
  after: { ref: src, op: "<", value: 1 }
"#, "follower", "phase_offset", "0s")]
#[case::alias_flap_up_duration(r#"
version: 2
scenarios:
- id: src
  signal_type: metrics
  name: a
  rate: 1
  generator: { type: flap, up_duration: "oops", down_duration: 30s }
- id: follower
  signal_type: metrics
  name: b
  rate: 1
  generator: { type: constant, value: 1 }
  after: { ref: src, op: "<", value: 1 }
"#, "follower", "flap.up_duration", "oops")]
fn invalid_duration_surfaces_invalid_duration(
    #[case] yaml: &str,
    #[case] expected_source_id: &str,
    #[case] expected_field: &str,
    #[case] expected_input: &str,
) {
    let Err(err) = compile_after_from_yaml(yaml) else {
        panic!("invalid duration must fail");
    };
    match err {
        CompileAfterError::InvalidDuration { source_id, field, input, .. } => {
            assert_eq!(source_id, expected_source_id);
            assert_eq!(field, expected_field);
            assert_eq!(input, expected_input);
        }
        other => panic!("expected InvalidDuration, got {other:?}"),
    }
}
/// Runs the front half of the pipeline (parse -> normalize -> expand, with an
/// empty pack resolver) and then exercises `compile_after` on the result.
/// Earlier stages are expected to succeed; only the final stage's Result is
/// returned for inspection.
fn compile_after_from_yaml(yaml: &str) -> Result<CompiledFile, CompileAfterError> {
    let resolver = InMemoryPackResolver::new();
    let expanded = expand(
        normalize(parse(yaml).expect("parse")).expect("normalize"),
        &resolver,
    )
    .expect("expand");
    compile_after(expanded)
}
#[rustfmt::skip]
#[rstest::rstest]
// Whole multiples of a unit must render with the largest exact unit; zero
// (including negative zero) renders as "0s".
#[case::whole_seconds(30.0, "30s")]
#[case::whole_minutes(120.0, "2m")]
#[case::whole_hours(3600.0, "1h")]
#[case::zero(0.0, "0s")]
#[case::negative_zero(-0.0, "0s")]
fn format_duration_whole_units(#[case] secs: f64, #[case] expected: &str) {
    let rendered = format_duration_secs(secs);
    assert_eq!(rendered, expected);
}
#[test]
fn format_duration_fractional_seconds_round_trip() {
    // Fractional second counts have no exact textual form; require only that
    // format -> parse round-trips to within 10ms of the input.
    let rendered = format_duration_secs(92.307);
    let parsed = parse_duration(&rendered).expect("round-trip");
    let delta = (parsed.as_secs_f64() - 92.307).abs();
    assert!(delta < 0.01, "got {}, expected ~92.307", parsed.as_secs_f64());
}
}