pub mod compiler;
pub mod config;
pub mod emit;
pub mod encoder;
pub mod generator;
pub mod model;
pub mod packs;
pub mod scenarios;
pub mod schedule;
pub mod sink;
pub(crate) mod util;
pub use config::aliases::{desugar_entry, desugar_scenario_config};
pub use config::BaseScheduleConfig;
pub use config::BurstConfig;
pub use config::CardinalitySpikeConfig;
pub use config::DistributionConfig;
pub use config::DynamicLabelConfig;
pub use config::DynamicLabelStrategy;
pub use config::HistogramScenarioConfig;
pub use config::LogScenarioConfig;
pub use config::OnSinkError;
pub use config::ScenarioEntry;
pub use config::SpikeStrategy;
pub use config::SummaryScenarioConfig;
pub use config::{expand_entry, expand_scenario};
pub use model::log::LogEvent;
pub use model::log::Severity;
pub use model::metric::Labels;
pub use model::metric::MetricEvent;
pub use model::metric::ValidatedMetricName;
pub use scenarios::BuiltinScenario;
pub use schedule::handle::ScenarioHandle;
pub use schedule::launch::{launch_scenario, prepare_entries, validate_entry, PreparedEntry};
pub use schedule::stats::{ScenarioState, ScenarioStats};
#[cfg(feature = "config")]
pub use compiler::prepare::PrepareError;
#[cfg(feature = "config")]
pub use compile::{compile_scenario_file, compile_scenario_file_compiled, CompileError};
#[cfg(feature = "config")]
mod compile;
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SondaError {
#[error("configuration error: {0}")]
Config(#[from] ConfigError),
#[error("encoder error: {0}")]
Encoder(#[from] EncoderError),
#[error("sink error: {0}")]
Sink(std::io::Error),
#[error("generator error: {0}")]
Generator(#[from] GeneratorError),
#[error("runtime error: {0}")]
Runtime(#[from] RuntimeError),
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ConfigError {
#[error("{0}")]
InvalidValue(String),
}
impl ConfigError {
pub(crate) fn invalid(msg: impl Into<String>) -> Self {
ConfigError::InvalidValue(msg.into())
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum GeneratorError {
#[error("cannot read file {path:?}")]
FileRead {
path: String,
#[source]
source: std::io::Error,
},
}
impl GeneratorError {
pub fn source_io_kind(&self) -> Option<std::io::ErrorKind> {
match self {
GeneratorError::FileRead { source, .. } => Some(source.kind()),
}
}
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum EncoderError {
#[error("JSON serialization failed")]
SerializationFailed(#[source] serde_json::Error),
#[error("timestamp before Unix epoch")]
TimestampBeforeEpoch(#[source] std::time::SystemTimeError),
#[error("{0}")]
NotSupported(String),
#[error("{0}")]
Other(String),
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum RuntimeError {
#[error("failed to spawn scenario thread")]
SpawnFailed(#[source] std::io::Error),
#[error("scenario thread panicked")]
ThreadPanicked,
#[error("{0}")]
ScenariosFailed(String),
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn io_error_does_not_auto_convert_to_sonda_error() {
let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "gone");
let sonda_err = SondaError::Sink(io_err);
assert!(
matches!(sonda_err, SondaError::Sink(_)),
"explicit Sink construction must produce Sink variant"
);
}
#[test]
fn missing_replay_file_produces_generator_error_not_sink() {
let path = std::path::Path::new("/nonexistent/path/for/replay.log");
let result = generator::log_replay::LogReplayGenerator::from_file(path);
match result {
Err(ref err) => {
assert!(
matches!(err, SondaError::Generator(_)),
"missing replay file must produce Generator variant, got: {err:?}"
);
}
Ok(_) => panic!("missing file must return Err"),
}
}
#[test]
fn missing_csv_file_produces_generator_error_not_sink() {
let result = generator::csv_replay::CsvReplayGenerator::new(
"/nonexistent/path/for/data.csv",
0,
true,
);
match result {
Err(SondaError::Generator(GeneratorError::FileRead {
ref path,
ref source,
})) => {
assert_eq!(path, "/nonexistent/path/for/data.csv");
assert_eq!(source.kind(), std::io::ErrorKind::NotFound);
}
Err(ref err) => {
panic!("missing CSV file must produce Generator(FileRead) variant, got: {err:?}");
}
Ok(_) => panic!("missing CSV file must return Err"),
}
}
#[test]
fn log_replay_factory_missing_file_produces_generator_error() {
let config = generator::LogGeneratorConfig::Replay {
file: "/nonexistent/path/for/replay.log".to_string(),
};
let result = generator::create_log_generator(&config);
match result {
Err(ref err) => {
assert!(
matches!(err, SondaError::Generator(_)),
"factory with missing replay file must produce Generator variant, got: {err:?}"
);
}
Ok(_) => panic!("missing replay file must return Err"),
}
}
#[test]
fn sink_file_error_produces_sink_variant() {
let result = sink::file::FileSink::new(std::path::Path::new(
"/nonexistent/deeply/nested/path/output.txt",
));
match result {
Err(ref err) => {
assert!(
matches!(err, SondaError::Sink(_)),
"file sink I/O error must produce Sink variant, got: {err:?}"
);
}
Ok(_) => panic!("invalid file path must return Err"),
}
}
#[test]
fn sonda_error_display_includes_context() {
let err = SondaError::Generator(GeneratorError::FileRead {
path: "/some/file".to_string(),
source: std::io::Error::new(std::io::ErrorKind::NotFound, "no such file"),
});
let msg = format!("{err}");
assert!(
msg.contains("generator error"),
"Generator variant display must include 'generator error', got: {msg}"
);
assert!(
msg.contains("/some/file"),
"Generator variant display must include the file path, got: {msg}"
);
}
#[test]
fn config_error_converts_to_sonda_error_via_from() {
let config_err = ConfigError::invalid("rate must be positive");
let sonda_err: SondaError = config_err.into();
assert!(
matches!(sonda_err, SondaError::Config(_)),
"ConfigError must convert to SondaError::Config"
);
}
#[test]
fn generator_error_converts_to_sonda_error_via_from() {
let gen_err = GeneratorError::FileRead {
path: "/tmp/test.csv".to_string(),
source: std::io::Error::new(std::io::ErrorKind::NotFound, "not found"),
};
let sonda_err: SondaError = gen_err.into();
assert!(
matches!(sonda_err, SondaError::Generator(_)),
"GeneratorError must convert to SondaError::Generator"
);
}
#[test]
fn encoder_error_converts_to_sonda_error_via_from() {
let enc_err = EncoderError::NotSupported("log encoding not supported".into());
let sonda_err: SondaError = enc_err.into();
assert!(
matches!(sonda_err, SondaError::Encoder(_)),
"EncoderError must convert to SondaError::Encoder"
);
}
#[test]
fn runtime_error_converts_to_sonda_error_via_from() {
let rt_err = RuntimeError::ThreadPanicked;
let sonda_err: SondaError = rt_err.into();
assert!(
matches!(sonda_err, SondaError::Runtime(_)),
"RuntimeError must convert to SondaError::Runtime"
);
}
#[test]
fn generator_file_read_preserves_io_error_source() {
use std::error::Error;
let io_err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "access denied");
let gen_err = GeneratorError::FileRead {
path: "/secret/file".to_string(),
source: io_err,
};
let source = gen_err.source().expect("source() must return Some");
let io_source = source
.downcast_ref::<std::io::Error>()
.expect("source must be std::io::Error");
assert_eq!(io_source.kind(), std::io::ErrorKind::PermissionDenied);
}
#[test]
fn generator_file_read_io_error_kind_is_inspectable() {
let gen_err = GeneratorError::FileRead {
path: "/missing/file".to_string(),
source: std::io::Error::new(std::io::ErrorKind::NotFound, "not found"),
};
assert_eq!(gen_err.source_io_kind(), Some(std::io::ErrorKind::NotFound));
}
#[test]
fn encoder_serialization_preserves_serde_json_source() {
use std::error::Error;
let json_err: serde_json::Error = serde_json::from_str::<serde_json::Value>("{{invalid}}")
.expect_err("invalid JSON must fail");
let enc_err = EncoderError::SerializationFailed(json_err);
let source = enc_err.source().expect("source() must return Some");
assert!(
source.downcast_ref::<serde_json::Error>().is_some(),
"source must be serde_json::Error"
);
}
#[test]
fn encoder_timestamp_preserves_system_time_source() {
use std::error::Error;
let pre_epoch = std::time::UNIX_EPOCH - std::time::Duration::from_secs(1);
let sys_err = pre_epoch.duration_since(std::time::UNIX_EPOCH).unwrap_err();
let enc_err = EncoderError::TimestampBeforeEpoch(sys_err);
let source = enc_err.source().expect("source() must return Some");
assert!(
source
.downcast_ref::<std::time::SystemTimeError>()
.is_some(),
"source must be SystemTimeError"
);
}
#[test]
fn spawn_failed_is_runtime_not_config() {
let io_err = std::io::Error::new(std::io::ErrorKind::Other, "resource limit");
let rt_err = RuntimeError::SpawnFailed(io_err);
let sonda_err: SondaError = rt_err.into();
assert!(
matches!(sonda_err, SondaError::Runtime(RuntimeError::SpawnFailed(_))),
"thread spawn failure must be Runtime::SpawnFailed, not Config"
);
}
#[test]
fn thread_panicked_is_runtime_not_config() {
let rt_err = RuntimeError::ThreadPanicked;
let sonda_err: SondaError = rt_err.into();
assert!(
matches!(sonda_err, SondaError::Runtime(RuntimeError::ThreadPanicked)),
"thread panic must be Runtime::ThreadPanicked, not Config"
);
}
#[test]
fn runtime_error_display_is_descriptive() {
let spawn_err = RuntimeError::SpawnFailed(std::io::Error::new(
std::io::ErrorKind::Other,
"too many threads",
));
let msg = format!("{spawn_err}");
assert!(
msg.contains("failed to spawn scenario thread"),
"SpawnFailed display must describe the spawn failure, got: {msg}"
);
let panic_err = RuntimeError::ThreadPanicked;
let msg = format!("{panic_err}");
assert!(
msg.contains("panicked"),
"ThreadPanicked display must mention panic, got: {msg}"
);
let scenarios_err =
RuntimeError::ScenariosFailed("sink error: broken pipe; sink error: timeout".into());
let msg = format!("{scenarios_err}");
assert!(
msg.contains("sink error"),
"ScenariosFailed display must include the collected messages, got: {msg}"
);
}
#[test]
fn spawn_failed_preserves_io_error_source() {
use std::error::Error;
let io_err = std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"resource temporarily unavailable",
);
let rt_err = RuntimeError::SpawnFailed(io_err);
let source = rt_err
.source()
.expect("SpawnFailed source() must return Some");
let io_source = source
.downcast_ref::<std::io::Error>()
.expect("source must be std::io::Error");
assert_eq!(io_source.kind(), std::io::ErrorKind::WouldBlock);
}
#[test]
fn spawn_failed_source_chain_traverses_through_sonda_error() {
use std::error::Error;
let io_err =
std::io::Error::new(std::io::ErrorKind::PermissionDenied, "cannot create thread");
let sonda_err = SondaError::Runtime(RuntimeError::SpawnFailed(io_err));
let runtime_source = sonda_err
.source()
.expect("SondaError::Runtime source() must return Some");
let rt_err = runtime_source
.downcast_ref::<RuntimeError>()
.expect("first source must be RuntimeError");
let io_source = rt_err
.source()
.expect("SpawnFailed source() must return Some");
let io_inner = io_source
.downcast_ref::<std::io::Error>()
.expect("second source must be std::io::Error");
assert_eq!(io_inner.kind(), std::io::ErrorKind::PermissionDenied);
}
#[test]
fn scenarios_failed_is_runtime_not_config() {
let rt_err = RuntimeError::ScenariosFailed("thread failed".into());
let sonda_err: SondaError = rt_err.into();
assert!(
matches!(
sonda_err,
SondaError::Runtime(RuntimeError::ScenariosFailed(_))
),
"multi-scenario failures must be Runtime::ScenariosFailed, not Config"
);
}
#[test]
fn scenarios_failed_converts_to_sonda_error_via_from() {
let rt_err = RuntimeError::ScenariosFailed("sink error: broken pipe".into());
let sonda_err: SondaError = rt_err.into();
assert!(
matches!(sonda_err, SondaError::Runtime(_)),
"ScenariosFailed must convert to SondaError::Runtime"
);
}
#[test]
fn config_types_constructible_without_yaml_parsing() {
use crate::config::{BaseScheduleConfig, ScenarioConfig};
use crate::encoder::EncoderConfig;
use crate::generator::GeneratorConfig;
use crate::sink::SinkConfig;
let _config = ScenarioConfig {
base: BaseScheduleConfig {
name: "test".to_string(),
rate: 10.0,
duration: None,
gaps: None,
bursts: None,
cardinality_spikes: None,
dynamic_labels: None,
labels: None,
sink: SinkConfig::Stdout,
phase_offset: None,
clock_group: None,
clock_group_is_auto: None,
jitter: None,
jitter_seed: None,
on_sink_error: crate::OnSinkError::Warn,
},
generator: GeneratorConfig::Constant { value: 1.0 },
encoder: EncoderConfig::PrometheusText { precision: None },
};
}
#[cfg(feature = "config")]
#[test]
fn config_feature_enables_yaml_deserialization() {
use crate::config::ScenarioConfig;
let yaml = r#"
name: test
rate: 10
generator:
type: constant
value: 1.0
"#;
let config: ScenarioConfig = serde_yaml_ng::from_str(yaml)
.expect("YAML deserialization must work with config feature");
assert_eq!(config.name, "test");
}
#[test]
fn factory_functions_work_without_deserialization() {
use crate::encoder::{create_encoder, EncoderConfig};
use crate::generator::{create_generator, GeneratorConfig};
use crate::sink::{create_sink, SinkConfig};
let gen_config = GeneratorConfig::Constant { value: 42.0 };
let gen = create_generator(&gen_config, 1.0).expect("generator factory must succeed");
assert_eq!(gen.value(0), 42.0);
let enc_config = EncoderConfig::PrometheusText { precision: None };
let _enc = create_encoder(&enc_config).expect("encoder factory must succeed");
let sink_config = SinkConfig::Stdout;
let _sink = create_sink(&sink_config, None).expect("sink factory must succeed");
}
#[test]
fn sonda_error_sink_display_includes_io_context() {
let io_err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe broke");
let err = SondaError::Sink(io_err);
let msg = format!("{err}");
assert!(
msg.contains("sink error"),
"Sink variant display must include 'sink error', got: {msg}"
);
assert!(
msg.contains("pipe broke"),
"Sink variant display must include the I/O error message, got: {msg}"
);
}
#[test]
fn error_types_are_send_and_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<SondaError>();
assert_send_sync::<ConfigError>();
assert_send_sync::<GeneratorError>();
assert_send_sync::<EncoderError>();
assert_send_sync::<RuntimeError>();
}
}