use crate::fixtures::{AutoProduceConfig, KafkaFixture};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level shape of a Kafka fixture file.
///
/// Every section is optional: a missing section deserializes to `None` or to
/// an empty list. Keys that are not declared here are ignored when parsing.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct KafkaFixtureFile {
    /// Descriptive metadata from the `fixture` section.
    pub fixture: Option<FixtureMeta>,
    /// Cluster description from the `cluster` section.
    pub cluster: Option<ClusterSpec>,
    /// Topics, each carrying its own message definitions.
    pub topics: Vec<KafkaTopicSpec>,
    /// Named multi-step scenarios.
    pub scenarios: Vec<ScenarioSpec>,
    /// Declared links between topics.
    pub relationships: Vec<RelationshipSpec>,
}
/// Contents of the `fixture` section: a required name plus optional labels.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct FixtureMeta {
    /// Human-readable fixture name; the only required key in this section.
    pub name: String,

    /// Optional free-text description.
    #[serde(default)]
    pub description: Option<String>,

    /// Optional protocol label (the sample fixture in the tests uses `kafka`).
    #[serde(default)]
    pub protocol: Option<String>,
}
/// Contents of the `cluster` section; both keys may be omitted.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(default)]
pub struct ClusterSpec {
    /// Bootstrap server string, e.g. `"localhost:9092"`.
    pub bootstrap_servers: Option<String>,
    /// Identifier of the cluster.
    pub cluster_id: Option<String>,
}
/// One entry of the `topics` list: topic settings plus its messages.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct KafkaTopicSpec {
    /// Topic name (required).
    pub name: String,

    /// Partition count; falls back to `default_partitions()` when omitted.
    #[serde(default = "default_partitions")]
    pub partitions: i32,

    /// Replication factor; falls back to `default_replication_factor()` when omitted.
    #[serde(default = "default_replication_factor")]
    pub replication_factor: i16,

    /// Partitioning settings, kept as an untyped JSON value.
    #[serde(default)]
    pub partitioning: Option<serde_json::Value>,

    /// Topic configuration, kept as an untyped JSON value.
    #[serde(default)]
    pub config: Option<serde_json::Value>,

    /// Message definitions for this topic; empty when omitted.
    #[serde(default)]
    pub messages: Vec<KafkaMessageSpec>,
}
/// Serde default for `KafkaTopicSpec::partitions`: a single partition.
fn default_partitions() -> i32 {
    const SINGLE_PARTITION: i32 = 1;
    SINGLE_PARTITION
}
/// Serde default for `KafkaTopicSpec::replication_factor`: no extra replicas.
fn default_replication_factor() -> i16 {
    const SINGLE_REPLICA: i16 = 1;
    SINGLE_REPLICA
}
/// One message definition inside a topic's `messages` list.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct KafkaMessageSpec {
    /// Optional key template (the test fixtures use placeholders such as `{{uuid}}`).
    #[serde(default)]
    pub key_template: Option<String>,

    /// Message payload as arbitrary JSON (required).
    pub value: serde_json::Value,

    /// Header name/value pairs; empty when omitted.
    #[serde(default)]
    pub headers: HashMap<String, String>,

    /// Explicit target partition, if any.
    #[serde(default)]
    pub partition: Option<i32>,

    /// Optional automatic-production settings for this message.
    #[serde(default)]
    pub auto_produce: Option<MessageAutoProduce>,
}
/// The `auto_produce` settings attached to a single message.
///
/// Every key is optional; `enabled` is `false` when omitted.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MessageAutoProduce {
    /// Whether automatic production is switched on.
    #[serde(default)]
    pub enabled: bool,

    /// Production rate in messages per second.
    #[serde(default)]
    pub rate_per_second: Option<u64>,

    /// Optional duration limit in seconds.
    #[serde(default)]
    pub duration_seconds: Option<u64>,

    /// Optional limit on the total number of messages.
    #[serde(default)]
    pub total_count: Option<usize>,

    /// Partition used when the message itself does not name one.
    #[serde(default)]
    pub partition: Option<i32>,

    /// Trigger kind. This file recognises an absent value or `"rate"`
    /// (rate-based production) and `"state_machine"`.
    #[serde(default)]
    pub trigger: Option<String>,

    /// State machine definition, read when `trigger` is `"state_machine"`.
    #[serde(default)]
    pub state_machine: Option<StateMachineSpec>,
}
/// A state machine attached to a message's `auto_produce` block.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StateMachineSpec {
    /// Name of the state the machine starts in (required).
    pub initial_state: String,

    /// The declared states; empty when omitted.
    #[serde(default)]
    pub states: Vec<StateSpec>,
}
/// A single state of a [`StateMachineSpec`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StateSpec {
    /// State name (required).
    pub name: String,

    /// Names of the states reachable from this one; empty when omitted.
    #[serde(default)]
    pub next_states: Vec<String>,

    /// Transition probabilities.
    /// NOTE(review): presumably parallel to `next_states` — confirm with the consumer.
    #[serde(default)]
    pub probability: Vec<f64>,

    /// Delay values in milliseconds.
    /// NOTE(review): the test data suggests a `[min, max]` range — confirm with the consumer.
    #[serde(default)]
    pub delay_ms: Vec<u64>,
}
/// One entry of the `scenarios` list: a named sequence of steps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioSpec {
    /// Scenario name (required).
    pub name: String,

    /// Whether the scenario is active; `true` when the key is omitted.
    #[serde(default = "default_enabled")]
    pub enabled: bool,

    /// Optional probability value attached to the scenario.
    #[serde(default)]
    pub probability: Option<f64>,

    /// Ordered steps of the scenario; empty when omitted.
    #[serde(default)]
    pub sequence: Vec<ScenarioStep>,
}

/// Serde default for `ScenarioSpec::enabled`: scenarios are on unless disabled.
///
/// Named like the other `default_*` helpers in this file so the attribute
/// `#[serde(default = "...")]` reads consistently across structs.
fn default_enabled() -> bool {
    true
}
/// One step inside a scenario's `sequence`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ScenarioStep {
    /// Topic the step refers to (required).
    pub topic: String,

    /// Optional message reference.
    /// NOTE(review): how this string is resolved is not visible here — confirm with the consumer.
    #[serde(default)]
    pub message: Option<String>,

    /// Delay values in milliseconds; empty when omitted.
    #[serde(default)]
    pub delay_ms: Vec<u64>,
}
/// One entry of the `relationships` list, linking two topics.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RelationshipSpec {
    /// Source topic name (required).
    pub from_topic: String,

    /// Destination topic name (required).
    pub to_topic: String,

    /// Optional relationship label (the tests use `"one_to_one"`).
    #[serde(default)]
    pub relationship: Option<String>,

    /// String-to-string key mapping between the two topics; empty when omitted.
    #[serde(default)]
    pub key_mapping: HashMap<String, String>,
}
/// Result of [`KafkaFixtureFile::flatten`]: the file's sections plus the
/// per-message data derived from them.
#[derive(Default, Debug)]
pub struct FlattenedFixtures {
    /// Topic specs, carried over unchanged from the file.
    pub topics: Vec<KafkaTopicSpec>,
    /// One fixture per message, in topic order and then message order.
    pub fixtures: Vec<KafkaFixture>,
    /// Scenarios, carried over unchanged from the file.
    pub scenarios: Vec<ScenarioSpec>,
    /// Relationships, carried over unchanged from the file.
    pub relationships: Vec<RelationshipSpec>,
    /// `(fixture identifier, state machine)` pairs, where the identifier has
    /// the form `"<topic name>#<message index>"`.
    pub state_machines: Vec<(String, StateMachineSpec)>,
}
impl KafkaFixtureFile {
    /// Consumes the parsed file and produces its flattened form.
    ///
    /// Each message of each topic becomes one [`KafkaFixture`]. A message whose
    /// `auto_produce` block is enabled, uses the `"state_machine"` trigger and
    /// carries a state machine also contributes an entry to `state_machines`,
    /// keyed by `"<topic name>#<message index>"`. Topics, scenarios and
    /// relationships are moved into the result unchanged; the `fixture` and
    /// `cluster` sections are dropped.
    pub fn flatten(self) -> FlattenedFixtures {
        let Self {
            topics,
            scenarios,
            relationships,
            ..
        } = self;

        let mut fixtures = Vec::new();
        let mut state_machines = Vec::new();

        for topic in &topics {
            for (index, message) in topic.messages.iter().enumerate() {
                fixtures.push(message_to_fixture(topic, index, message));

                // A state machine is indexed only when auto-production is
                // enabled and explicitly triggered by "state_machine".
                let machine = message
                    .auto_produce
                    .as_ref()
                    .filter(|ap| ap.enabled && ap.trigger.as_deref() == Some("state_machine"))
                    .and_then(|ap| ap.state_machine.as_ref());

                if let Some(machine) = machine {
                    state_machines.push((format!("{}#{}", topic.name, index), machine.clone()));
                }
            }
        }

        FlattenedFixtures {
            topics,
            fixtures,
            scenarios,
            relationships,
            state_machines,
        }
    }
}
/// Builds the [`KafkaFixture`] for the message at position `index` of `topic`.
///
/// * The identifier is `"<topic name>#<index>"` and the display name is
///   `"<topic name> message <index>"`.
/// * The partition is the message's own `partition`, falling back to the one
///   in its `auto_produce` block.
/// * An [`AutoProduceConfig`] is produced only when the `auto_produce` block
///   has no trigger or the `"rate"` trigger, and a `rate_per_second` is set;
///   any other trigger yields `None`.
fn message_to_fixture(
    topic: &KafkaTopicSpec,
    index: usize,
    msg: &KafkaMessageSpec,
) -> KafkaFixture {
    let producer = msg.auto_produce.as_ref();

    let auto_produce = producer
        .filter(|ap| matches!(ap.trigger.as_deref(), None | Some("rate")))
        .and_then(|ap| {
            // Without a rate there is nothing to schedule.
            let rate_per_second = ap.rate_per_second?;
            Some(AutoProduceConfig {
                enabled: ap.enabled,
                rate_per_second,
                duration_seconds: ap.duration_seconds,
                total_count: ap.total_count,
            })
        });

    let partition = match msg.partition {
        Some(explicit) => Some(explicit),
        None => producer.and_then(|ap| ap.partition),
    };

    KafkaFixture {
        identifier: format!("{}#{}", topic.name, index),
        name: format!("{} message {}", topic.name, index),
        topic: topic.name.clone(),
        partition,
        key_pattern: msg.key_template.clone(),
        value_template: msg.value.clone(),
        headers: msg.headers.clone(),
        auto_produce,
    }
}
#[cfg(test)]
mod tests {
    //! Parsing and flattening tests for the fixture file model.
    //!
    //! The YAML documents below rely on block indentation to express nesting,
    //! so the whitespace inside the raw strings is significant.

    use super::*;

    /// A fixture with metadata, a cluster, one rate-driven topic and one
    /// state-machine-driven topic.
    fn sample_yaml() -> &'static str {
        r#"
fixture:
  name: "E-commerce Order Events"
  description: "demo"
  protocol: kafka
cluster:
  bootstrap_servers: "localhost:9092"
  cluster_id: "mockforge-cluster"
topics:
  - name: "orders.created"
    partitions: 3
    replication_factor: 1
    partitioning:
      strategy: "key_hash"
    config:
      retention_ms: 604800000
    messages:
      - key_template: "order-{{uuid}}"
        value:
          event_type: "order.created"
          order_id: "{{uuid}}"
        headers:
          event_version: "1.0"
        auto_produce:
          enabled: true
          rate_per_second: 10
          partition: null
  - name: "orders.status-updated"
    partitions: 3
    messages:
      - key_template: "{{context.order_id}}"
        value:
          event_type: "order.status_updated"
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
"#
    }

    #[test]
    fn parses_nested_topology() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        assert_eq!(file.fixture.as_ref().unwrap().name, "E-commerce Order Events");
        assert_eq!(file.cluster.as_ref().unwrap().cluster_id.as_deref(), Some("mockforge-cluster"));
        assert_eq!(file.topics.len(), 2);
        assert_eq!(file.topics[0].name, "orders.created");
        assert_eq!(file.topics[0].partitions, 3);
        assert_eq!(file.topics[0].messages.len(), 1);
    }

    #[test]
    fn flatten_keeps_topics_and_emits_one_fixture_per_message() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.topics.len(), 2);
        assert_eq!(flat.fixtures.len(), 2);
        assert_eq!(flat.fixtures[0].identifier, "orders.created#0");
        assert_eq!(flat.fixtures[0].topic, "orders.created");
        assert!(flat.fixtures[0].auto_produce.as_ref().unwrap().enabled);
        assert_eq!(flat.fixtures[0].auto_produce.as_ref().unwrap().rate_per_second, 10);
    }

    #[test]
    fn state_machine_trigger_loads_without_auto_produce() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        // The second topic's message uses the state-machine trigger, so it
        // must not get a rate-based auto-produce config.
        let sm = &flat.fixtures[1];
        assert_eq!(sm.topic, "orders.status-updated");
        assert!(sm.auto_produce.is_none());
    }

    #[test]
    fn missing_optional_fields_parse_with_defaults() {
        let yaml = r#"
topics:
  - name: "plain"
    messages:
      - value: { k: "v" }
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics[0].partitions, 1);
        assert_eq!(file.topics[0].replication_factor, 1);
        assert!(file.topics[0].messages[0].key_template.is_none());
    }

    #[test]
    fn flattens_state_machine_spec_into_index() {
        let yaml = r#"
topics:
  - name: "orders.status-updated"
    messages:
      - value: { event_type: "order.status_updated" }
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
            states:
              - name: "pending"
                next_states: ["processing"]
                probability: [1.0]
                delay_ms: [1000, 2000]
              - name: "processing"
                next_states: ["shipped", "cancelled"]
                probability: [0.9, 0.1]
                delay_ms: [2000, 5000]
              - name: "shipped"
                next_states: []
              - name: "cancelled"
                next_states: []
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.state_machines.len(), 1);
        let (id, sm) = &flat.state_machines[0];
        assert_eq!(id, "orders.status-updated#0");
        assert_eq!(sm.initial_state, "pending");
        assert_eq!(sm.states.len(), 4);
        assert_eq!(sm.states[1].next_states, vec!["shipped", "cancelled"]);
        assert_eq!(sm.states[1].probability, vec![0.9, 0.1]);
        assert_eq!(sm.states[2].next_states, Vec::<String>::new());
    }

    #[test]
    fn parses_scenarios_and_relationships_sections() {
        let yaml = r#"
topics:
  - name: "orders.created"
    messages:
      - value: { k: "v" }
scenarios:
  - name: "Successful Order"
    enabled: true
    probability: 0.85
    sequence:
      - topic: "orders.created"
      - topic: "payments.processed"
        delay_ms: [1000, 3000]
relationships:
  - from_topic: "orders.created"
    to_topic: "payments.processed"
    relationship: "one_to_one"
    key_mapping:
      order_id: "order_id"
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.scenarios.len(), 1);
        assert_eq!(flat.scenarios[0].name, "Successful Order");
        assert_eq!(flat.scenarios[0].probability, Some(0.85));
        assert_eq!(flat.scenarios[0].sequence.len(), 2);
        assert_eq!(flat.scenarios[0].sequence[1].topic, "payments.processed");
        assert_eq!(flat.scenarios[0].sequence[1].delay_ms, vec![1000, 3000]);
        assert_eq!(flat.relationships.len(), 1);
        assert_eq!(flat.relationships[0].from_topic, "orders.created");
        assert_eq!(flat.relationships[0].to_topic, "payments.processed");
        assert_eq!(
            flat.relationships[0].key_mapping.get("order_id"),
            Some(&"order_id".to_string())
        );
    }

    #[test]
    fn unknown_top_level_sections_are_ignored() {
        let yaml = r#"
topics:
  - name: "t"
    messages:
      - value: {}
scenarios:
  - name: "Ignored"
failure_simulation:
  broker_failures:
    enabled: true
monitoring:
  prometheus:
    enabled: true
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics.len(), 1);
    }
}