use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// Current scenario document schema version; `Scenario::validate` rejects any other value.
pub const SCENARIO_SCHEMA_VERSION: u32 = 1;
/// Top-level declarative scenario document, deserialized from JSON.
///
/// Every section except `id` carries a serde default, so a minimal document
/// such as `{"id": "x"}` deserializes into a fully populated `Scenario`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Scenario {
/// Document schema version; defaults to [`SCENARIO_SCHEMA_VERSION`] and is
/// checked by [`Scenario::validate`].
#[serde(default = "default_schema_version")]
pub schema_version: u32,
/// Required unique identifier; `validate` rejects an empty string.
pub id: String,
/// Free-form human-readable description (may be empty).
#[serde(default)]
pub description: String,
/// Lab runtime settings (seed, workers, step limits, ...).
#[serde(default)]
pub lab: LabSection,
/// Chaos-injection preset or custom probabilities.
#[serde(default)]
pub chaos: ChaosSection,
/// Network simulation preset plus per-link overrides.
#[serde(default)]
pub network: NetworkSection,
/// Timed fault events; `validate` requires non-decreasing `at_ms` order.
#[serde(default)]
pub faults: Vec<FaultEvent>,
/// Named participants; `validate` requires unique names.
#[serde(default)]
pub participants: Vec<Participant>,
/// Oracle names to enable; defaults to `["all"]`.
#[serde(default = "default_oracles")]
pub oracles: Vec<String>,
/// Optional cancellation-testing configuration; absent means none.
#[serde(default)]
pub cancellation: Option<CancellationSection>,
/// References to other scenario files; inclusion is not resolved in this module.
#[serde(default)]
pub include: Vec<IncludeRef>,
/// Arbitrary string key/value metadata (e.g. git_sha, author).
#[serde(default)]
pub metadata: BTreeMap<String, String>,
}
/// Serde default for `Scenario::schema_version`.
fn default_schema_version() -> u32 {
SCENARIO_SCHEMA_VERSION
}
/// Serde default for `Scenario::oracles`: the single sentinel entry `"all"`,
/// meaning every oracle is enabled.
fn default_oracles() -> Vec<String> {
    vec![String::from("all")]
}
/// Lab runtime settings.
///
/// The serde field defaults mirror the hand-written `impl Default for
/// LabSection` below; both must stay in sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LabSection {
/// Deterministic RNG seed (default 42).
#[serde(default = "default_seed")]
pub seed: u64,
/// Optional dedicated entropy seed; when absent, `seed` is reused for chaos
/// (see `Scenario::to_lab_config`).
pub entropy_seed: Option<u64>,
/// Number of workers (default 1); `validate` requires >= 1.
#[serde(default = "default_worker_count")]
pub worker_count: usize,
/// Trace ring capacity (default 4096); `validate` requires > 0.
#[serde(default = "default_trace_capacity")]
pub trace_capacity: usize,
/// Optional step limit (default Some(100_000)); `None` means unlimited.
#[serde(default = "default_max_steps")]
pub max_steps: Option<u64>,
/// Panic when an obligation leaks (default true).
#[serde(default = "default_true")]
pub panic_on_obligation_leak: bool,
/// Panic on futurelock detection (default true).
#[serde(default = "default_true")]
pub panic_on_futurelock: bool,
/// Idle-step threshold for futurelock detection (default 10_000).
#[serde(default = "default_futurelock_max_idle")]
pub futurelock_max_idle_steps: u64,
/// Record for replay (default false).
#[serde(default)]
pub replay_recording: bool,
}
impl Default for LabSection {
/// Defaults used when the `lab` section is absent from the document.
///
/// NOTE(review): these literals duplicate the serde default helpers
/// (`default_seed`, `default_worker_count`, ...); keep both in sync.
fn default() -> Self {
Self {
seed: 42,
entropy_seed: None,
worker_count: 1,
trace_capacity: 4096,
max_steps: Some(100_000),
panic_on_obligation_leak: true,
panic_on_futurelock: true,
futurelock_max_idle_steps: 10_000,
replay_recording: false,
}
}
}
/// Serde default for `LabSection::seed`.
fn default_seed() -> u64 {
42
}
/// Serde default for `LabSection::worker_count`.
fn default_worker_count() -> usize {
1
}
/// Serde default for `LabSection::trace_capacity`.
fn default_trace_capacity() -> usize {
4096
}
/// Serde default for `LabSection::max_steps`: a finite step limit is on by default.
// The Some wrapper is required because the field is Option<u64>, hence the allow.
#[allow(clippy::unnecessary_wraps)]
fn default_max_steps() -> Option<u64> {
Some(100_000)
}
/// Serde default for boolean fields that default to `true`.
fn default_true() -> bool {
true
}
/// Serde default for `LabSection::futurelock_max_idle_steps`.
fn default_futurelock_max_idle() -> u64 {
10_000
}
/// Chaos-injection configuration.
///
/// Internally tagged by `preset` in JSON, e.g. `{"preset": "light"}` or
/// `{"preset": "custom", "cancel_probability": 0.05, ...}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "preset", rename_all = "snake_case")]
pub enum ChaosSection {
/// No chaos injection (the default).
#[default]
Off,
/// Built-in light preset (applied via the lab config in `to_lab_config`).
Light,
/// Built-in heavy preset.
Heavy,
/// Fully user-specified probabilities; all probability fields are
/// validated to lie in [0.0, 1.0] by `Scenario::validate`.
Custom {
/// Probability of injecting a cancellation (default 0.0).
#[serde(default)]
cancel_probability: f64,
/// Probability of injecting a delay (default 0.0).
#[serde(default)]
delay_probability: f64,
/// Lower bound of injected delay in milliseconds (default 0).
#[serde(default)]
delay_min_ms: u64,
/// Upper bound of injected delay in milliseconds (default 10);
/// must be >= delay_min_ms.
#[serde(default = "default_delay_max_ms")]
delay_max_ms: u64,
/// Probability of injecting an I/O error (default 0.0).
#[serde(default)]
io_error_probability: f64,
/// Probability of a wakeup storm (default 0.0).
#[serde(default)]
wakeup_storm_probability: f64,
/// Probability of budget exhaustion (default 0.0).
#[serde(default)]
budget_exhaustion_probability: f64,
},
}
/// Serde default for `ChaosSection::Custom::delay_max_ms`.
fn default_delay_max_ms() -> u64 {
10
}
/// Network simulation settings: a base preset plus per-link overrides.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkSection {
/// Base network conditions preset (default `ideal`).
#[serde(default = "default_network_preset")]
pub preset: NetworkPreset,
/// Per-link overrides keyed by "from->to" (key format checked by `validate`).
#[serde(default)]
pub links: BTreeMap<String, LinkConditions>,
}
impl Default for NetworkSection {
fn default() -> Self {
Self {
preset: NetworkPreset::Ideal,
links: BTreeMap::new(),
}
}
}
/// Serde default for `NetworkSection::preset`.
fn default_network_preset() -> NetworkPreset {
NetworkPreset::Ideal
}
/// Named network condition presets, serialized in snake_case
/// (e.g. "ideal", "wan").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NetworkPreset {
Ideal,
Local,
Lan,
Wan,
Satellite,
Congested,
Lossy,
}
/// Per-link condition overrides; every field is optional, `None` means
/// "inherit from the preset".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinkConditions {
/// Latency model override.
#[serde(default)]
pub latency: Option<LatencySpec>,
/// Packet loss probability in [0.0, 1.0] (checked by `validate`).
#[serde(default)]
pub packet_loss: Option<f64>,
/// Packet corruption probability in [0.0, 1.0].
#[serde(default)]
pub packet_corrupt: Option<f64>,
/// Packet duplication probability in [0.0, 1.0].
#[serde(default)]
pub packet_duplicate: Option<f64>,
/// Packet reorder probability in [0.0, 1.0].
#[serde(default)]
pub packet_reorder: Option<f64>,
/// Bandwidth cap; units not specified here — presumably bytes or bits
/// per second, confirm against the consumer.
#[serde(default)]
pub bandwidth: Option<u64>,
}
/// Latency distribution for a link, internally tagged by `model` in JSON
/// (e.g. {"model": "uniform", "min_ms": 1, "max_ms": 10}).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "model", rename_all = "snake_case")]
pub enum LatencySpec {
/// Constant latency of `ms` milliseconds.
Fixed {
ms: u64,
},
/// Uniform distribution; `validate` requires min_ms <= max_ms.
Uniform {
min_ms: u64,
max_ms: u64,
},
/// Normal distribution with the given mean and standard deviation.
Normal {
mean_ms: u64,
stddev_ms: u64,
},
}
/// A scheduled fault: an action fired at a point in scenario time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FaultEvent {
/// Trigger time in milliseconds; the `faults` list must be ordered by this.
pub at_ms: u64,
/// Which fault to inject.
pub action: FaultAction,
/// Action-specific arguments (e.g. {"from": "a", "to": "b"} for a partition).
#[serde(default)]
pub args: BTreeMap<String, serde_json::Value>,
}
/// Kinds of injectable faults, serialized in snake_case
/// (e.g. "partition", "host_crash").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FaultAction {
Partition,
Heal,
HostCrash,
HostRestart,
ClockSkew,
ClockReset,
}
/// A named actor in the scenario.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Participant {
/// Unique name; duplicates are rejected by `validate`.
pub name: String,
/// Free-form role label (may be empty).
#[serde(default)]
pub role: String,
/// Arbitrary participant-specific properties.
#[serde(default)]
pub properties: BTreeMap<String, serde_json::Value>,
}
/// Cancellation-testing configuration.
///
/// Which of `count`/`probability` is required depends on `strategy`;
/// see `Scenario::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationSection {
/// How cancellation points are selected.
pub strategy: CancellationStrategy,
/// Sample count; required (and >= 1) for the counted strategies.
#[serde(default)]
pub count: Option<usize>,
/// Per-point probability; required (and in [0.0, 1.0]) for `probabilistic`.
#[serde(default)]
pub probability: Option<f64>,
}
/// Cancellation point selection strategies, serialized in snake_case
/// (e.g. "random_sample").
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CancellationStrategy {
/// Never cancel.
Never,
/// Cancel at every point.
AllPoints,
/// Random sample of `count` points.
RandomSample,
/// The first `count` points.
FirstN,
/// The last `count` points.
LastN,
/// Every `count`-th point.
EveryNth,
/// Each point cancelled independently with `probability`.
Probabilistic,
}
/// Reference to another scenario file to include; path resolution and
/// merging are not performed in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncludeRef {
pub path: String,
}
/// A single validation failure: the offending field path and a
/// human-readable reason. Rendered as `"field: message"`.
#[derive(Debug, Clone)]
pub struct ValidationError {
    pub field: String,
    pub message: String,
}

impl std::fmt::Display for ValidationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { field, message } = self;
        write!(f, "{field}: {message}")
    }
}

impl std::error::Error for ValidationError {}
impl Scenario {
/// Checks the scenario for semantic errors that deserialization cannot catch.
///
/// Collects every problem found (it does not stop at the first); an empty
/// vector means the scenario is valid.
#[must_use]
pub fn validate(&self) -> Vec<ValidationError> {
let mut errors = Vec::new();
self.validate_header(&mut errors);
self.validate_chaos(&mut errors);
self.validate_network(&mut errors);
self.validate_faults(&mut errors);
self.validate_participants(&mut errors);
self.validate_cancellation(&mut errors);
errors
}
/// Checks schema version, non-empty id, and basic lab limits.
fn validate_header(&self, errors: &mut Vec<ValidationError>) {
if self.schema_version != SCENARIO_SCHEMA_VERSION {
errors.push(ValidationError {
field: "schema_version".into(),
message: format!(
"unsupported version {}, expected {SCENARIO_SCHEMA_VERSION}",
self.schema_version
),
});
}
if self.id.is_empty() {
errors.push(ValidationError {
field: "id".into(),
message: "scenario id must not be empty".into(),
});
}
if self.lab.worker_count == 0 {
errors.push(ValidationError {
field: "lab.worker_count".into(),
message: "worker_count must be >= 1".into(),
});
}
if self.lab.trace_capacity == 0 {
errors.push(ValidationError {
field: "lab.trace_capacity".into(),
message: "trace_capacity must be > 0".into(),
});
}
}
/// Checks `ChaosSection::Custom` probabilities and the delay range.
/// Presets (`Off`/`Light`/`Heavy`) need no validation.
fn validate_chaos(&self, errors: &mut Vec<ValidationError>) {
if let ChaosSection::Custom {
cancel_probability,
delay_probability,
delay_min_ms,
delay_max_ms,
io_error_probability,
wakeup_storm_probability,
budget_exhaustion_probability,
} = &self.chaos
{
for (name, val) in [
("chaos.cancel_probability", cancel_probability),
("chaos.delay_probability", delay_probability),
("chaos.io_error_probability", io_error_probability),
("chaos.wakeup_storm_probability", wakeup_storm_probability),
(
"chaos.budget_exhaustion_probability",
budget_exhaustion_probability,
),
] {
// Range check also rejects NaN/infinity: `contains` is false for them.
if !(0.0..=1.0).contains(val) {
errors.push(ValidationError {
field: name.into(),
message: format!("probability must be in [0.0, 1.0], got {val}"),
});
}
}
if *delay_min_ms > *delay_max_ms {
errors.push(ValidationError {
field: "chaos.delay_min_ms".into(),
message: format!(
"delay_min_ms ({delay_min_ms}) must be <= delay_max_ms ({delay_max_ms})"
),
});
}
}
}
/// Checks link key format, per-link probabilities, and latency ranges.
fn validate_network(&self, errors: &mut Vec<ValidationError>) {
for (key, link) in &self.network.links {
// A valid key has exactly one "->" with non-empty sides,
// e.g. "alice->bob".
let key_valid = key
.split_once("->")
.is_some_and(|(from, to)| !from.is_empty() && !to.is_empty() && !to.contains("->"));
if !key_valid {
errors.push(ValidationError {
field: format!("network.links.{key}"),
message: "link key must be in format \"from->to\"".into(),
});
}
for (name, value) in [
("packet_loss", link.packet_loss),
("packet_corrupt", link.packet_corrupt),
("packet_duplicate", link.packet_duplicate),
("packet_reorder", link.packet_reorder),
] {
if let Some(probability) = value {
if !probability.is_finite() || !(0.0..=1.0).contains(&probability) {
errors.push(ValidationError {
field: format!("network.links.{key}.{name}"),
message: format!(
"probability must be finite and in [0.0, 1.0], got {probability}"
),
});
}
}
}
// Only the uniform model carries an orderable range to check.
if let Some(LatencySpec::Uniform { min_ms, max_ms }) = &link.latency {
if min_ms > max_ms {
errors.push(ValidationError {
field: format!("network.links.{key}.latency"),
message: format!(
"uniform latency min_ms ({min_ms}) must be <= max_ms ({max_ms})"
),
});
}
}
}
}
/// Checks that fault events are in non-decreasing `at_ms` order.
/// Checking adjacent pairs via `windows(2)` is sufficient for sortedness.
fn validate_faults(&self, errors: &mut Vec<ValidationError>) {
for window in self.faults.windows(2) {
if window[1].at_ms < window[0].at_ms {
errors.push(ValidationError {
field: "faults".into(),
message: format!(
"fault events must be ordered by at_ms: {} comes before {}",
window[0].at_ms, window[1].at_ms
),
});
}
}
}
/// Checks that participant names are unique.
fn validate_participants(&self, errors: &mut Vec<ValidationError>) {
let mut seen_names = std::collections::HashSet::new();
for p in &self.participants {
// `insert` returns false when the name was already present.
if !seen_names.insert(&p.name) {
errors.push(ValidationError {
field: format!("participants.{}", p.name),
message: "duplicate participant name".into(),
});
}
}
}
/// Checks that the cancellation strategy has its required parameter:
/// a positive `count` for the counted strategies, a finite probability
/// in [0.0, 1.0] for `probabilistic`, nothing for `never`/`all_points`.
fn validate_cancellation(&self, errors: &mut Vec<ValidationError>) {
let Some(ref cancel) = self.cancellation else {
return;
};
match cancel.strategy {
CancellationStrategy::RandomSample
| CancellationStrategy::FirstN
| CancellationStrategy::LastN
| CancellationStrategy::EveryNth => {
if cancel.count.is_none() {
errors.push(ValidationError {
field: "cancellation.count".into(),
message: format!(
"strategy {:?} requires a count parameter",
cancel.strategy
),
});
} else if cancel.count == Some(0) {
errors.push(ValidationError {
field: "cancellation.count".into(),
message: "count must be >= 1".into(),
});
}
}
CancellationStrategy::Probabilistic => {
if let Some(p) = cancel.probability {
if !p.is_finite() || !(0.0..=1.0).contains(&p) {
errors.push(ValidationError {
field: "cancellation.probability".into(),
message: format!("probability must be in [0.0, 1.0], got {p}"),
});
}
} else {
errors.push(ValidationError {
field: "cancellation.probability".into(),
message: "strategy probabilistic requires a probability parameter".into(),
});
}
}
CancellationStrategy::Never | CancellationStrategy::AllPoints => {}
}
}
/// Converts the scenario into a runnable lab configuration via the
/// `LabConfig` builder (defined in `super::config`).
#[must_use]
pub fn to_lab_config(&self) -> super::config::LabConfig {
let mut config = super::config::LabConfig::new(self.lab.seed)
.worker_count(self.lab.worker_count)
.trace_capacity(self.lab.trace_capacity)
.panic_on_leak(self.lab.panic_on_obligation_leak)
.panic_on_futurelock(self.lab.panic_on_futurelock)
.futurelock_max_idle_steps(self.lab.futurelock_max_idle_steps);
if let Some(entropy) = self.lab.entropy_seed {
config = config.entropy_seed(entropy);
}
// `max_steps: None` means "no limit", mapped to an explicit builder call.
if let Some(max) = self.lab.max_steps {
config = config.max_steps(max);
} else {
config = config.no_step_limit();
}
config = match &self.chaos {
ChaosSection::Off => config,
ChaosSection::Light => config.with_light_chaos(),
ChaosSection::Heavy => config.with_heavy_chaos(),
ChaosSection::Custom {
cancel_probability,
delay_probability,
delay_min_ms,
delay_max_ms,
io_error_probability,
wakeup_storm_probability,
budget_exhaustion_probability,
} => {
use std::time::Duration;
// The chaos RNG prefers the dedicated entropy seed when present.
let chaos_seed = self.lab.entropy_seed.unwrap_or(self.lab.seed);
let chaos = crate::lab::chaos::ChaosConfig::new(chaos_seed)
.with_cancel_probability(*cancel_probability)
.with_delay_probability(*delay_probability)
.with_delay_range(
Duration::from_millis(*delay_min_ms)..Duration::from_millis(*delay_max_ms),
)
.with_io_error_probability(*io_error_probability)
.with_wakeup_storm_probability(*wakeup_storm_probability)
.with_budget_exhaust_probability(*budget_exhaustion_probability);
config.with_chaos(chaos)
}
};
if self.lab.replay_recording {
config = config.with_default_replay_recording();
}
config
}
/// Deserializes a scenario from a JSON string.
///
/// # Errors
/// Returns the underlying `serde_json` error on malformed input.
pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(json)
}
/// Serializes the scenario to pretty-printed JSON.
///
/// # Errors
/// Returns the underlying `serde_json` error if serialization fails.
pub fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string_pretty(self)
}
}
// Unit tests covering deserialization, validation, lab-config conversion,
// and JSON round-tripping.
#[cfg(test)]
mod tests {
use super::*;
// Smallest document accepted by the schema: only `id` is required.
fn minimal_json() -> &'static str {
r#"{
"id": "test-scenario",
"description": "minimal test"
}"#
}
// --- parsing and defaults ---
#[test]
fn parse_minimal_scenario() {
let s: Scenario = serde_json::from_str(minimal_json()).unwrap();
assert_eq!(s.id, "test-scenario");
assert_eq!(s.schema_version, 1);
assert_eq!(s.lab.seed, 42);
assert_eq!(s.lab.worker_count, 1);
assert!(s.faults.is_empty());
assert!(s.participants.is_empty());
assert_eq!(s.oracles, vec!["all"]);
}
// --- validation ---
#[test]
fn validate_minimal_scenario() {
let s: Scenario = serde_json::from_str(minimal_json()).unwrap();
let errors = s.validate();
assert!(errors.is_empty(), "unexpected errors: {errors:?}");
}
#[test]
fn validate_empty_id_rejected() {
let json = r#"{"id": "", "description": "bad"}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field == "id"));
}
#[test]
fn validate_bad_schema_version() {
let json = r#"{"schema_version": 99, "id": "x"}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field == "schema_version"));
}
// --- chaos section ---
#[test]
fn parse_chaos_preset_light() {
let json = r#"{"id": "x", "chaos": {"preset": "light"}}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert!(matches!(s.chaos, ChaosSection::Light));
}
#[test]
fn parse_chaos_custom() {
let json = r#"{
"id": "x",
"chaos": {
"preset": "custom",
"cancel_probability": 0.05,
"delay_probability": 0.3,
"io_error_probability": 0.1
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
match s.chaos {
ChaosSection::Custom {
cancel_probability,
delay_probability,
io_error_probability,
..
} => {
assert!((cancel_probability - 0.05).abs() < f64::EPSILON);
assert!((delay_probability - 0.3).abs() < f64::EPSILON);
assert!((io_error_probability - 0.1).abs() < f64::EPSILON);
}
other => panic!("expected Custom, got {other:?}"),
}
}
#[test]
fn validate_chaos_bad_probability() {
let json = r#"{
"id": "x",
"chaos": {"preset": "custom", "cancel_probability": 1.5}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field == "chaos.cancel_probability"));
}
// --- network section ---
#[test]
fn parse_network_preset_wan() {
let json = r#"{"id": "x", "network": {"preset": "wan"}}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.network.preset, NetworkPreset::Wan);
}
#[test]
fn parse_network_link_override() {
let json = r#"{
"id": "x",
"network": {
"preset": "lan",
"links": {
"alice->bob": { "packet_loss": 0.5 }
}
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let link = s.network.links.get("alice->bob").unwrap();
assert!((link.packet_loss.unwrap() - 0.5).abs() < f64::EPSILON);
}
#[test]
fn validate_bad_link_key() {
let json = r#"{
"id": "x",
"network": {"links": {"alice_bob": {}}}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field.contains("network.links")));
}
#[test]
fn validate_link_probability_out_of_range() {
let json = r#"{
"id": "x",
"network": {
"links": {
"alice->bob": { "packet_loss": 1.5 }
}
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(
errors
.iter()
.any(|e| e.field == "network.links.alice->bob.packet_loss")
);
}
#[test]
fn validate_uniform_latency_min_max_order() {
let json = r#"{
"id": "x",
"network": {
"links": {
"alice->bob": {
"latency": { "model": "uniform", "min_ms": 20, "max_ms": 10 }
}
}
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(
errors
.iter()
.any(|e| e.field == "network.links.alice->bob.latency")
);
}
// --- faults ---
#[test]
fn parse_fault_events() {
let json = r#"{
"id": "x",
"faults": [
{"at_ms": 100, "action": "partition", "args": {"from": "a", "to": "b"}},
{"at_ms": 500, "action": "heal", "args": {"from": "a", "to": "b"}}
]
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.faults.len(), 2);
assert_eq!(s.faults[0].at_ms, 100);
assert!(matches!(s.faults[0].action, FaultAction::Partition));
assert_eq!(s.faults[1].at_ms, 500);
assert!(matches!(s.faults[1].action, FaultAction::Heal));
}
#[test]
fn validate_unordered_faults() {
let json = r#"{
"id": "x",
"faults": [
{"at_ms": 500, "action": "partition"},
{"at_ms": 100, "action": "heal"}
]
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field == "faults"));
}
// --- participants ---
#[test]
fn parse_participants() {
let json = r#"{
"id": "x",
"participants": [
{"name": "alice", "role": "sender"},
{"name": "bob", "role": "receiver"}
]
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.participants.len(), 2);
assert_eq!(s.participants[0].name, "alice");
assert_eq!(s.participants[1].role, "receiver");
}
#[test]
fn validate_duplicate_participant() {
let json = r#"{
"id": "x",
"participants": [
{"name": "alice"},
{"name": "alice"}
]
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.message.contains("duplicate")));
}
// --- cancellation ---
#[test]
fn parse_cancellation_strategy() {
let json = r#"{
"id": "x",
"cancellation": {
"strategy": "random_sample",
"count": 100
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let cancel = s.cancellation.as_ref().unwrap();
assert!(matches!(
cancel.strategy,
CancellationStrategy::RandomSample
));
assert_eq!(cancel.count, Some(100));
}
#[test]
fn validate_missing_count() {
let json = r#"{
"id": "x",
"cancellation": {"strategy": "random_sample"}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let errors = s.validate();
assert!(errors.iter().any(|e| e.field == "cancellation.count"));
}
// --- conversion to lab config ---
#[test]
fn to_lab_config_defaults() {
let s: Scenario = serde_json::from_str(minimal_json()).unwrap();
let config = s.to_lab_config();
assert_eq!(config.seed, 42);
assert_eq!(config.worker_count, 1);
assert_eq!(config.trace_capacity, 4096);
assert!(config.panic_on_obligation_leak);
}
#[test]
fn to_lab_config_chaos_light() {
let json = r#"{"id": "x", "chaos": {"preset": "light"}}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let config = s.to_lab_config();
assert!(config.has_chaos());
}
#[test]
fn to_lab_config_custom_seed() {
let json = r#"{"id": "x", "lab": {"seed": 12345, "worker_count": 4}}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
let config = s.to_lab_config();
assert_eq!(config.seed, 12345);
assert_eq!(config.worker_count, 4);
}
// --- serialization round-trip and misc ---
#[test]
fn json_roundtrip() {
let json = r#"{
"id": "roundtrip-test",
"description": "full roundtrip",
"lab": {"seed": 99, "worker_count": 2},
"chaos": {"preset": "heavy"},
"network": {"preset": "wan"},
"participants": [{"name": "alice", "role": "sender"}],
"faults": [{"at_ms": 100, "action": "partition"}]
}"#;
let s1: Scenario = serde_json::from_str(json).unwrap();
let serialized = s1.to_json().unwrap();
let s2: Scenario = Scenario::from_json(&serialized).unwrap();
assert_eq!(s1.id, s2.id);
assert_eq!(s1.lab.seed, s2.lab.seed);
assert_eq!(s1.participants.len(), s2.participants.len());
assert_eq!(s1.faults.len(), s2.faults.len());
}
#[test]
fn parse_metadata() {
let json = r#"{
"id": "x",
"metadata": {"git_sha": "abc123", "author": "bot"}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.metadata.get("git_sha").unwrap(), "abc123");
}
#[test]
fn parse_latency_models() {
let json = r#"{
"id": "x",
"network": {
"preset": "ideal",
"links": {
"a->b": {"latency": {"model": "fixed", "ms": 5}},
"b->c": {"latency": {"model": "uniform", "min_ms": 1, "max_ms": 10}},
"c->d": {"latency": {"model": "normal", "mean_ms": 50, "stddev_ms": 10}}
}
}
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.network.links.len(), 3);
let ab = s.network.links.get("a->b").unwrap();
assert!(matches!(ab.latency, Some(LatencySpec::Fixed { ms: 5 })));
}
#[test]
fn parse_include() {
let json = r#"{
"id": "x",
"include": [{"path": "base.yaml"}]
}"#;
let s: Scenario = serde_json::from_str(json).unwrap();
assert_eq!(s.include.len(), 1);
assert_eq!(s.include[0].path, "base.yaml");
}
// --- derive coverage ---
#[test]
fn network_preset_debug_clone_copy_eq() {
let p = NetworkPreset::Wan;
let dbg = format!("{p:?}");
assert!(dbg.contains("Wan"));
let p2 = p;
assert_eq!(p, p2);
let p3 = p;
assert_eq!(p, p3);
assert_ne!(NetworkPreset::Ideal, NetworkPreset::Lossy);
}
#[test]
fn chaos_section_debug_clone_default() {
let c = ChaosSection::default();
let dbg = format!("{c:?}");
assert!(dbg.contains("Off"));
let c2 = c;
let dbg2 = format!("{c2:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn fault_action_debug_clone() {
let a = FaultAction::Partition;
let dbg = format!("{a:?}");
assert!(dbg.contains("Partition"));
let a2 = a;
let dbg2 = format!("{a2:?}");
assert_eq!(dbg, dbg2);
}
#[test]
fn validation_error_debug_clone() {
let e = ValidationError {
field: "lab.seed".into(),
message: "must be positive".into(),
};
let dbg = format!("{e:?}");
assert!(dbg.contains("lab.seed"));
let e2 = e;
assert_eq!(e2.field, "lab.seed");
assert_eq!(e2.message, "must be positive");
}
}