#![cfg(feature = "config")]
#![allow(dead_code)]
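//! Shared helpers for the v2 example and parity tests: fixture loading,
//! compiler-pipeline shortcuts, a capturing `Sink`, and output-normalization
//! utilities for comparing runner output byte-for-byte.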
use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use sonda_core::compiler::compile_after::{compile_after, CompiledFile};
use sonda_core::compiler::expand::{expand, ExpandedFile, InMemoryPackResolver};
use sonda_core::compiler::normalize::normalize;
use sonda_core::compiler::parse::parse;
use sonda_core::packs::MetricPackDef;
use sonda_core::prepare_entries;
use sonda_core::schedule::histogram_runner;
use sonda_core::schedule::log_runner;
use sonda_core::schedule::runner;
use sonda_core::schedule::summary_runner;
use sonda_core::sink::Sink;
use sonda_core::{ScenarioEntry, SondaError};
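/// Path to this crate's `tests/fixtures` directory.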
pub fn fixtures_dir() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures")
}
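/// Path to the repository root (the parent of this crate's manifest directory).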
pub fn repo_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("crate has a parent directory")
.to_path_buf()
}
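/// Reads a fixture from `tests/fixtures/v2-examples`, panicking with the full
/// path on failure.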
pub fn example_fixture(name: &str) -> String {
let path = fixtures_dir().join("v2-examples").join(name);
std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("cannot read fixture {}: {}", path.display(), e))
}
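/// Reads a fixture from `tests/fixtures/v2-parity`, panicking with the full
/// path on failure.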
pub fn parity_fixture(name: &str) -> String {
let path = fixtures_dir().join("v2-parity").join(name);
std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("cannot read fixture {}: {}", path.display(), e))
}
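/// Loads and parses a metric pack definition from the repository's `packs/`
/// directory.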
pub fn load_repo_pack(file_name: &str) -> MetricPackDef {
let path = repo_root().join("packs").join(file_name);
let yaml = std::fs::read_to_string(&path)
.unwrap_or_else(|e| panic!("cannot read pack {}: {}", path.display(), e));
serde_yaml_ng::from_str::<MetricPackDef>(&yaml)
.unwrap_or_else(|e| panic!("cannot parse pack {}: {}", path.display(), e))
}
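/// Builds a resolver preloaded with the repository's built-in packs. Each pack
/// is registered twice, under its bare pack name and under its relative
/// `./packs/<file>` path, so fixtures may reference either form.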
pub fn builtin_pack_resolver() -> InMemoryPackResolver {
let mut r = InMemoryPackResolver::new();
for (file, pack_name) in [
("telegraf-snmp-interface.yaml", "telegraf_snmp_interface"),
("node-exporter-cpu.yaml", "node_exporter_cpu"),
("node-exporter-memory.yaml", "node_exporter_memory"),
] {
let pack = load_repo_pack(file);
r.insert(pack_name, pack.clone());
r.insert(format!("./packs/{file}"), pack);
}
r
}
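/// Builds a resolver containing a single named pack.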
pub fn resolver_with(name: &str, pack: MetricPackDef) -> InMemoryPackResolver {
let mut r = InMemoryPackResolver::new();
r.insert(name, pack);
r
}
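/// Runs the front half of the compiler pipeline (parse -> normalize -> expand)
/// on a YAML fixture, panicking if any stage fails.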
pub fn compile_to_expanded(yaml: &str, resolver: &InMemoryPackResolver) -> ExpandedFile {
let parsed = parse(yaml).expect("fixture must parse");
let normalized = normalize(parsed).expect("fixture must normalize");
expand(normalized, resolver).expect("fixture must expand")
}
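/// Runs the full compiler pipeline on a YAML fixture, including the
/// post-expansion `compile_after` stage.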
pub fn compile_to_compiled(yaml: &str, resolver: &InMemoryPackResolver) -> CompiledFile {
let expanded = compile_to_expanded(yaml, resolver);
compile_after(expanded).expect("fixture must compile after")
}
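/// A `Sink` that appends everything written to a shared, mutex-guarded buffer,
/// letting the harness collect a runner's output from another thread.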
struct CapturingSink {
buffer: Arc<Mutex<Vec<u8>>>,
}
impl Sink for CapturingSink {
fn write(&mut self, data: &[u8]) -> Result<(), SondaError> {
let mut guard = self
.buffer
.lock()
.expect("parity harness buffer lock poisoned");
guard.extend_from_slice(data);
Ok(())
}
fn flush(&mut self) -> Result<(), SondaError> {
Ok(())
}
}
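/// Runs every prepared scenario entry on its own thread, capturing into a
/// per-entry buffer what each runner would otherwise write to stdout. Buffers
/// are concatenated in entry order after all threads join, so the combined
/// output is deterministic even though the entries run concurrently.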
pub fn run_and_capture_stdout(entries: Vec<ScenarioEntry>) -> Vec<u8> {
let prepared =
prepare_entries(entries).expect("run_and_capture_stdout: prepare_entries must succeed");
let mut handles = Vec::with_capacity(prepared.len());
for (idx, prepared_entry) in prepared.into_iter().enumerate() {
let buffer = Arc::new(Mutex::new(Vec::<u8>::with_capacity(4096)));
let buffer_for_thread = Arc::clone(&buffer);
let start_delay = prepared_entry.start_delay;
let entry = prepared_entry.entry;
let handle = thread::Builder::new()
.name(format!("parity-{idx}"))
.spawn(move || -> Result<(), SondaError> {
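                // Honor the entry's start delay by sleeping in chunks of at
                // most 25ms, re-checking the deadline between chunks.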
if let Some(delay) = start_delay {
let deadline = Instant::now() + delay;
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let chunk = remaining.min(Duration::from_millis(25));
if chunk > Duration::ZERO {
thread::sleep(chunk);
}
}
}
let mut sink = CapturingSink {
buffer: buffer_for_thread,
};
run_entry_with_sink(&entry, &mut sink)
})
.expect("failed to spawn parity harness thread");
handles.push((handle, buffer));
}
let mut result = Vec::new();
for (handle, buffer) in handles {
handle
.join()
.expect("parity harness thread panicked")
.expect("parity harness runner returned an error");
let mut guard = buffer.lock().expect("buffer lock poisoned");
result.extend_from_slice(&guard);
guard.clear();
}
result
}
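/// Replaces volatile timestamps in captured output with the literal
/// `___TS___` so outputs can be compared byte-for-byte. Two forms are
/// rewritten: a trailing Prometheus-style ` <epoch digits>\n` suffix and the
/// value of a JSON `"timestamp":"..."` field.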
pub fn normalize_timestamps(bytes: &[u8]) -> Vec<u8> {
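    // Pass 1: rewrite a run of ASCII digits that follows a space and ends the
    // line (e.g. an epoch-millisecond timestamp on a Prometheus sample line).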
let mut out = Vec::with_capacity(bytes.len());
let mut i = 0;
while i < bytes.len() {
let rest = &bytes[i..];
if rest[0] == b' ' {
let after_space = &rest[1..];
let mut digit_end = 0;
while digit_end < after_space.len() && after_space[digit_end].is_ascii_digit() {
digit_end += 1;
}
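            // A run of 11..=19 digits is treated as an epoch timestamp;
            // shorter runs, such as the ordinary sample value ` 42`, are left
            // untouched.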
if digit_end >= 11 && digit_end <= 19 && after_space.get(digit_end) == Some(&b'\n') {
out.extend_from_slice(b" ___TS___\n");
i += 1 + digit_end + 1;
continue;
}
}
out.push(bytes[i]);
i += 1;
}
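    // Pass 2: rewrite the value of every JSON `"timestamp":"..."` field.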
let bytes = out;
let mut out = Vec::with_capacity(bytes.len());
let needle = b"\"timestamp\":\"";
let mut i = 0;
while i < bytes.len() {
if bytes.len() - i >= needle.len() && &bytes[i..i + needle.len()] == needle {
out.extend_from_slice(b"\"timestamp\":\"___TS___\"");
let scan_start = i + needle.len();
let mut j = scan_start;
while j < bytes.len() && bytes[j] != b'"' {
j += 1;
}
i = j.saturating_add(1);
} else {
out.push(bytes[i]);
i += 1;
}
}
out
}
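/// Dispatches a scenario entry to its matching runner, writing output to the
/// given sink instead of stdout.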
fn run_entry_with_sink(entry: &ScenarioEntry, sink: &mut dyn Sink) -> Result<(), SondaError> {
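    // Runners accept an optional shutdown flag; the harness always runs
    // entries to completion, so none is supplied.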
const NONE_ATOMIC: Option<&AtomicBool> = None;
match entry {
ScenarioEntry::Metrics(config) => runner::run_with_sink(config, sink, NONE_ATOMIC, None),
ScenarioEntry::Logs(config) => {
log_runner::run_logs_with_sink(config, sink, NONE_ATOMIC, None)
}
ScenarioEntry::Histogram(config) => {
histogram_runner::run_with_sink(config, sink, NONE_ATOMIC, None)
}
ScenarioEntry::Summary(config) => {
summary_runner::run_with_sink(config, sink, NONE_ATOMIC, None)
}
_ => Err(SondaError::Config(sonda_core::ConfigError::InvalidValue(
"test harness encountered an unknown ScenarioEntry variant".to_string(),
))),
}
}
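/// Asserts that two byte buffers contain the same multiset of lines,
/// irrespective of order. On mismatch, panics with the lines unique to each
/// side.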
pub fn assert_line_multisets_equal(label: &str, expected: &[u8], actual: &[u8]) {
let expected_lines: Vec<&[u8]> = split_lines_preserve_empty(expected);
let actual_lines: Vec<&[u8]> = split_lines_preserve_empty(actual);
let mut expected_sorted: Vec<Vec<u8>> = expected_lines.iter().map(|l| l.to_vec()).collect();
let mut actual_sorted: Vec<Vec<u8>> = actual_lines.iter().map(|l| l.to_vec()).collect();
expected_sorted.sort();
actual_sorted.sort();
if expected_sorted != actual_sorted {
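        // The sets collapse duplicates, so the reported diff omits lines that
        // differ only in multiplicity; the sorted-vector comparison above
        // still catches such mismatches.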
let expected_set: BTreeSet<&[u8]> = expected_sorted.iter().map(Vec::as_slice).collect();
let actual_set: BTreeSet<&[u8]> = actual_sorted.iter().map(Vec::as_slice).collect();
let only_in_expected: Vec<String> = expected_set
.difference(&actual_set)
.map(|b| String::from_utf8_lossy(b).into_owned())
.collect();
let only_in_actual: Vec<String> = actual_set
.difference(&expected_set)
.map(|b| String::from_utf8_lossy(b).into_owned())
.collect();
panic!(
"{label}: line multisets differ\n\
expected {} lines, got {} lines\n\
only in expected:\n {}\n\
only in actual:\n {}",
expected_sorted.len(),
actual_sorted.len(),
only_in_expected.join("\n "),
only_in_actual.join("\n ")
);
}
}
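/// Splits on `\n`, keeping interior empty lines but dropping the final empty
/// slice produced by a trailing newline.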
fn split_lines_preserve_empty(bytes: &[u8]) -> Vec<&[u8]> {
if bytes.is_empty() {
return Vec::new();
}
let mut lines: Vec<&[u8]> = bytes.split(|&b| b == b'\n').collect();
if lines.last().is_some_and(|l| l.is_empty()) {
lines.pop();
}
lines
}
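/// `insta` settings shared by snapshot tests: maps are sorted and JSON fields
/// whose value is `null` are filtered out of the rendered snapshot.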
pub fn snapshot_settings() -> insta::Settings {
let mut s = insta::Settings::clone_current();
s.set_sort_maps(true);
s.add_filter(r#"(?m)^\s+"[^"]+": null,\n"#, "");
s.add_filter(r#",\n(\s+"[^"]+": null\n)"#, "\n");
s
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn normalize_timestamps_replaces_only_prometheus_trailing_millis() {
let input = b"my_metric{foo=\"bar\"} 42 1712345678901\n";
let out = normalize_timestamps(input);
let expected = b"my_metric{foo=\"bar\"} 42 ___TS___\n";
        assert_eq!(out, expected, "got: {:?}", String::from_utf8_lossy(&out));
}
#[test]
fn normalize_timestamps_replaces_only_json_timestamp_value() {
let input = b"{\"timestamp\":\"2026-04-13T12:34:56.789Z\",\"msg\":\"hello\",\"n\":42}\n";
let out = normalize_timestamps(input);
let expected = b"{\"timestamp\":\"___TS___\",\"msg\":\"hello\",\"n\":42}\n";
        assert_eq!(out, expected, "got: {:?}", String::from_utf8_lossy(&out));
}
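    // Illustrative self-checks for the remaining helpers, added as a sketch;
    // the expected values follow directly from the implementations in this
    // module.
    #[test]
    fn split_lines_keeps_interior_empties_and_drops_trailing_newline() {
        assert_eq!(split_lines_preserve_empty(b""), Vec::<&[u8]>::new());
        assert_eq!(
            split_lines_preserve_empty(b"a\n\nb\n"),
            vec![&b"a"[..], &b""[..], &b"b"[..]]
        );
        assert_eq!(
            split_lines_preserve_empty(b"no-newline"),
            vec![&b"no-newline"[..]]
        );
    }
    #[test]
    fn line_multiset_assert_ignores_order() {
        assert_line_multisets_equal("reordered", b"a\nb\n", b"b\na\n");
    }
    #[test]
    #[should_panic(expected = "line multisets differ")]
    fn line_multiset_assert_detects_count_mismatch() {
        assert_line_multisets_equal("dup-count", b"a\na\n", b"a\n");
    }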
}