use std::collections::{BTreeMap, VecDeque};
use super::expand::{ExpandedEntry, ExpandedFile};
use super::timing::{
self, constant_crossing_secs, csv_replay_crossing_secs, sawtooth_crossing_secs,
sequence_crossing_secs, sine_crossing_secs, spike_crossing_secs, step_crossing_secs,
uniform_crossing_secs, Operator, TimingError,
};
use super::{AfterOp, ClauseKind, DelayClause, WhileClause};
use crate::config::validate::parse_duration;
use crate::config::{
BurstConfig, CardinalitySpikeConfig, DistributionConfig, DynamicLabelConfig, GapConfig,
OnSinkError,
};
use crate::encoder::EncoderConfig;
use crate::generator::{GeneratorConfig, LogGeneratorConfig};
use crate::sink::SinkConfig;
/// Errors raised while compiling `after:`/`while:` clauses into concrete
/// phase offsets and auto-assigned clock groups (see `compile_after`).
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CompileAfterError {
/// The referenced id does not exist anywhere in the expanded file.
#[error(
"entry '{source_id}': {clause}.ref '{ref_id}' does not match any signal id in this file. \
Available ids: [{available}]"
)]
UnknownRef {
source_id: String,
ref_id: String,
clause: ClauseKind,
available: String,
},
/// The reference names a pack entry that expands to several `id#metric`
/// sub-signals; the user must pick one of the candidates explicitly.
#[error(
"after.ref '{ref_id}' is ambiguous: pack '{pack_entry_id}' ships multiple specs with \
this metric name. Use one of: [{candidates}]"
)]
AmbiguousSubSignalRef {
ref_id: String,
pack_entry_id: String,
candidates: String,
},
/// An entry's clause points back at the entry itself.
#[error("entry '{source_id}': {clause}.ref references itself")]
SelfReference {
source_id: String,
clause: ClauseKind,
},
/// The dependency graph contains a cycle; `cycle` holds the offending
/// path, each element tagged with the kind of its outgoing edge.
#[error("{}", format_cycle(.cycle))]
CircularDependency { cycle: Vec<(String, ClauseKind)> },
/// The target generator cannot be solved analytically for the requested
/// threshold crossing (e.g. random or data-driven output).
#[error(
"entry '{source_id}': after.ref '{ref_id}' uses generator '{generator}' which does \
not support {op} threshold crossings: {reason}"
)]
UnsupportedGenerator {
source_id: String,
ref_id: String,
generator: String,
op: String,
reason: String,
},
/// The threshold lies outside the generator's reachable value range.
#[error("entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- {reason}")]
OutOfRangeThreshold {
source_id: String,
ref_id: String,
op: String,
value: f64,
reason: String,
},
/// The condition already holds at t=0, so "first crossing" is undefined.
#[error(
"entry '{source_id}': after.ref '{ref_id}' op '{op}' value {value} -- condition is \
true at t=0, timing is ambiguous: {reason}"
)]
AmbiguousAtT0 {
source_id: String,
ref_id: String,
op: String,
value: f64,
reason: String,
},
/// Two entries linked into one `after:` chain declare different explicit
/// clock groups; the chain cannot serve both.
#[error(
"conflicting clock_group in dependency chain: entry '{first_entry}' has \
clock_group '{first_group}', entry '{second_entry}' has clock_group '{second_group}'"
)]
ConflictingClockGroup {
first_entry: String,
first_group: String,
second_entry: String,
second_group: String,
},
/// Clause targets must be metrics entries (only their generators have
/// solvable value curves).
#[error(
"entry '{source_id}': {clause}.ref '{ref_id}' resolves to a {target_signal} signal; \
only metrics signals can be `{clause}` targets"
)]
NonMetricsTarget {
source_id: String,
ref_id: String,
clause: ClauseKind,
target_signal: String,
},
/// A duration string (`phase_offset`, `after.delay`, ...) failed to parse.
#[error("entry '{source_id}': invalid duration '{input}' in {field}: {reason}")]
InvalidDuration {
source_id: String,
field: &'static str,
input: String,
reason: String,
},
/// A `while:` upstream would emit a literal NaN; strict comparisons
/// against NaN never hold, which would pause the scenario forever.
#[error(
"entry '{source_id}': `while:` cannot reference '{ref_id}' — it emits a literal NaN \
({nan}); strict comparisons against NaN never hold and would leave the scenario \
permanently paused"
)]
WhileNanSource {
source_id: String,
ref_id: String,
nan: NanSource,
},
/// A `while:` upstream uses a data-dependent generator that cannot be
/// evaluated analytically.
#[error(
"entry '{source_id}': `while:` cannot reference '{ref_id}' — generator \
'{generator_kind}' is data-dependent; only analytical generators are supported as \
`while:` upstreams"
)]
WhileUnsupportedUpstreamGenerator {
source_id: String,
ref_id: String,
generator_kind: &'static str,
},
}
/// Pinpoints where a literal NaN enters a generator's output; rendered via
/// `Display` inside the `WhileNanSource` error message.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum NanSource {
/// `constant.value` itself is NaN.
Constant,
/// `sequence.values[index]` is NaN (zero-based index).
SequenceValue {
index: usize,
},
/// A `csv_replay` input cell is NaN. `row` is the physical line index in
/// the file (blank and `#`-comment lines included); `column` is zero-based.
CsvCell {
path: String,
row: usize,
column: usize,
},
}
impl std::fmt::Display for NanSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NanSource::Constant => f.write_str("constant.value: NaN"),
NanSource::SequenceValue { index } => {
write!(f, "sequence.values[{index}]: NaN")
}
NanSource::CsvCell { path, row, column } => {
write!(f, "csv_replay file '{path}' row {row} column {column}: NaN")
}
}
}
}
/// Renders a dependency cycle for the `CircularDependency` error.
///
/// Pure `after:` cycles print as a plain `a -> b -> a` chain. As soon as a
/// `while:` edge participates, every arrow is labelled with its clause kind
/// (`--[after]-->` / `--[while]-->`) so the mixed cycle reads unambiguously.
/// The last element's kind is never printed: it closes the loop back to the
/// first node.
fn format_cycle(cycle: &[(String, ClauseKind)]) -> String {
    use std::fmt::Write;

    let Some((closing, edges)) = cycle.split_last() else {
        return "circular dependency detected: <unknown cycle>".to_string();
    };
    let mut out = String::from("circular dependency detected: ");
    if edges.iter().all(|(_, kind)| *kind != ClauseKind::While) {
        // Only `after` edges: labelled arrows would add no information.
        let mut first = true;
        for (name, _) in cycle {
            if !first {
                out.push_str(" -> ");
            }
            out.push_str(name);
            first = false;
        }
        return out;
    }
    for (name, kind) in edges {
        let _ = write!(out, "{name} --[{kind}]--> ");
    }
    out.push_str(&closing.0);
    out
}
/// Output of `compile_after`: the expanded file with every `after:` clause
/// folded into a concrete `phase_offset` and clock groups resolved.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct CompiledFile {
pub version: u32,
#[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
pub scenario_name: Option<String>,
pub entries: Vec<CompiledEntry>,
}
/// One fully-compiled signal entry: the expanded entry with `after:` folded
/// into `phase_offset` and clock-group resolution applied. Most fields are
/// carried through from `ExpandedEntry` unchanged.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config", derive(serde::Serialize))]
pub struct CompiledEntry {
pub id: Option<String>,
pub signal_type: String,
pub name: String,
pub rate: f64,
pub duration: Option<String>,
pub generator: Option<GeneratorConfig>,
pub log_generator: Option<LogGeneratorConfig>,
pub labels: Option<BTreeMap<String, String>>,
pub dynamic_labels: Option<Vec<DynamicLabelConfig>>,
pub encoder: EncoderConfig,
pub sink: SinkConfig,
pub jitter: Option<f64>,
pub jitter_seed: Option<u64>,
pub gaps: Option<GapConfig>,
pub bursts: Option<BurstConfig>,
pub cardinality_spikes: Option<Vec<CardinalitySpikeConfig>>,
/// Absolute start offset. For entries with `after:` (or any accumulated
/// chain offset) this is the computed total rendered by
/// `format_duration_secs`; otherwise the original string is kept.
pub phase_offset: Option<String>,
pub clock_group: Option<String>,
/// True when the clock group was synthesized (`chain_<id>`) rather than
/// declared by the user.
pub clock_group_is_auto: bool,
pub distribution: Option<DistributionConfig>,
pub buckets: Option<Vec<f64>>,
pub quantiles: Option<Vec<f64>>,
pub observations_per_tick: Option<u32>,
pub mean_shift_per_sec: Option<f64>,
pub seed: Option<u64>,
pub on_sink_error: OnSinkError,
#[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
pub while_clause: Option<WhileClause>,
#[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
pub delay_clause: Option<DelayClause>,
/// Id of the `after:` target, preserved for downstream consumers.
#[cfg_attr(feature = "config", serde(skip_serializing_if = "Option::is_none"))]
pub after_ref: Option<String>,
}
/// Compiles `after:`/`while:` clauses into absolute `phase_offset`s and
/// resolves clock groups for linked chains.
///
/// Pipeline:
/// 1. validate every clause reference (existence, no self-reference);
/// 2. topologically sort the dependency graph, rejecting cycles;
/// 3. validate `while:` upstreams (metrics-only, analytical, NaN-free);
/// 4. accumulate offsets in topological order: own `phase_offset` + the
///    target's total offset + threshold-crossing time + `after.delay`;
/// 5. assign clock groups across `after:`-connected components;
/// 6. materialize `CompiledEntry` values.
pub fn compile_after(file: ExpandedFile) -> Result<CompiledFile, CompileAfterError> {
let ExpandedFile {
version,
scenario_name,
entries,
} = file;
let id_to_idx = build_id_index(&entries);
// Phase 1: every edge must resolve, and no entry may reference itself.
for entry in &entries {
let source_id = source_label(entry);
for (ref_id, clause) in outgoing_edges(entry) {
resolve_reference(ref_id, &id_to_idx, &source_id, clause)?;
if entry.id.as_deref() == Some(ref_id) {
return Err(CompileAfterError::SelfReference {
source_id: source_id.clone().into_owned(),
clause,
});
}
}
}
// Phase 2: Kahn's algorithm over after/while edges; a short `sorted`
// result means some entries are stuck in a cycle.
let n = entries.len();
let mut in_degree = vec![0u32; n];
let mut dependents: Vec<Vec<(usize, ClauseKind)>> = vec![Vec::new(); n];
for (i, entry) in entries.iter().enumerate() {
for (ref_id, clause) in outgoing_edges(entry) {
let dep_idx = id_to_idx[ref_id];
in_degree[i] += 1;
dependents[dep_idx].push((i, clause));
}
}
let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
let mut sorted: Vec<usize> = Vec::with_capacity(n);
while let Some(idx) = queue.pop_front() {
sorted.push(idx);
for &(dependent, _) in &dependents[idx] {
in_degree[dependent] -= 1;
if in_degree[dependent] == 0 {
queue.push_back(dependent);
}
}
}
if sorted.len() < n {
let cycle = find_cycle(&entries, &id_to_idx);
return Err(CompileAfterError::CircularDependency { cycle });
}
// Phase 3: `while:` upstreams must be metrics with analytical, NaN-free
// generators, otherwise the pause condition could never resolve.
for entry in &entries {
let Some(clause) = &entry.while_clause else {
continue;
};
let source_id = source_label(entry).into_owned();
let dep_idx = id_to_idx[clause.ref_id.as_str()];
let target = &entries[dep_idx];
if target.signal_type != "metrics" {
return Err(CompileAfterError::NonMetricsTarget {
source_id,
ref_id: clause.ref_id.clone(),
clause: ClauseKind::While,
target_signal: target.signal_type.clone(),
});
}
if let Some(generator) = target.generator.as_ref() {
if !is_supported_while_upstream(generator) {
return Err(CompileAfterError::WhileUnsupportedUpstreamGenerator {
source_id,
ref_id: clause.ref_id.clone(),
generator_kind: generator_kind(generator),
});
}
if let Some(nan) = detect_nan_source(generator) {
return Err(CompileAfterError::WhileNanSource {
source_id,
ref_id: clause.ref_id.clone(),
nan,
});
}
}
}
// Phase 4: accumulate offsets. `base_offsets` holds each entry's own
// explicit `phase_offset`; `total_offsets` adds the transitive chain.
let mut total_offsets = vec![0.0_f64; n];
let mut base_offsets = vec![0.0_f64; n];
for (i, entry) in entries.iter().enumerate() {
if let Some(s) = entry.phase_offset.as_deref() {
base_offsets[i] = parse_duration_secs(s, &source_label(entry), "phase_offset")?;
}
}
// Topological order guarantees a target's total offset is final before
// any dependent reads it.
for &idx in &sorted {
let entry = &entries[idx];
let Some(clause) = &entry.after else {
total_offsets[idx] = base_offsets[idx];
continue;
};
let source_id = source_label(entry).into_owned();
let dep_idx = id_to_idx[clause.ref_id.as_str()];
let target = &entries[dep_idx];
if target.signal_type != "metrics" {
return Err(CompileAfterError::NonMetricsTarget {
source_id,
ref_id: clause.ref_id.clone(),
clause: ClauseKind::After,
target_signal: target.signal_type.clone(),
});
}
let generator = target.generator.as_ref().unwrap_or_else(|| {
unreachable!(
"metrics target '{ref_id}' has no generator — parser and expand \
pass both guarantee metrics entries always carry one",
ref_id = clause.ref_id
)
});
let op = operator_from(&clause.op);
// When does the target's curve first satisfy `op value`?
let crossing = crossing_secs(generator, op, clause.value, target.rate).map_err(|err| {
timing_to_error(err, &source_id, &clause.ref_id, generator, op, clause.value)
})?;
let delay = match clause.delay.as_deref() {
Some(s) => parse_duration_secs(s, &source_id, "after.delay")?,
None => 0.0,
};
total_offsets[idx] = base_offsets[idx] + total_offsets[dep_idx] + crossing + delay;
}
// Phase 5: entries linked by `after:` must share one clock group.
let clock_groups = assign_clock_groups(&entries, &id_to_idx)?;
// Phase 6: materialize. A computed offset replaces the original
// `phase_offset` string; untouched entries keep theirs verbatim.
let mut out: Vec<CompiledEntry> = Vec::with_capacity(n);
for (i, entry) in entries.into_iter().enumerate() {
let phase_offset = if entry.after.is_some() || total_offsets[i] != 0.0 {
Some(format_duration_secs(total_offsets[i]))
} else {
entry.phase_offset.clone()
};
let (clock_group, clock_group_is_auto) = match &clock_groups[i] {
ClockGroupAssignment::Resolved { name, is_auto } => (Some(name.clone()), *is_auto),
ClockGroupAssignment::Unassigned => (entry.clock_group.clone(), false),
};
let after_ref = entry.after.as_ref().map(|c| c.ref_id.clone());
out.push(CompiledEntry {
id: entry.id,
signal_type: entry.signal_type,
name: entry.name,
rate: entry.rate,
duration: entry.duration,
generator: entry.generator,
log_generator: entry.log_generator,
labels: entry.labels,
dynamic_labels: entry.dynamic_labels,
encoder: entry.encoder,
sink: entry.sink,
jitter: entry.jitter,
jitter_seed: entry.jitter_seed,
gaps: entry.gaps,
bursts: entry.bursts,
cardinality_spikes: entry.cardinality_spikes,
phase_offset,
clock_group,
clock_group_is_auto,
distribution: entry.distribution,
buckets: entry.buckets,
quantiles: entry.quantiles,
observations_per_tick: entry.observations_per_tick,
mean_shift_per_sec: entry.mean_shift_per_sec,
on_sink_error: entry.on_sink_error,
seed: entry.seed,
while_clause: entry.while_clause,
delay_clause: entry.delay_clause,
after_ref,
});
}
Ok(CompiledFile {
version,
scenario_name,
entries: out,
})
}
/// Reports where a generator would emit a literal NaN, if anywhere.
///
/// Only generators carrying explicit user-provided values can smuggle a NaN
/// in: a `constant` value, a `sequence` element, or a `csv_replay` cell
/// (default column 0). Every other shape returns `None`.
fn detect_nan_source(generator: &GeneratorConfig) -> Option<NanSource> {
    match generator {
        GeneratorConfig::Constant { value } => value.is_nan().then_some(NanSource::Constant),
        GeneratorConfig::Sequence { values, .. } => {
            let index = values.iter().position(|v| v.is_nan())?;
            Some(NanSource::SequenceValue { index })
        }
        GeneratorConfig::CsvReplay { file, column, .. } => {
            scan_csv_for_nan(file, column.unwrap_or(0))
        }
        _ => None,
    }
}
/// Scans a CSV file for a literal NaN in the given zero-based `column`.
///
/// Returns the first offending cell as `NanSource::CsvCell` (with the
/// physical row index, counting blank and `#`-comment lines), or `None`
/// when the file is unreadable or contains no NaN in that column. Read
/// errors are deliberately swallowed here — the generator itself surfaces
/// them later when it actually replays the file.
fn scan_csv_for_nan(path: &str, column: usize) -> Option<NanSource> {
    let contents = std::fs::read_to_string(path).ok()?;
    for (row, line) in contents.lines().enumerate() {
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            continue;
        }
        // A ragged row that lacks this column must not abort the whole scan:
        // the previous `?` here returned early and masked NaNs in later rows.
        let Some(cell) = trimmed.split(',').nth(column) else {
            continue;
        };
        if is_literal_nan(cell.trim()) {
            return Some(NanSource::CsvCell {
                path: path.to_string(),
                row,
                column,
            });
        }
    }
    None
}
/// True when `cell` spells an IEEE NaN literal (optionally signed),
/// ignoring ASCII case — e.g. "nan", "NaN", "+NAN", "-nan".
fn is_literal_nan(cell: &str) -> bool {
    const NAN_LITERALS: [&str; 3] = ["nan", "+nan", "-nan"];
    NAN_LITERALS.iter().any(|lit| cell.eq_ignore_ascii_case(lit))
}
/// Maps each entry's declared `id` to its index in `entries`.
///
/// Anonymous entries (no id) are skipped — they can never be the target of
/// an `after:`/`while:` clause. A duplicated id keeps the later entry, the
/// same as sequential insertion.
fn build_id_index(entries: &[ExpandedEntry]) -> BTreeMap<&str, usize> {
    entries
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| entry.id.as_deref().map(|id| (id, i)))
        .collect()
}
/// Resolves a clause's `ref_id` against the id index, classifying failures.
///
/// Outcomes:
/// * exact id match — return its entry index;
/// * `ref_id` prefixes one or more `id#sub` pack sub-signal ids — the
///   reference is ambiguous, report the candidates;
/// * otherwise — unknown reference, report every available id.
fn resolve_reference(
    ref_id: &str,
    id_to_idx: &BTreeMap<&str, usize>,
    source_id: &str,
    clause: ClauseKind,
) -> Result<usize, CompileAfterError> {
    if let Some(idx) = id_to_idx.get(ref_id).copied() {
        return Ok(idx);
    }
    // Does the ref name a pack entry whose sub-signals are registered as
    // `ref_id#metric` ids? Then the user must pick one explicitly.
    let sub_prefix = format!("{ref_id}#");
    let sub_signals: Vec<&str> = id_to_idx
        .keys()
        .copied()
        .filter(|id| id.starts_with(&sub_prefix))
        .collect();
    if sub_signals.is_empty() {
        let all_ids: Vec<&str> = id_to_idx.keys().copied().collect();
        Err(CompileAfterError::UnknownRef {
            source_id: source_id.to_string(),
            ref_id: ref_id.to_string(),
            clause,
            available: all_ids.join(", "),
        })
    } else {
        // `ref_id` is expected to look like `<pack_entry>.<metric>`; drop the
        // trailing segment to recover the pack entry id for the message.
        let pack_entry_id = match ref_id.rsplit_once('.') {
            Some((left, _)) => left.to_string(),
            None => String::new(),
        };
        Err(CompileAfterError::AmbiguousSubSignalRef {
            ref_id: ref_id.to_string(),
            pack_entry_id,
            candidates: sub_signals.join(", "),
        })
    }
}
/// Iterates the dependency edges leaving `entry`: the optional `after:`
/// target followed by the optional `while:` target, each tagged with its
/// clause kind. Yields zero, one, or two items.
fn outgoing_edges(entry: &ExpandedEntry) -> impl Iterator<Item = (&str, ClauseKind)> {
    let after_edge = entry
        .after
        .as_ref()
        .map(|c| (c.ref_id.as_str(), ClauseKind::After));
    let while_edge = entry
        .while_clause
        .as_ref()
        .map(|c| (c.ref_id.as_str(), ClauseKind::While));
    after_edge.into_iter().chain(while_edge)
}
/// Human-readable identifier for error messages: the entry's id when it has
/// one, otherwise a synthesized `<anonymous:NAME>` label (allocating only in
/// that case).
fn source_label(entry: &ExpandedEntry) -> std::borrow::Cow<'_, str> {
    match entry.id.as_deref() {
        Some(id) => std::borrow::Cow::Borrowed(id),
        None => std::borrow::Cow::Owned(format!("<anonymous:{}>", entry.name)),
    }
}
/// Computes the first time, in seconds from the target's own t=0, at which
/// the generator's output satisfies `op threshold`.
///
/// Defaulting here mirrors the runtime generators: omitted baselines default
/// to 0.0, ceilings/heights to 100.0, and named durations to the defaults
/// passed to `duration_or_default`. `rate` only matters for tick-based
/// generators (sequence, step). Returns a `TimingError` when the crossing is
/// unsupported, out of range, or ambiguous at t=0.
fn crossing_secs(
generator: &GeneratorConfig,
op: Operator,
threshold: f64,
rate: f64,
) -> Result<f64, TimingError> {
match generator {
GeneratorConfig::Constant { value } => constant_crossing_secs(op, threshold, *value),
GeneratorConfig::Uniform { .. } => uniform_crossing_secs(),
GeneratorConfig::Sine { .. } => sine_crossing_secs(),
GeneratorConfig::CsvReplay { .. } => csv_replay_crossing_secs(),
GeneratorConfig::Sawtooth {
min,
max,
period_secs,
} => sawtooth_crossing_secs(op, threshold, *min, *max, *period_secs),
GeneratorConfig::Sequence { values, repeat } => {
sequence_crossing_secs(op, threshold, values, *repeat, rate)
}
GeneratorConfig::Step {
start,
step_size,
max,
} => step_crossing_secs(op, threshold, start.unwrap_or(0.0), *step_size, *max, rate),
GeneratorConfig::Spike {
baseline,
magnitude,
duration_secs,
..
} => spike_crossing_secs(op, threshold, *baseline, *magnitude, *duration_secs),
GeneratorConfig::Flap {
up_duration,
down_duration,
up_value,
down_value,
enum_kind,
} => {
// Flap defaults: 10s up / 5s down; up/down values default per the
// enum kind when present, else 1.0 / 0.0.
let up_secs = duration_or_default(up_duration.as_deref(), 10.0, "flap.up_duration")?;
let down_secs =
duration_or_default(down_duration.as_deref(), 5.0, "flap.down_duration")?;
let (up_default, down_default) = enum_kind.map(|e| e.defaults()).unwrap_or((1.0, 0.0));
let up_val = up_value.unwrap_or(up_default);
let down_val = down_value.unwrap_or(down_default);
timing::flap_crossing_secs(op, threshold, up_secs, down_secs, up_val, down_val)
}
// Saturation, leak, and degradation all reuse the sawtooth solver:
// baseline -> ceiling with the ramp time as the period.
GeneratorConfig::Saturation {
baseline,
ceiling,
time_to_saturate,
} => {
let bl = baseline.unwrap_or(0.0);
let cl = ceiling.unwrap_or(100.0);
let period = duration_or_default(
time_to_saturate.as_deref(),
5.0 * 60.0,
"saturation.time_to_saturate",
)?;
sawtooth_crossing_secs(op, threshold, bl, cl, period)
}
GeneratorConfig::Leak {
baseline,
ceiling,
time_to_ceiling,
} => {
let bl = baseline.unwrap_or(0.0);
let cl = ceiling.unwrap_or(100.0);
let period = duration_or_default(
time_to_ceiling.as_deref(),
10.0 * 60.0,
"leak.time_to_ceiling",
)?;
sawtooth_crossing_secs(op, threshold, bl, cl, period)
}
GeneratorConfig::Degradation {
baseline,
ceiling,
time_to_degrade,
..
} => {
let bl = baseline.unwrap_or(0.0);
let cl = ceiling.unwrap_or(100.0);
let period = duration_or_default(
time_to_degrade.as_deref(),
5.0 * 60.0,
"degradation.time_to_degrade",
)?;
sawtooth_crossing_secs(op, threshold, bl, cl, period)
}
GeneratorConfig::Steady { .. } => timing::steady_crossing_secs(),
GeneratorConfig::SpikeEvent {
baseline,
spike_height,
spike_duration,
..
} => {
let bl = baseline.unwrap_or(0.0);
let height = spike_height.unwrap_or(100.0);
let dur = duration_or_default(
spike_duration.as_deref(),
10.0,
"spike_event.spike_duration",
)?;
spike_crossing_secs(op, threshold, bl, height, dur)
}
}
}
/// Whether a generator can drive a `while:` clause.
///
/// `while:` evaluation needs an analytically computable upstream value;
/// `csv_replay` is data-dependent (values live in an external file), so it
/// is the single excluded kind.
fn is_supported_while_upstream(generator: &GeneratorConfig) -> bool {
    match generator {
        GeneratorConfig::CsvReplay { .. } => false,
        _ => true,
    }
}
/// Stable human-readable name for a generator variant, matching its YAML
/// `type:` tag; used in error messages.
fn generator_kind(generator: &GeneratorConfig) -> &'static str {
match generator {
GeneratorConfig::Constant { .. } => "constant",
GeneratorConfig::Uniform { .. } => "uniform",
GeneratorConfig::Sine { .. } => "sine",
GeneratorConfig::Sawtooth { .. } => "sawtooth",
GeneratorConfig::Sequence { .. } => "sequence",
GeneratorConfig::Spike { .. } => "spike",
GeneratorConfig::CsvReplay { .. } => "csv_replay",
GeneratorConfig::Step { .. } => "step",
GeneratorConfig::Flap { .. } => "flap",
GeneratorConfig::Saturation { .. } => "saturation",
GeneratorConfig::Leak { .. } => "leak",
GeneratorConfig::Degradation { .. } => "degradation",
GeneratorConfig::Steady { .. } => "steady",
GeneratorConfig::SpikeEvent { .. } => "spike_event",
}
}
/// Lifts a low-level `TimingError` into a `CompileAfterError`, attaching the
/// entry/reference context the timing layer does not know about.
fn timing_to_error(
    err: TimingError,
    source_id: &str,
    ref_id: &str,
    generator: &GeneratorConfig,
    op: Operator,
    value: f64,
) -> CompileAfterError {
    // Own the shared context up front; each arm moves what it needs.
    let source_id = source_id.to_string();
    let ref_id = ref_id.to_string();
    let op = op.to_string();
    match err {
        TimingError::Unsupported { message } => CompileAfterError::UnsupportedGenerator {
            source_id,
            ref_id,
            generator: generator_kind(generator).to_string(),
            op,
            reason: message,
        },
        TimingError::OutOfRange { message } => CompileAfterError::OutOfRangeThreshold {
            source_id,
            ref_id,
            op,
            value,
            reason: message,
        },
        TimingError::Ambiguous { message } => CompileAfterError::AmbiguousAtT0 {
            source_id,
            ref_id,
            op,
            value,
            reason: message,
        },
        TimingError::InvalidDuration {
            field,
            input,
            reason,
        } => CompileAfterError::InvalidDuration {
            source_id,
            field,
            input,
            reason,
        },
    }
}
/// Per-entry outcome of clock-group resolution.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ClockGroupAssignment {
/// Entry belongs to a multi-entry chain: use `name`; `is_auto` records
/// whether the name was synthesized rather than user-declared.
Resolved { name: String, is_auto: bool },
/// Entry is not chained; it keeps whatever `clock_group` it declared.
Unassigned,
}
/// Groups entries connected by `after:` edges into shared clock groups.
///
/// Builds an undirected graph over `after:` links (note: `while:` edges do
/// not couple clocks), finds connected components, and for every
/// multi-entry component resolves one group name:
/// * no explicit non-empty `clock_group` among members -> synthesize via
///   `auto_chain_name` and mark it auto-assigned;
/// * exactly one distinct explicit name -> adopt it;
/// * two or more distinct names -> `ConflictingClockGroup`.
/// Singleton components stay `Unassigned`.
fn assign_clock_groups(
entries: &[ExpandedEntry],
id_to_idx: &BTreeMap<&str, usize>,
) -> Result<Vec<ClockGroupAssignment>, CompileAfterError> {
let n = entries.len();
// Undirected adjacency over after-edges only.
let mut adj: Vec<Vec<usize>> = vec![Vec::new(); n];
for (i, entry) in entries.iter().enumerate() {
if let Some(clause) = &entry.after {
if let Some(&dep_idx) = id_to_idx.get(clause.ref_id.as_str()) {
adj[i].push(dep_idx);
adj[dep_idx].push(i);
}
}
}
// Iterative flood-fill to label connected components.
let mut component_id = vec![usize::MAX; n];
let mut components: Vec<Vec<usize>> = Vec::new();
for start in 0..n {
if component_id[start] != usize::MAX {
continue;
}
let cid = components.len();
let mut stack = vec![start];
let mut members = Vec::new();
while let Some(node) = stack.pop() {
if component_id[node] != usize::MAX {
continue;
}
component_id[node] = cid;
members.push(node);
for &next in &adj[node] {
if component_id[next] == usize::MAX {
stack.push(next);
}
}
}
components.push(members);
}
let mut out: Vec<ClockGroupAssignment> =
(0..n).map(|_| ClockGroupAssignment::Unassigned).collect();
for members in &components {
if members.len() < 2 {
continue;
}
// Collect distinct explicit group names, remembering the first entry
// that declared each (for the conflict error message).
let mut distinct: BTreeMap<&str, usize> = BTreeMap::new();
for &idx in members {
if let Some(cg) = entries[idx].clock_group.as_deref() {
if !cg.is_empty() {
distinct.entry(cg).or_insert(idx);
}
}
}
let (resolved, is_auto) = match distinct.len() {
0 => (auto_chain_name(members, entries), true),
1 => {
let (&k, _) = distinct.iter().next().expect("len == 1");
(k.to_string(), false)
}
_ => {
let mut iter = distinct.iter();
let (&first_group, &first_idx) = iter.next().expect("len >= 2");
let (&second_group, &second_idx) = iter.next().expect("len >= 2");
return Err(CompileAfterError::ConflictingClockGroup {
first_entry: source_label(&entries[first_idx]).into_owned(),
first_group: first_group.to_string(),
second_entry: source_label(&entries[second_idx]).into_owned(),
second_group: second_group.to_string(),
});
}
};
// Stamp the resolved name on every member of the component.
for &idx in members {
out[idx] = ClockGroupAssignment::Resolved {
name: resolved.clone(),
is_auto,
};
}
}
Ok(out)
}
/// Derives the synthetic clock-group name for an auto-linked chain:
/// `chain_<id>` where `<id>` is the lexicographically smallest member id.
fn auto_chain_name(members: &[usize], entries: &[ExpandedEntry]) -> String {
    let first = members
        .iter()
        .filter_map(|&i| entries[i].id.as_deref())
        .min()
        .unwrap_or_else(|| {
            unreachable!(
                "multi-entry component has no id-bearing member — `after.ref` \
resolution guarantees every linked entry carries an id"
            )
        });
    format!("chain_{first}")
}
/// Reconstructs one concrete cycle after Kahn's sort has proven a cycle
/// exists, for the `CircularDependency` error message.
///
/// Classic white/gray/black DFS: hitting a gray node means the current path
/// loops back to it. Each path element carries the kind of its *outgoing*
/// edge (patched via `last_mut` as edges are explored); the returned vector
/// repeats the cycle's start node at the end to close the loop.
fn find_cycle(
entries: &[ExpandedEntry],
id_to_idx: &BTreeMap<&str, usize>,
) -> Vec<(String, ClauseKind)> {
#[derive(Clone, Copy, PartialEq, Eq)]
enum Color {
White,
Gray,
Black,
}
let n = entries.len();
let mut color = vec![Color::White; n];
let mut path: Vec<(usize, ClauseKind)> = Vec::new();
fn dfs(
node: usize,
entries: &[ExpandedEntry],
id_to_idx: &BTreeMap<&str, usize>,
color: &mut [Color],
path: &mut Vec<(usize, ClauseKind)>,
) -> Option<Vec<(usize, ClauseKind)>> {
color[node] = Color::Gray;
// Placeholder kind; overwritten below once the actual edge is known.
path.push((node, ClauseKind::After));
for (ref_id, clause) in outgoing_edges(&entries[node]) {
let Some(&dep) = id_to_idx.get(ref_id) else {
continue;
};
if let Some(last) = path.last_mut() {
last.1 = clause;
}
match color[dep] {
Color::White => {
if let Some(cycle) = dfs(dep, entries, id_to_idx, color, path) {
return Some(cycle);
}
}
Color::Gray => {
// Back-edge: slice the path from the repeated node onward and
// close the loop by appending that node again.
let start = path.iter().position(|&(x, _)| x == dep).unwrap_or(0);
let mut cycle: Vec<(usize, ClauseKind)> = path[start..].to_vec();
cycle.push((dep, clause));
return Some(cycle);
}
Color::Black => {}
}
}
color[node] = Color::Black;
path.pop();
None
}
for start in 0..n {
if color[start] == Color::White {
if let Some(cycle) = dfs(start, entries, id_to_idx, &mut color, &mut path) {
return cycle
.into_iter()
.map(|(i, kind)| (source_label(&entries[i]).into_owned(), kind))
.collect();
}
}
}
// Kahn's sort said there is a cycle, so this fallback should be unreachable.
vec![("<unknown cycle>".to_string(), ClauseKind::After)]
}
/// Converts the parsed `after.op` into the timing module's operator.
fn operator_from(op: &AfterOp) -> Operator {
match op {
AfterOp::LessThan => Operator::LessThan,
AfterOp::GreaterThan => Operator::GreaterThan,
}
}
/// Parses a human duration string (e.g. "90s", "2m") into fractional
/// seconds, wrapping failures as `InvalidDuration` with the owning entry
/// and field name for context.
fn parse_duration_secs(
    input: &str,
    source_id: &str,
    field: &'static str,
) -> Result<f64, CompileAfterError> {
    match parse_duration(input) {
        Ok(d) => Ok(d.as_secs_f64()),
        Err(e) => Err(CompileAfterError::InvalidDuration {
            source_id: source_id.to_string(),
            field,
            input: input.to_string(),
            reason: e.to_string(),
        }),
    }
}
/// Parses an optional duration string, substituting `default_secs` when the
/// field was omitted. Parse failures become `TimingError::InvalidDuration`.
fn duration_or_default(
    input: Option<&str>,
    default_secs: f64,
    field: &'static str,
) -> Result<f64, TimingError> {
    let Some(s) = input else {
        return Ok(default_secs);
    };
    parse_duration(s)
        .map(|d| d.as_secs_f64())
        .map_err(|e| TimingError::InvalidDuration {
            field,
            input: s.to_string(),
            reason: e.to_string(),
        })
}
/// Formats a non-negative duration in seconds as a compact human-readable
/// string, preferring the coarsest exact unit: "2h", "3m", "75s", "1.5s".
///
/// Values are rounded to millisecond precision first; whole hours/minutes
/// collapse to "h"/"m". Positive sub-millisecond values keep the raw float
/// (e.g. "0.0004s") instead of being reported as "0s". Non-finite or
/// non-positive input yields "0s" (and trips a debug assertion, since
/// callers are expected to pass finite, non-negative offsets).
pub fn format_duration_secs(secs: f64) -> String {
    debug_assert!(
        secs.is_finite() && secs >= 0.0,
        "format_duration_secs received non-finite or negative value: {secs}"
    );
    if !secs.is_finite() || secs <= 0.0 {
        return "0s".to_string();
    }
    let ms = (secs * 1000.0).round() as u64;
    // Positive but rounds to zero milliseconds: emit the raw value. This was
    // previously unreachable — ms == 0 satisfied the whole-second branch
    // below (0 is a multiple of 1000) and collapsed tiny offsets to "0s".
    if ms == 0 {
        return format!("{secs}s");
    }
    if ms % 1000 == 0 {
        let whole_secs = ms / 1000;
        if whole_secs % 3600 == 0 {
            return format!("{}h", whole_secs / 3600);
        }
        if whole_secs % 60 == 0 {
            return format!("{}m", whole_secs / 60);
        }
        return format!("{whole_secs}s");
    }
    format!("{}s", (ms as f64) / 1000.0)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compiler::expand::expand;
use crate::compiler::expand::InMemoryPackResolver;
use crate::compiler::normalize::normalize;
use crate::compiler::parse::parse;
// Convenience wrapper: compile with an empty in-memory pack resolver.
fn compile(yaml: &str) -> Result<CompiledFile, String> {
compile_with_resolver(yaml, &InMemoryPackResolver::new())
}
// Runs the full front-end pipeline (parse -> normalize -> expand ->
// compile_after), stringifying whichever stage fails first.
fn compile_with_resolver(
yaml: &str,
resolver: &InMemoryPackResolver,
) -> Result<CompiledFile, String> {
let parsed = parse(yaml).map_err(|e| format!("parse: {e}"))?;
let normalized = normalize(parsed).map_err(|e| format!("normalize: {e}"))?;
let expanded = expand(normalized, resolver).map_err(|e| format!("expand: {e}"))?;
compile_after(expanded).map_err(|e| format!("compile_after: {e}"))
}
// An `after.ref` to a missing id must name the bad ref and list the ids
// that do exist.
#[test]
fn unknown_ref_surfaces_available_ids() {
let yaml = r#"
version: 2
scenarios:
- id: cpu
signal_type: metrics
name: cpu_saturation
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
- id: log_entry
signal_type: logs
name: errors
rate: 1
log_generator: { type: template, templates: [{ message: "hi" }] }
after: { ref: nonexistent, op: ">", value: 50 }
"#;
let err = compile(yaml).expect_err("should fail");
assert!(err.contains("nonexistent"), "got: {err}");
assert!(err.contains("Available"), "got: {err}");
}
// An entry whose `after:` points at its own id is rejected outright.
#[test]
fn self_reference_is_rejected() {
let yaml = r#"
version: 2
scenarios:
- id: loop
signal_type: metrics
name: util
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: loop, op: ">", value: 50 }
"#;
let err = compile(yaml).expect_err("self-ref is rejected");
assert!(err.contains("references itself"), "got: {err}");
}
// Linear ramp crossing: reaching 70 on a 20->85 ramp over 120s lands at
// (70-20)/(85-20)*120 seconds, and the follower's phase_offset must match.
#[test]
fn saturation_greater_than_sets_offset() {
let yaml = r#"
version: 2
scenarios:
- id: util
signal_type: metrics
name: util
rate: 1
generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
- id: follower
signal_type: metrics
name: latency
rate: 1
generator: { type: constant, value: 1 }
after: { ref: util, op: ">", value: 70 }
"#;
let compiled = compile(yaml).expect("should compile");
let follower = &compiled.entries[1];
let expected_secs = (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
let expected_str = format_duration_secs(expected_secs);
assert_eq!(
follower.phase_offset.as_deref(),
Some(expected_str.as_str())
);
}
// Table-driven crossings: each case pins the exact phase_offset string a
// follower gets for a different generator family and operator.
#[rustfmt::skip]
#[rstest::rstest]
#[case::flap_less_than(r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: oper_state
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: util
rate: 1
generator: { type: constant, value: 1 }
after: { ref: link, op: "<", value: 1 }
"#, "1m")]
#[case::spike_event_less_than(r#"
version: 2
scenarios:
- id: burst
signal_type: metrics
name: errs
rate: 1
generator: { type: spike_event, baseline: 0, spike_height: 100, spike_duration: 10s, spike_interval: 60s }
- id: follower
signal_type: metrics
name: recovery
rate: 1
generator: { type: constant, value: 1 }
after: { ref: burst, op: "<", value: 50 }
"#, "10s")]
#[case::step_greater_than(r#"
version: 2
scenarios:
- id: counter
signal_type: metrics
name: req_count
rate: 2
generator: { type: step, start: 0, step_size: 10 }
- id: follower
signal_type: metrics
name: alert
rate: 1
generator: { type: constant, value: 1 }
after: { ref: counter, op: ">", value: 55 }
"#, "3s")]
#[case::sequence_less_than(r#"
version: 2
scenarios:
- id: seq
signal_type: metrics
name: values
rate: 2
generator: { type: sequence, values: [10, 5, 2, 1], repeat: false }
- id: follower
signal_type: metrics
name: alert
rate: 1
generator: { type: constant, value: 1 }
after: { ref: seq, op: "<", value: 3 }
"#, "1s")]
fn follower_phase_offset_matches_expected_crossing(
#[case] yaml: &str,
#[case] expected_offset: &str,
) {
let compiled = compile(yaml).expect("should compile");
assert_eq!(
compiled.entries[1].phase_offset.as_deref(),
Some(expected_offset)
);
}
// A step generator only moves upward, so a `<` crossing cannot be solved;
// the error must mention the `step` generator kind.
#[test]
fn step_less_than_is_unsupported() {
let yaml = r#"
version: 2
scenarios:
- id: counter
signal_type: metrics
name: x
rate: 1
generator: { type: step, start: 0, step_size: 1 }
- id: follower
signal_type: metrics
name: y
rate: 1
generator: { type: constant, value: 1 }
after: { ref: counter, op: "<", value: 5 }
"#;
let err = compile(yaml).expect_err("step < is unsupported");
assert!(err.contains("step"), "got: {err}");
}
// Target generators whose crossing time cannot be computed (never crossed,
// periodic, random, ...) must be rejected with an error naming the kind.
#[rustfmt::skip]
#[rstest::rstest]
#[case::constant(r#"
version: 2
scenarios:
- id: k
signal_type: metrics
name: k
rate: 1
generator: { type: constant, value: 42 }
- id: follower
signal_type: metrics
name: y
rate: 1
generator: { type: constant, value: 1 }
after: { ref: k, op: ">", value: 100 }
"#, "constant")]
#[case::sine(r#"
version: 2
scenarios:
- id: wave
signal_type: metrics
name: s
rate: 1
generator: { type: sine, amplitude: 10, period_secs: 60, offset: 50 }
- id: follower
signal_type: metrics
name: f
rate: 1
generator: { type: constant, value: 1 }
after: { ref: wave, op: ">", value: 55 }
"#, "sine")]
#[case::steady(r#"
version: 2
scenarios:
- id: base
signal_type: metrics
name: s
rate: 1
generator: { type: steady, center: 50, amplitude: 5, period: 60s }
- id: follower
signal_type: metrics
name: f
rate: 1
generator: { type: constant, value: 1 }
after: { ref: base, op: ">", value: 55 }
"#, "steady")]
#[case::uniform(r#"
version: 2
scenarios:
- id: u
signal_type: metrics
name: u
rate: 1
generator: { type: uniform, min: 0, max: 10, seed: 1 }
- id: follower
signal_type: metrics
name: f
rate: 1
generator: { type: constant, value: 1 }
after: { ref: u, op: ">", value: 5 }
"#, "uniform")]
fn unresolvable_target_generator_is_rejected(
#[case] yaml: &str,
#[case] expected_substring: &str,
) {
let err = compile(yaml).expect_err("target generator must be rejected");
assert!(
err.contains(expected_substring),
"expected error to mention {expected_substring:?}, got: {err}"
);
}
// Offsets accumulate through the chain: c starts after b's crossing, which
// itself only begins after a's crossing (b's total includes a's).
#[test]
fn transitive_chain_accumulates() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 20, ceiling: 85, time_to_saturate: 120s }
after: { ref: a, op: "<", value: 1 }
- id: c
signal_type: metrics
name: c
rate: 1
generator: { type: constant, value: 1 }
after: { ref: b, op: ">", value: 70 }
"#;
let compiled = compile(yaml).expect("chain compiles");
let expected_b_secs = 60.0;
let expected_c_secs = 60.0 + (70.0 - 20.0) / (85.0 - 20.0) * 120.0;
assert_eq!(
compiled.entries[1].phase_offset.as_deref(),
Some(format_duration_secs(expected_b_secs).as_str())
);
assert_eq!(
compiled.entries[2].phase_offset.as_deref(),
Some(format_duration_secs(expected_c_secs).as_str())
);
}
// `after.delay` is added on top of the computed crossing (60s + 15s = 75s).
#[test]
fn delay_is_added_to_crossing_time() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: link, op: "<", value: 1, delay: 15s }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
}
// An explicit phase_offset on the follower adds to the crossing time
// (10s + 60s = 70s).
#[test]
fn explicit_phase_offset_is_added() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
phase_offset: 10s
after: { ref: link, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("70s"));
}
// All three components sum: crossing (60s) + phase_offset (10s) + delay (5s).
#[test]
fn phase_offset_delay_and_crossing_sum() {
let yaml = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
phase_offset: 10s
after: { ref: link, op: "<", value: 1, delay: 5s }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("75s"));
}
// a -> b -> a must be reported as a circular dependency naming both ids.
#[test]
fn two_entry_cycle_is_detected() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: a, op: ">", value: 1 }
"#;
let err = compile(yaml).expect_err("cycle should fail");
assert!(err.contains("circular"), "got: {err}");
assert!(err.contains("a") && err.contains("b"), "got: {err}");
}
// a -> c -> b -> a: the circular-dependency error must render the cycle as
// an arrow-joined path (e.g. "a -> ..."), not just list the ids.
#[test]
fn three_entry_cycle_path_is_returned() {
let yaml = r#"
version: 2
scenarios:
- id: a
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: c, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: a, op: ">", value: 1 }
- id: c
signal_type: metrics
name: c
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
"#;
let err = compile(yaml).expect_err("cycle should fail");
assert!(err.contains("circular"), "got: {err}");
assert!(
err.contains("a -> "),
"cycle path should have an arrow. got: {err}"
);
}
// When no clock_group is declared, every member of a dependency chain gets
// an auto group named "chain_" + the lowest member id ("alpha" < "bravo").
#[test]
fn clock_group_auto_assigned_as_chain_plus_lowest_id() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(
compiled.entries[0].clock_group.as_deref(),
Some("chain_alpha")
);
assert_eq!(
compiled.entries[1].clock_group.as_deref(),
Some("chain_alpha")
);
}
// A clock_group declared on one chain member ("failover" on alpha) is
// propagated to every other member of the same chain.
#[test]
fn explicit_clock_group_propagates_to_chain_members() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: failover
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("failover"));
assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("failover"));
}
// Two members of one chain declaring different clock_groups is an error;
// the message names both conflicting group values.
#[test]
fn conflicting_clock_groups_are_rejected() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: group_a
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: group_b
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let err = compile(yaml).expect_err("conflicting groups fail");
assert!(err.contains("conflicting clock_group"), "got: {err}");
assert!(
err.contains("group_a") && err.contains("group_b"),
"got: {err}"
);
}
// An entry with no after:/while: edges is not part of any chain, so no
// clock_group is auto-assigned.
#[test]
fn independent_signals_keep_no_clock_group() {
let yaml = r#"
version: 2
scenarios:
- id: independent
signal_type: metrics
name: a
rate: 1
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
"#;
let compiled = compile(yaml).expect("compile");
assert!(compiled.entries[0].clock_group.is_none());
}
// An empty-string clock_group is treated as "unset": it does not conflict
// with "x", and the whole chain adopts "x".
#[test]
fn clock_group_empty_string_mixed_with_some_x_uses_x() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: ""
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: x
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let compiled = compile(yaml).expect("compile");
assert_eq!(compiled.entries[0].clock_group.as_deref(), Some("x"));
assert_eq!(compiled.entries[1].clock_group.as_deref(), Some("x"));
}
// Group names are NOT trimmed: "x " and "x" are distinct values and must be
// reported as a conflicting clock_group, not silently merged.
#[test]
fn clock_group_whitespace_variants_conflict() {
let yaml = r#"
version: 2
scenarios:
- id: alpha
signal_type: metrics
name: a
rate: 1
clock_group: "x "
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: bravo
signal_type: metrics
name: b
rate: 1
clock_group: x
generator: { type: constant, value: 1 }
after: { ref: alpha, op: "<", value: 1 }
"#;
let err = compile(yaml).expect_err("trailing whitespace must conflict");
assert!(err.contains("conflicting clock_group"), "got: {err}");
}
// Cross-signal dependency direction that IS allowed: a logs entry may use
// after: against a metrics target, and receives a computed phase_offset.
#[test]
fn log_signal_can_depend_on_metrics_target() {
let yaml = r#"
version: 2
scenarios:
- id: err_rate
signal_type: metrics
name: http_error_rate
rate: 1
generator: { type: saturation, baseline: 1, ceiling: 30, time_to_saturate: 90s }
- id: err_logs
signal_type: logs
name: app_logs
rate: 1
log_generator: { type: template, templates: [{ message: "upstream timeout" }] }
after: { ref: err_rate, op: ">", value: 10 }
"#;
let compiled = compile(yaml).expect("cross-signal after compiles");
assert!(compiled.entries[1].phase_offset.is_some());
}
// The reverse direction is NOT allowed: a logs signal has no numeric value,
// so it cannot serve as an after: threshold target.
#[test]
fn metrics_entry_cannot_depend_on_logs_target() {
let yaml = r#"
version: 2
scenarios:
- id: log_src
signal_type: logs
name: lg
rate: 1
log_generator: { type: template, templates: [{ message: "hi" }] }
- id: follower
signal_type: metrics
name: f
rate: 1
generator: { type: constant, value: 1 }
after: { ref: log_src, op: ">", value: 0 }
"#;
let err = compile(yaml).expect_err("logs target rejected");
assert!(err.contains("logs signal"), "got: {err}");
}
// The 60s crossing is rendered in the largest whole unit: phase_offset is
// formatted as "1m", not "60s".
#[test]
fn flap_alias_produces_expected_up_duration_offset() {
let yaml_alias = r#"
version: 2
scenarios:
- id: link
signal_type: metrics
name: s
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: f
rate: 1
generator: { type: constant, value: 1 }
after: { ref: link, op: "<", value: 1 }
"#;
let compiled = compile(yaml_alias).expect("compile");
assert_eq!(compiled.entries[1].phase_offset.as_deref(), Some("1m"));
}
// Test helper: builds an in-memory pack resolver containing a single pack
// "testpack" with two metric specs (state_flap, util_sat) for dotted-ref tests.
fn resolver_with_test_pack() -> InMemoryPackResolver {
let yaml = r#"
name: testpack
category: test
description: test
metrics:
- name: state_flap
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- name: util_sat
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 120s }
"#;
let pack =
serde_yaml_ng::from_str::<crate::packs::MetricPackDef>(yaml).expect("pack parses");
let mut r = InMemoryPackResolver::new();
r.insert("testpack", pack);
r
}
// A dotted ref "<entry>.<metric>" resolves into a pack-expanded signal:
// "dev.state_flap" targets the flap metric shipped by the pack, and the
// follower gets the expected 1m offset.
#[test]
fn dotted_pack_ref_resolves() {
let yaml = r#"
version: 2
scenarios:
- id: dev
signal_type: metrics
rate: 1
pack: testpack
- id: follower
signal_type: metrics
name: alert
rate: 1
generator: { type: constant, value: 1 }
after: { ref: dev.state_flap, op: "<", value: 1 }
"#;
let compiled = compile_with_resolver(yaml, &resolver_with_test_pack()).expect("compile");
let follower = compiled
.entries
.iter()
.find(|e| e.id.as_deref() == Some("follower"))
.expect("follower present");
assert_eq!(follower.phase_offset.as_deref(), Some("1m"));
}
// A pack may ship multiple specs under the same metric name (here cpu_util
// with mode=user / mode=system). A bare ref "host.cpu_util" is then
// ambiguous and must be rejected, listing the disambiguated "#0"/"#1"
// candidates the user can pick instead.
#[test]
fn ambiguous_bare_pack_ref_is_rejected() {
let pack_yaml = r#"
name: ambig
category: test
description: test
metrics:
- name: cpu_util
labels: { mode: user }
generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
- name: cpu_util
labels: { mode: system }
generator: { type: sawtooth, min: 0, max: 100, period_secs: 60 }
"#;
let pack =
serde_yaml_ng::from_str::<crate::packs::MetricPackDef>(pack_yaml).expect("pack parses");
let mut r = InMemoryPackResolver::new();
r.insert("ambig", pack);
let yaml = r#"
version: 2
scenarios:
- id: host
signal_type: metrics
rate: 1
pack: ambig
- id: follower
signal_type: metrics
name: alert
rate: 1
generator: { type: constant, value: 1 }
after: { ref: host.cpu_util, op: ">", value: 50 }
"#;
let err = compile_with_resolver(yaml, &r).expect_err("bare ref is ambiguous");
assert!(err.contains("ambiguous"), "got: {err}");
assert!(
err.contains("host.cpu_util#0") && err.contains("host.cpu_util#1"),
"candidates should be listed. got: {err}"
);
}
// Parametrized over the three places a malformed duration can appear
// (after.delay, phase_offset zero, a generator field): each must surface as
// CompileAfterError::InvalidDuration carrying the offending entry id, the
// field path, and the raw input string.
#[rustfmt::skip]
#[rstest::rstest]
#[case::after_delay(r#"
version: 2
scenarios:
- id: src
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: src, op: "<", value: 1, delay: "10seconds" }
"#, "follower", "after.delay", "10seconds")]
#[case::phase_offset_zero(r#"
version: 2
scenarios:
- id: src
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
phase_offset: "0s"
generator: { type: constant, value: 1 }
after: { ref: src, op: "<", value: 1 }
"#, "follower", "phase_offset", "0s")]
#[case::alias_flap_up_duration(r#"
version: 2
scenarios:
- id: src
signal_type: metrics
name: a
rate: 1
generator: { type: flap, up_duration: "oops", down_duration: 30s }
- id: follower
signal_type: metrics
name: b
rate: 1
generator: { type: constant, value: 1 }
after: { ref: src, op: "<", value: 1 }
"#, "follower", "flap.up_duration", "oops")]
fn invalid_duration_surfaces_invalid_duration(
#[case] yaml: &str,
#[case] expected_source_id: &str,
#[case] expected_field: &str,
#[case] expected_input: &str,
) {
let err = match compile_after_from_yaml(yaml) {
Err(e) => e,
Ok(_) => panic!("invalid duration must fail"),
};
match err {
CompileAfterError::InvalidDuration {
ref source_id,
field,
ref input,
..
} => {
assert_eq!(source_id, expected_source_id);
assert_eq!(field, expected_field);
assert_eq!(input, expected_input);
}
other => panic!("expected InvalidDuration, got {other:?}"),
}
}
// Test helper: runs the full pipeline (parse -> normalize -> expand with an
// empty pack resolver) and returns compile_after's typed result, so tests
// can match on CompileAfterError variants instead of rendered strings.
fn compile_after_from_yaml(yaml: &str) -> Result<CompiledFile, CompileAfterError> {
let parsed = parse(yaml).expect("parse");
let normalized = normalize(parsed).expect("normalize");
let expanded = expand(normalized, &InMemoryPackResolver::new()).expect("expand");
compile_after(expanded)
}
// format_duration_secs renders whole quantities in the largest fitting unit
// (30s / 2m / 1h); zero and negative zero both render as "0s".
#[rustfmt::skip]
#[rstest::rstest]
#[case::whole_seconds(30.0, "30s")]
#[case::whole_minutes(120.0, "2m")]
#[case::whole_hours(3600.0, "1h")]
#[case::zero(0.0, "0s")]
#[case::negative_zero(-0.0, "0s")]
fn format_duration_whole_units(#[case] secs: f64, #[case] expected: &str) {
assert_eq!(format_duration_secs(secs), expected);
}
// Fractional seconds need not render exactly, but formatting then
// re-parsing must land within 10ms of the original value.
#[test]
fn format_duration_fractional_seconds_round_trip() {
let result = format_duration_secs(92.307);
let dur = parse_duration(&result).expect("round-trip");
assert!(
(dur.as_secs_f64() - 92.307).abs() < 0.01,
"got {}, expected ~92.307",
dur.as_secs_f64()
);
}
// outgoing_edges() yields the after: edge before the while: edge when both
// are present, and shrinks accordingly as clauses are removed (while only
// dropped -> one edge; after also dropped -> no edges).
#[test]
fn outgoing_edges_yields_after_then_while() {
use crate::compiler::{WhileClause, WhileOp};
// Minimal ExpandedEntry with both dependency clauses populated; all other
// fields are inert defaults for this test.
let mut e = ExpandedEntry {
id: Some("x".to_string()),
signal_type: "metrics".to_string(),
name: "x".to_string(),
rate: 1.0,
duration: None,
generator: None,
log_generator: None,
labels: None,
dynamic_labels: None,
encoder: crate::encoder::EncoderConfig::PrometheusText { precision: None },
sink: crate::sink::SinkConfig::Stdout,
jitter: None,
jitter_seed: None,
gaps: None,
bursts: None,
cardinality_spikes: None,
phase_offset: None,
clock_group: None,
after: Some(crate::compiler::AfterClause {
ref_id: "a_target".to_string(),
op: AfterOp::GreaterThan,
value: 0.0,
delay: None,
}),
while_clause: Some(WhileClause {
ref_id: "w_target".to_string(),
op: WhileOp::LessThan,
value: 0.0,
}),
delay_clause: None,
distribution: None,
buckets: None,
quantiles: None,
observations_per_tick: None,
mean_shift_per_sec: None,
seed: None,
on_sink_error: crate::OnSinkError::Warn,
};
let edges: Vec<_> = outgoing_edges(&e).collect();
assert_eq!(
edges,
vec![
("a_target", ClauseKind::After),
("w_target", ClauseKind::While)
]
);
e.while_clause = None;
let edges_after_only: Vec<_> = outgoing_edges(&e).collect();
assert_eq!(edges_after_only, vec![("a_target", ClauseKind::After)]);
e.after = None;
let edges_none: Vec<_> = outgoing_edges(&e).collect();
assert!(edges_none.is_empty());
}
// A while: clause compiles and is carried through to the compiled entry
// with its ref id intact.
#[test]
fn while_yaml_compiles_and_propagates_clause() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: link
signal_type: metrics
name: link
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: dependent
signal_type: metrics
name: dependent
generator: { type: constant, value: 1 }
while: { ref: link, op: ">", value: 0 }
"#;
let compiled = compile_after_from_yaml(yaml).expect("while: must compile");
let dep = compiled
.entries
.iter()
.find(|e| e.id.as_deref() == Some("dependent"))
.expect("dependent entry present");
let w = dep.while_clause.as_ref().expect("while propagates");
assert_eq!(w.ref_id, "link");
}
// The defaults.duration value (5m) must survive compilation onto an entry
// that also carries a while: clause.
#[test]
fn defaults_duration_carries_into_while_compiled_entry() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 5m
scenarios:
- id: link
signal_type: metrics
name: link
generator: { type: flap, up_duration: 60s, down_duration: 30s }
- id: dependent
signal_type: metrics
name: dependent
generator: { type: constant, value: 1 }
while: { ref: link, op: ">", value: 0 }
"#;
let compiled = compile_after_from_yaml(yaml).expect("while: must compile");
let dep = compiled
.entries
.iter()
.find(|e| e.id.as_deref() == Some("dependent"))
.expect("dependent entry present");
assert!(dep.while_clause.is_some());
assert_eq!(dep.duration.as_deref(), Some("5m"));
}
// A cycle mixing after: and while: edges must (a) include a While edge in
// the typed cycle path and (b) render with labeled arrows ("--[after]-->",
// "--[while]-->") rather than the plain " -> " form used for pure-after
// cycles. The last cycle element repeats the start node, so only
// cycle[..len-1] carries edge kinds.
#[test]
fn mixed_after_while_cycle_uses_labeled_format() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 10m
scenarios:
- id: a
signal_type: metrics
name: a
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
while: { ref: a, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("mixed cycle must fail");
match err {
CompileAfterError::CircularDependency { ref cycle } => {
let edge_kinds: Vec<_> = cycle[..cycle.len() - 1].iter().map(|(_, k)| *k).collect();
assert!(
edge_kinds.contains(&ClauseKind::While),
"mixed cycle must include a While edge: {cycle:?}"
);
let display = err.to_string();
assert!(
display.contains("--[after]-->") && display.contains("--[while]-->"),
"mixed cycle must render labeled arrows. got: {display}"
);
}
other => panic!("expected CircularDependency, got {other:?}"),
}
}
// A cycle built only from after: edges keeps the compact " -> " rendering;
// labeled "--[...]-->" arrows are reserved for mixed-kind cycles.
#[test]
fn pure_after_cycle_keeps_short_arrow_format() {
let yaml = r#"
version: 2
defaults:
rate: 1
scenarios:
- id: a
signal_type: metrics
name: a
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: b, op: ">", value: 1 }
- id: b
signal_type: metrics
name: b
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
after: { ref: a, op: ">", value: 1 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("pure-after cycle must fail");
let display = err.to_string();
assert!(
display.contains(" -> ") && !display.contains("--["),
"pure-after cycles must use the short arrow form. got: {display}"
);
}
// Regression guard: a 200-node linear while: chain must compile well inside
// 1 second (catches accidental super-linear behavior in cycle detection /
// propagation). NOTE(review): wall-clock assertions can be flaky on heavily
// loaded CI hosts -- if this flakes, loosen the bound rather than delete it.
#[test]
fn deep_while_chain_compiles_quickly() {
use std::fmt::Write;
let mut yaml =
String::from("version: 2\ndefaults:\n rate: 1\n duration: 1h\nscenarios:\n");
// n0 is the chain root; n1..n199 each gate on their predecessor.
let _ = writeln!(
yaml,
" - id: n0\n signal_type: metrics\n name: n0\n generator: {{ type: constant, value: 1 }}"
);
for i in 1..200 {
let _ = writeln!(yaml,
" - id: n{i}\n signal_type: metrics\n name: n{i}\n generator: {{ type: constant, value: 1 }}\n while: {{ ref: n{prev}, op: \">\", value: 0 }}",
prev = i - 1);
}
let start = std::time::Instant::now();
let compiled = compile_after_from_yaml(&yaml).expect("deep chain must compile");
let elapsed = start.elapsed();
assert_eq!(compiled.entries.len(), 200);
assert!(
elapsed < std::time::Duration::from_secs(1),
"200-node while: chain took {elapsed:?}; compile pipeline regressed"
);
}
// while: { ref: <own id> } must fail as SelfReference, and the error must
// carry ClauseKind::While (not After) so diagnostics name the right clause.
#[test]
fn self_while_reference_is_rejected_with_while_kind() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: loop_w
signal_type: metrics
name: loop_w
generator: { type: saturation, baseline: 0, ceiling: 100, time_to_saturate: 60s }
while: { ref: loop_w, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("self-while must fail");
match err {
CompileAfterError::SelfReference { source_id, clause } => {
assert_eq!(source_id, "loop_w");
assert_eq!(clause, ClauseKind::While);
}
other => panic!("expected SelfReference(While), got {other:?}"),
}
}
// A while: clause must target a metrics signal; gating on a logs entry
// yields NonMetricsTarget with the While clause kind and the target's
// signal type ("logs") in the error.
#[test]
fn while_targeting_logs_signal_is_rejected() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: log_src
signal_type: logs
name: lg
log_generator: { type: template, templates: [{ message: "hi" }] }
- id: gated
signal_type: metrics
name: gated
generator: { type: constant, value: 1 }
while: { ref: log_src, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("non-metrics while target must fail");
match err {
CompileAfterError::NonMetricsTarget {
ref_id,
clause,
target_signal,
..
} => {
assert_eq!(ref_id, "log_src");
assert_eq!(clause, ClauseKind::While);
assert_eq!(target_signal, "logs");
}
other => panic!("expected NonMetricsTarget(While), got {other:?}"),
}
}
// A while: comparison against a constant-NaN source is meaningless (every
// comparison with NaN is false), so it is rejected as
// WhileNanSource { nan: Constant }.
#[test]
fn while_against_nan_constant_is_rejected() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: src
signal_type: metrics
name: src
generator: { type: constant, value: .nan }
- id: gated
signal_type: metrics
name: gated
generator: { type: constant, value: 1 }
while: { ref: src, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("constant NaN must reject");
match err {
CompileAfterError::WhileNanSource {
ref_id,
nan: NanSource::Constant,
..
} => {
assert_eq!(ref_id, "src");
}
other => panic!("expected WhileNanSource(Constant), got {other:?}"),
}
}
// A NaN buried inside a sequence generator is also rejected, and the error
// pinpoints the offending element's index (here index 2 of [1, 2, .nan, 3]).
#[test]
fn while_against_nan_sequence_value_is_rejected() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: src
signal_type: metrics
name: src
generator: { type: sequence, values: [1, 2, .nan, 3], repeat: false }
- id: gated
signal_type: metrics
name: gated
generator: { type: constant, value: 1 }
while: { ref: src, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("sequence NaN must reject");
match err {
CompileAfterError::WhileNanSource {
nan: NanSource::SequenceValue { index },
..
} => {
assert_eq!(index, 2);
}
other => panic!("expected WhileNanSource(SequenceValue), got {other:?}"),
}
}
// A while: clause gating on a csv_replay upstream must be rejected as
// WhileUnsupportedUpstreamGenerator carrying the ref id and generator kind.
#[test]
fn while_against_csv_replay_upstream_is_rejected() {
// Unique per-process directory: the previous fixed name could collide when
// multiple test binaries (or repeated runs) execute concurrently.
let dir = std::env::temp_dir().join(format!("sonda-while-csv-upstream-{}", std::process::id()));
std::fs::create_dir_all(&dir).expect("tempdir");
let path = dir.join("ok.csv");
std::fs::write(&path, "1\n2\n3\n").expect("write csv");
let yaml = format!(
r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: src
signal_type: metrics
name: src
generator: {{ type: csv_replay, file: "{path}" }}
- id: gated
signal_type: metrics
name: gated
generator: {{ type: constant, value: 1 }}
while: {{ ref: src, op: ">", value: 0 }}
"#,
path = path.display()
);
let result = compile_after_from_yaml(&yaml);
// Clean up before asserting so a failing assertion does not leak the
// fixture file (the old version only removed it after a successful match).
std::fs::remove_file(&path).ok();
std::fs::remove_dir(&dir).ok();
let err = result.expect_err("csv_replay upstream must reject");
match err {
CompileAfterError::WhileUnsupportedUpstreamGenerator {
ref_id,
generator_kind,
..
} => {
assert_eq!(ref_id, "src");
assert_eq!(generator_kind, "csv_replay");
}
other => panic!("expected WhileUnsupportedUpstreamGenerator, got {other:?}"),
}
}
// A log-template upstream fails the signal-type check first: the error is
// NonMetricsTarget, not WhileUnsupportedUpstreamGenerator.
#[test]
fn while_against_log_template_upstream_is_rejected_as_non_metrics() {
let yaml = r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: src
signal_type: logs
name: lg
log_generator: { type: template, templates: [{ message: "hi" }] }
- id: gated
signal_type: metrics
name: gated
generator: { type: constant, value: 1 }
while: { ref: src, op: ">", value: 0 }
"#;
let err = compile_after_from_yaml(yaml).expect_err("log upstream must reject");
assert!(
matches!(err, CompileAfterError::NonMetricsTarget { .. }),
"expected NonMetricsTarget, got {err:?}"
);
}
// while: only accepts strict comparison operators; each non-strict operator
// (<=, >=, ==, !=) must be rejected already at parse time with the
// "unsupported operator ... strict" wording.
#[rustfmt::skip]
#[rstest::rstest]
#[case::le("<=")]
#[case::ge(">=")]
#[case::eq("==")]
#[case::ne("!=")]
fn while_strict_operators_reject_non_strict(#[case] op: &str) {
let yaml = format!(r#"
version: 2
defaults:
rate: 1
duration: 1m
scenarios:
- id: src
signal_type: metrics
name: src
generator: {{ type: constant, value: 1 }}
- id: gated
signal_type: metrics
name: gated
generator: {{ type: constant, value: 1 }}
while: {{ ref: src, op: "{op}", value: 1 }}
"#);
let err = parse(&yaml).expect_err("non-strict op must fail at parse");
let msg = err.to_string();
assert!(
msg.contains("unsupported operator") && msg.contains("strict"),
"error must use the 'unsupported operator … strict' wording. got: {msg}"
);
}
}