use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use crate::config::ScenarioConfig;
use crate::encoder::create_encoder;
use crate::generator::create_generator;
use crate::model::metric::{Labels, MetricEvent, ValidatedMetricName};
use crate::schedule::core_loop::{self, GateContext, TickContext, TickResult};
use crate::schedule::gate_bus::GateBus;
use crate::schedule::is_in_spike;
use crate::schedule::stats::ScenarioStats;
use crate::schedule::ParsedSchedule;
use crate::sink::{create_sink, Sink};
use crate::SondaError;
pub fn run(config: &ScenarioConfig) -> Result<(), SondaError> {
let mut sink = create_sink(&config.sink, None)?;
run_with_sink(config, sink.as_mut(), None, None)
}
pub fn run_with_sink(
config: &ScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
) -> Result<(), SondaError> {
run_with_sink_gated(config, sink, shutdown, stats, None, None)
}
pub fn run_with_sink_gated(
config: &ScenarioConfig,
sink: &mut dyn Sink,
shutdown: Option<&AtomicBool>,
stats: Option<Arc<RwLock<ScenarioStats>>>,
upstream_bus: Option<Arc<GateBus>>,
gate_ctx: Option<GateContext>,
) -> Result<(), SondaError> {
let schedule = ParsedSchedule::from_base_config(&config.base)?;
let generator = create_generator(&config.generator, config.rate)?;
let generator =
crate::generator::wrap_with_jitter(generator, config.base.jitter, config.base.jitter_seed);
let encoder = create_encoder(&config.encoder)?;
let labels: Arc<Labels> = {
let inner = if let Some(ref label_map) = config.labels {
let pairs: Vec<(&str, &str)> = label_map
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
Labels::from_pairs(&pairs)?
} else {
Labels::from_pairs(&[])?
};
Arc::new(inner)
};
let name = ValidatedMetricName::new(&config.name)?;
let mut buf: Vec<u8> = Vec::with_capacity(256);
let upstream_bus_for_tick = upstream_bus.clone();
let mut tick_fn = |ctx: &TickContext<'_>| -> Result<TickResult, SondaError> {
let wall_now = std::time::SystemTime::now();
let value = generator.value(ctx.tick);
if let Some(ref bus) = upstream_bus_for_tick {
bus.tick(value);
}
let needs_dynamic = !ctx.dynamic_labels.is_empty();
let tick_labels: Arc<Labels> = if ctx.spike_windows.is_empty() && !needs_dynamic {
Arc::clone(&labels)
} else {
let mut mutated: Option<Labels> = None;
if needs_dynamic {
let tl = mutated.get_or_insert_with(|| (*labels).clone());
for dl in ctx.dynamic_labels {
tl.insert(dl.key.clone(), dl.label_value_for_tick(ctx.tick));
}
}
for sw in ctx.spike_windows {
if is_in_spike(ctx.elapsed, sw) {
let tl = mutated.get_or_insert_with(|| (*labels).clone());
tl.insert(sw.label.clone(), sw.label_value_for_tick(ctx.tick));
}
}
match mutated {
Some(tl) => Arc::new(tl),
None => Arc::clone(&labels),
}
};
let event = MetricEvent::from_parts(name.clone(), value, tick_labels, wall_now);
buf.clear();
encoder.encode_metric(&event, &mut buf)?;
let bytes_written = buf.len() as u64;
sink.write(&buf)?;
Ok(TickResult {
bytes_written,
metric_event: Some(event),
})
};
let stats_for_flush = stats.clone();
let loop_result = match gate_ctx {
None => core_loop::run_schedule_loop(&schedule, config.rate, shutdown, stats, &mut tick_fn),
Some(ctx) => {
core_loop::gated_loop(&schedule, config.rate, shutdown, stats, ctx, &mut tick_fn)
}
};
let flush_result = sink.flush();
match loop_result {
Ok(()) => core_loop::apply_flush_policy(&schedule, stats_for_flush.as_ref(), flush_result),
Err(e) => Err(e),
}
}
#[cfg(test)]
mod tests {
use crate::config::{BaseScheduleConfig, GapConfig, ScenarioConfig};
use crate::encoder::EncoderConfig;
use crate::generator::GeneratorConfig;
use crate::sink::memory::MemorySink;
use crate::sink::SinkConfig;
fn make_config(rate: f64, duration: &str, gaps: Option<GapConfig>) -> ScenarioConfig {
ScenarioConfig {
base: BaseScheduleConfig {
name: "up".to_string(),
rate,
duration: Some(duration.to_string()),
gaps,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout, phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}
}
#[test]
fn run_completes_without_error_for_short_duration() {
let config = make_config(100.0, "100ms", None);
let result = super::run(&config);
assert!(
result.is_ok(),
"run must succeed for valid config: {result:?}"
);
}
#[test]
fn integration_rate_100_duration_1s_emits_approximately_100_events() {
let config = make_config(100.0, "1s", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
(80..=120).contains(&newlines),
"expected ~100 events (80–120), got {newlines}"
);
}
#[test]
fn integration_output_lines_start_with_metric_name() {
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
for line in output.lines() {
assert!(
line.starts_with("up"),
"each line must start with metric name 'up', got: {line:?}"
);
}
}
#[test]
fn integration_output_ends_with_newline() {
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
assert!(
sink.buffer.ends_with(b"\n"),
"output must end with a newline"
);
}
#[test]
fn integration_gap_suppresses_events() {
let config = make_config(
100.0,
"5s",
Some(GapConfig {
every: "3s".to_string(),
r#for: "1s".to_string(),
}),
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines < 500,
"gap must suppress events: expected < 500, got {newlines}"
);
assert!(
newlines > 0,
"some events must be emitted outside of gaps, got {newlines}"
);
}
#[test]
fn run_with_invalid_duration_returns_err() {
let mut config = make_config(100.0, "bad_duration", None);
config.duration = Some("not_a_duration".to_string());
let result = super::run(&config);
assert!(result.is_err(), "invalid duration must return Err");
}
#[test]
fn run_with_invalid_gap_every_returns_err() {
let mut config = make_config(100.0, "1s", None);
config.gaps = Some(GapConfig {
every: "bad".to_string(),
r#for: "1s".to_string(),
});
let result = super::run(&config);
assert!(result.is_err(), "invalid gap.every must return Err");
}
#[test]
fn integration_labels_appear_in_output() {
let mut config = make_config(50.0, "100ms", None);
let mut label_map = std::collections::HashMap::new();
label_map.insert("host".to_string(), "server1".to_string());
config.labels = Some(label_map);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
assert!(
output.contains("host=\"server1\""),
"label must appear in output, got:\n{output}"
);
}
fn make_config_with_burst(
rate: f64,
duration: &str,
gaps: Option<crate::config::GapConfig>,
bursts: Option<crate::config::BurstConfig>,
) -> crate::config::ScenarioConfig {
crate::config::ScenarioConfig {
base: crate::config::BaseScheduleConfig {
name: "up".to_string(),
rate,
duration: Some(duration.to_string()),
gaps,
bursts,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: crate::sink::SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: crate::generator::GeneratorConfig::Constant { value: 1.0 },
encoder: crate::encoder::EncoderConfig::PrometheusText { precision: None },
}
}
#[test]
fn integration_burst_increases_event_count() {
let config = make_config_with_burst(
10.0,
"1s",
None,
Some(crate::config::BurstConfig {
every: "10s".to_string(),
r#for: "9s".to_string(),
multiplier: 5.0,
}),
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines > 15,
"burst must increase event count above base rate: expected >15, got {newlines}"
);
assert!(
newlines < 100,
"event count must be sane (not runaway): expected <100, got {newlines}"
);
}
#[test]
fn integration_burst_then_normal_produces_mixed_rate() {
let config = make_config_with_burst(
100.0,
"2s",
None,
Some(crate::config::BurstConfig {
every: "10s".to_string(),
r#for: "1s".to_string(),
multiplier: 5.0,
}),
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines > 200,
"burst phase must produce more than base rate alone: expected >200, got {newlines}"
);
assert!(
newlines <= 900,
"total event count must be in expected range, got {newlines}"
);
}
#[test]
fn integration_gap_wins_over_burst_suppresses_events() {
let config = make_config_with_burst(
100.0,
"3s",
Some(GapConfig {
every: "3s".to_string(),
r#for: "2s".to_string(),
}),
Some(crate::config::BurstConfig {
every: "3s".to_string(),
r#for: "2500ms".to_string(), multiplier: 5.0,
}),
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines < 1000,
"gap must suppress many events that burst would have produced: expected <1000, got {newlines}"
);
assert!(
newlines > 0,
"some events must be emitted outside of gaps, got {newlines}"
);
}
#[test]
fn integration_gap_covering_full_window_produces_zero_events_even_with_burst() {
let config_no_gap_burst = make_config_with_burst(
1000.0,
"500ms",
None,
Some(crate::config::BurstConfig {
every: "1s".to_string(),
r#for: "900ms".to_string(),
multiplier: 5.0,
}),
);
let mut sink_no_gap = MemorySink::new();
super::run_with_sink(&config_no_gap_burst, &mut sink_no_gap, None, None)
.expect("run must succeed");
let events_burst_only = sink_no_gap.buffer.iter().filter(|&&b| b == b'\n').count();
let config_gap_and_burst = make_config_with_burst(
1000.0,
"500ms",
Some(GapConfig {
every: "1s".to_string(),
r#for: "900ms".to_string(),
}),
Some(crate::config::BurstConfig {
every: "1s".to_string(),
r#for: "900ms".to_string(),
multiplier: 5.0,
}),
);
let mut sink_gap_burst = MemorySink::new();
super::run_with_sink(&config_gap_and_burst, &mut sink_gap_burst, None, None)
.expect("run must succeed");
let events_gap_and_burst = sink_gap_burst
.buffer
.iter()
.filter(|&&b| b == b'\n')
.count();
assert!(
events_gap_and_burst < events_burst_only,
"gap must suppress burst events: gap+burst={events_gap_and_burst} must be < burst-only={events_burst_only}"
);
}
#[test]
fn runner_pushes_metric_events_to_stats_buffer() {
use std::sync::{Arc, RwLock};
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
let stats = Arc::new(RwLock::new(crate::schedule::stats::ScenarioStats::default()));
super::run_with_sink(&config, &mut sink, None, Some(Arc::clone(&stats)))
.expect("run must succeed");
let st = stats.read().expect("lock must not be poisoned");
assert!(
!st.recent_metrics.is_empty(),
"runner must push events into the stats recent_metrics buffer, got {} events",
st.recent_metrics.len()
);
assert!(
st.recent_metrics.len() <= crate::schedule::stats::MAX_RECENT_METRICS,
"recent_metrics buffer must not exceed MAX_RECENT_METRICS ({}), got {}",
crate::schedule::stats::MAX_RECENT_METRICS,
st.recent_metrics.len()
);
}
#[test]
fn runner_without_stats_does_not_push_metrics() {
let config = make_config(50.0, "100ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines > 0,
"runner without stats must still produce output"
);
}
#[test]
fn runner_stats_buffer_events_have_correct_metric_name() {
use std::sync::{Arc, RwLock};
let config = make_config(50.0, "100ms", None);
let mut sink = MemorySink::new();
let stats = Arc::new(RwLock::new(crate::schedule::stats::ScenarioStats::default()));
super::run_with_sink(&config, &mut sink, None, Some(Arc::clone(&stats)))
.expect("run must succeed");
let st = stats.read().expect("lock must not be poisoned");
for event in st.recent_metrics.iter() {
assert_eq!(
&*event.name, "up",
"all buffered events must have the metric name from config"
);
}
}
#[test]
fn shutdown_flag_stops_run_during_burst() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
let config = make_config_with_burst(
1000.0,
"60s", None,
Some(crate::config::BurstConfig {
every: "10s".to_string(),
r#for: "9s".to_string(),
multiplier: 5.0,
}),
);
let shutdown = Arc::new(AtomicBool::new(true));
let shutdown_clone = Arc::clone(&shutdown);
let handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(200));
shutdown_clone.store(false, Ordering::SeqCst);
});
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, Some(&shutdown), None).expect("run must succeed");
handle.join().expect("thread must complete");
let newlines = sink.buffer.iter().filter(|&&b| b == b'\n').count();
assert!(
newlines > 0,
"some events must have been emitted before shutdown"
);
}
fn make_config_with_spike(
rate: f64,
duration: &str,
spike: crate::config::CardinalitySpikeConfig,
) -> crate::config::ScenarioConfig {
crate::config::ScenarioConfig {
base: crate::config::BaseScheduleConfig {
name: "up".to_string(),
rate,
duration: Some(duration.to_string()),
gaps: None,
bursts: None,
cardinality_spikes: Some(vec![spike]),
dynamic_labels: None,
labels: None,
sink: crate::sink::SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: crate::generator::GeneratorConfig::Constant { value: 1.0 },
encoder: crate::encoder::EncoderConfig::PrometheusText { precision: None },
}
}
#[test]
fn integration_spike_labels_appear_during_spike_window() {
let spike = crate::config::CardinalitySpikeConfig {
label: "pod_name".to_string(),
every: "10s".to_string(),
r#for: "9s".to_string(),
cardinality: 5,
strategy: crate::config::SpikeStrategy::Counter,
prefix: Some("pod-".to_string()),
seed: None,
};
let config = make_config_with_spike(50.0, "500ms", spike);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
for line in output.lines() {
assert!(
line.contains("pod_name="),
"every line during spike must contain pod_name label, got: {line:?}"
);
}
}
#[test]
fn integration_no_spike_config_produces_no_spike_labels() {
let config = make_config(50.0, "200ms", None);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
for line in output.lines() {
assert!(
!line.contains("pod_name="),
"without spike config, pod_name must not appear: {line:?}"
);
}
}
#[test]
fn integration_spike_counter_strategy_produces_bounded_values() {
let spike = crate::config::CardinalitySpikeConfig {
label: "pod_name".to_string(),
every: "10s".to_string(),
r#for: "9s".to_string(),
cardinality: 3,
strategy: crate::config::SpikeStrategy::Counter,
prefix: Some("pod-".to_string()),
seed: None,
};
let config = make_config_with_spike(50.0, "500ms", spike);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("output must be valid UTF-8");
let mut seen_values = std::collections::HashSet::new();
for line in output.lines() {
if let Some(start) = line.find("pod_name=\"") {
let rest = &line[start + 10..];
if let Some(end) = rest.find('"') {
seen_values.insert(rest[..end].to_string());
}
}
}
assert!(
seen_values.len() <= 3,
"counter strategy with cardinality=3 should produce at most 3 unique values, got {}: {:?}",
seen_values.len(),
seen_values
);
assert!(
!seen_values.is_empty(),
"must have produced at least one spike label value"
);
}
#[test]
fn integration_spike_stats_reports_in_cardinality_spike() {
use std::sync::{Arc, RwLock};
let spike = crate::config::CardinalitySpikeConfig {
label: "pod_name".to_string(),
every: "10s".to_string(),
r#for: "9s".to_string(),
cardinality: 5,
strategy: crate::config::SpikeStrategy::Counter,
prefix: Some("pod-".to_string()),
seed: None,
};
let config = make_config_with_spike(50.0, "200ms", spike);
let mut sink = MemorySink::new();
let stats = Arc::new(RwLock::new(crate::schedule::stats::ScenarioStats::default()));
super::run_with_sink(&config, &mut sink, None, Some(Arc::clone(&stats)))
.expect("run must succeed");
let st = stats.read().expect("lock must not be poisoned");
assert!(
st.in_cardinality_spike,
"stats must report in_cardinality_spike=true during spike window"
);
}
#[test]
fn buffered_events_share_name_arc_allocation() {
use std::sync::{Arc, RwLock};
let config = make_config(200.0, "100ms", None);
let mut sink = MemorySink::new();
let stats = Arc::new(RwLock::new(crate::schedule::stats::ScenarioStats::default()));
super::run_with_sink(&config, &mut sink, None, Some(Arc::clone(&stats)))
.expect("run must succeed");
let st = stats.read().expect("lock must not be poisoned");
let events: Vec<_> = st.recent_metrics.iter().collect();
assert!(
events.len() >= 2,
"need at least 2 events to verify sharing, got {}",
events.len()
);
let first_name = events[0].name.arc();
for (i, event) in events.iter().enumerate().skip(1) {
assert!(
Arc::ptr_eq(first_name, event.name.arc()),
"event[{i}].name should share Arc allocation with event[0].name"
);
}
}
#[test]
fn buffered_events_share_labels_arc_when_no_spikes() {
use std::sync::{Arc, RwLock};
let config = make_config(200.0, "100ms", None);
let mut sink = MemorySink::new();
let stats = Arc::new(RwLock::new(crate::schedule::stats::ScenarioStats::default()));
super::run_with_sink(&config, &mut sink, None, Some(Arc::clone(&stats)))
.expect("run must succeed");
let st = stats.read().expect("lock must not be poisoned");
let events: Vec<_> = st.recent_metrics.iter().collect();
assert!(
events.len() >= 2,
"need at least 2 events to verify sharing, got {}",
events.len()
);
let first_labels = &events[0].labels;
for (i, event) in events.iter().enumerate().skip(1) {
assert!(
Arc::ptr_eq(first_labels, &event.labels),
"event[{i}].labels should share Arc allocation with event[0].labels"
);
}
}
#[test]
fn invalid_metric_name_returns_config_error_before_loop() {
let config = ScenarioConfig {
base: BaseScheduleConfig {
name: "123-invalid".to_string(),
rate: 10.0,
duration: Some("100ms".to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
};
let mut sink = MemorySink::new();
let result = super::run_with_sink(&config, &mut sink, None, None);
assert!(
matches!(result, Err(crate::SondaError::Config(ref e)) if e.to_string().contains("123-invalid")),
"expected Config error for invalid name, got: {result:?}"
);
}
fn make_config_with_dynamic_labels(
rate: f64,
duration: &str,
dynamic_labels: Vec<crate::config::DynamicLabelConfig>,
) -> ScenarioConfig {
ScenarioConfig {
base: BaseScheduleConfig {
name: "up".to_string(),
rate,
duration: Some(duration.to_string()),
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: Some(dynamic_labels),
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
}
}
#[test]
fn dynamic_labels_counter_appear_in_metric_output() {
let config = make_config_with_dynamic_labels(
10.0,
"1s",
vec![crate::config::DynamicLabelConfig {
key: "hostname".to_string(),
strategy: crate::config::DynamicLabelStrategy::Counter {
prefix: Some("host-".to_string()),
cardinality: 5,
},
}],
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
let lines: Vec<&str> = output.lines().collect();
assert!(
!lines.is_empty(),
"runner must produce at least one line of output"
);
for line in &lines {
assert!(
line.contains("hostname=\"host-"),
"every metric line must contain dynamic label hostname; line: {line}"
);
}
}
#[test]
fn dynamic_labels_values_list_cycle_in_metric_output() {
let config = make_config_with_dynamic_labels(
10.0,
"1s",
vec![crate::config::DynamicLabelConfig {
key: "region".to_string(),
strategy: crate::config::DynamicLabelStrategy::ValuesList {
values: vec!["alpha".to_string(), "beta".to_string(), "gamma".to_string()],
},
}],
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
let lines: Vec<&str> = output.lines().collect();
assert!(!lines.is_empty());
for line in &lines {
assert!(
line.contains("region=\""),
"every metric line must contain dynamic label region; line: {line}"
);
}
let has_alpha = lines.iter().any(|l| l.contains("region=\"alpha\""));
let has_beta = lines.iter().any(|l| l.contains("region=\"beta\""));
assert!(
has_alpha || has_beta,
"at least one distinct dynamic label value should appear in output"
);
}
#[test]
fn dynamic_labels_counter_respects_cardinality_ceiling_in_output() {
let config = make_config_with_dynamic_labels(
50.0,
"1s",
vec![crate::config::DynamicLabelConfig {
key: "hostname".to_string(),
strategy: crate::config::DynamicLabelStrategy::Counter {
prefix: Some("host-".to_string()),
cardinality: 3,
},
}],
);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
let mut distinct_values = std::collections::HashSet::new();
for line in output.lines() {
if let Some(start) = line.find("hostname=\"") {
let rest = &line[start + 10..];
if let Some(end) = rest.find('"') {
distinct_values.insert(rest[..end].to_string());
}
}
}
assert_eq!(
distinct_values.len(),
3,
"with cardinality=3, exactly 3 distinct values must appear, got {:?}",
distinct_values
);
}
#[test]
fn dynamic_labels_and_static_labels_coexist_in_output() {
let mut config = make_config_with_dynamic_labels(
10.0,
"1s",
vec![crate::config::DynamicLabelConfig {
key: "hostname".to_string(),
strategy: crate::config::DynamicLabelStrategy::Counter {
prefix: Some("host-".to_string()),
cardinality: 5,
},
}],
);
let mut label_map = std::collections::HashMap::new();
label_map.insert("env".to_string(), "prod".to_string());
config.labels = Some(label_map);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
for line in output.lines() {
assert!(
line.contains("env=\"prod\""),
"static label must appear; line: {line}"
);
assert!(
line.contains("hostname=\"host-"),
"dynamic label must appear; line: {line}"
);
}
}
#[test]
fn dynamic_label_wins_on_key_collision_with_static() {
let mut config = make_config_with_dynamic_labels(
10.0,
"500ms",
vec![crate::config::DynamicLabelConfig {
key: "hostname".to_string(),
strategy: crate::config::DynamicLabelStrategy::Counter {
prefix: Some("dynamic-".to_string()),
cardinality: 3,
},
}],
);
let mut label_map = std::collections::HashMap::new();
label_map.insert("hostname".to_string(), "static-value".to_string());
config.labels = Some(label_map);
let mut sink = MemorySink::new();
super::run_with_sink(&config, &mut sink, None, None).expect("run must succeed");
let output = std::str::from_utf8(&sink.buffer).expect("valid UTF-8");
for line in output.lines() {
assert!(
line.contains("hostname=\"dynamic-"),
"dynamic label must overwrite static label; line: {line}"
);
assert!(
!line.contains("hostname=\"static-value\""),
"static value must not appear when dynamic label overrides it; line: {line}"
);
}
}
}