1use crate::fixtures::{AutoProduceConfig, KafkaFixture};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
/// Top-level shape of a Kafka fixture document.
///
/// Every section carries `#[serde(default)]`, so a document may omit any of
/// them. Unknown top-level keys are not rejected (the type does not use
/// `deny_unknown_fields`). Use [`KafkaFixtureFile::flatten`] to turn the
/// nested topic/message tree into flat lists.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct KafkaFixtureFile {
    /// Optional `fixture:` metadata section. Not carried over by `flatten`.
    #[serde(default)]
    pub fixture: Option<FixtureMeta>,
    /// Optional `cluster:` section. Not carried over by `flatten`.
    #[serde(default)]
    pub cluster: Option<ClusterSpec>,
    /// Topics declared by the document, each with its nested messages.
    #[serde(default)]
    pub topics: Vec<KafkaTopicSpec>,
    /// Scenario definitions; `flatten` passes them through unchanged.
    #[serde(default)]
    pub scenarios: Vec<ScenarioSpec>,
    /// Topic-to-topic relationships; `flatten` passes them through unchanged.
    #[serde(default)]
    pub relationships: Vec<RelationshipSpec>,
}
47
/// Descriptive metadata stored under the top-level `fixture:` key.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FixtureMeta {
    /// Name of the fixture document. Required whenever the section is present.
    pub name: String,
    /// Optional free-form description.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional protocol label (the sample document in this file's tests uses
    /// `kafka`). Nothing in this file validates the value.
    #[serde(default)]
    pub protocol: Option<String>,
}
56
/// Cluster settings stored under the top-level `cluster:` key.
///
/// Both fields are optional. Nothing in this file reads them after parsing.
// NOTE(review): whether another module consumes these values could not be
// confirmed from this file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ClusterSpec {
    /// Bootstrap server address string, e.g. `localhost:9092`.
    #[serde(default)]
    pub bootstrap_servers: Option<String>,
    /// Identifier of the cluster.
    #[serde(default)]
    pub cluster_id: Option<String>,
}
64
/// One entry of the `topics:` list: a topic plus the messages declared for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicSpec {
    /// Topic name (required). Also used as the prefix of each generated
    /// fixture identifier (`<name>#<message index>`).
    pub name: String,
    /// Partition count; defaults to 1 when omitted.
    #[serde(default = "default_partitions")]
    pub partitions: i32,
    /// Replication factor; defaults to 1 when omitted.
    #[serde(default = "default_replication_factor")]
    pub replication_factor: i16,
    /// Partitioning settings, kept as untyped JSON and not interpreted here.
    #[serde(default)]
    pub partitioning: Option<serde_json::Value>,
    /// Topic configuration, kept as untyped JSON and not interpreted here.
    #[serde(default)]
    pub config: Option<serde_json::Value>,
    /// Messages declared for this topic; each becomes one fixture in
    /// `KafkaFixtureFile::flatten`.
    #[serde(default)]
    pub messages: Vec<KafkaMessageSpec>,
}
81
/// Serde fallback for `KafkaTopicSpec::partitions` when the key is omitted.
fn default_partitions() -> i32 {
    const DEFAULT_PARTITIONS: i32 = 1;
    DEFAULT_PARTITIONS
}
/// Serde fallback for `KafkaTopicSpec::replication_factor` when the key is omitted.
fn default_replication_factor() -> i16 {
    const DEFAULT_REPLICATION_FACTOR: i16 = 1;
    DEFAULT_REPLICATION_FACTOR
}
88
/// One message template declared under a topic's `messages:` list.
///
/// `message_to_fixture` converts each of these into a `KafkaFixture`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaMessageSpec {
    /// Optional key template (e.g. `order-{{uuid}}`); copied into the
    /// fixture's `key_pattern`.
    #[serde(default)]
    pub key_template: Option<String>,
    /// Message payload (required); copied into the fixture's `value_template`.
    pub value: serde_json::Value,
    /// Header name/value pairs; empty when omitted.
    #[serde(default)]
    pub headers: HashMap<String, String>,
    /// Explicit target partition. Takes precedence over
    /// `auto_produce.partition` when both are set.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Optional auto-produce settings for this message.
    #[serde(default)]
    pub auto_produce: Option<MessageAutoProduce>,
}
102
/// The `auto_produce:` settings attached to a single message.
///
/// How the settings are interpreted by this file depends on `trigger`:
/// * absent or `"rate"`: `message_to_fixture` builds an `AutoProduceConfig`,
///   but only when `rate_per_second` is set;
/// * `"state_machine"`: no `AutoProduceConfig` is built; when `enabled` is
///   true and `state_machine` is present, `KafkaFixtureFile::flatten`
///   records the state machine instead;
/// * any other value: no `AutoProduceConfig` is built.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAutoProduce {
    /// Whether auto-production is switched on; `false` when omitted.
    #[serde(default)]
    pub enabled: bool,
    /// Production rate; required for a rate-based config to be emitted.
    #[serde(default)]
    pub rate_per_second: Option<u64>,
    /// Optional duration limit, copied into `AutoProduceConfig` as-is.
    #[serde(default)]
    pub duration_seconds: Option<u64>,
    /// Optional message-count limit, copied into `AutoProduceConfig` as-is.
    #[serde(default)]
    pub total_count: Option<usize>,
    /// Fallback target partition, used only when the message itself does not
    /// set `partition`.
    #[serde(default)]
    pub partition: Option<i32>,
    /// Trigger kind; see the type-level documentation for recognised values.
    #[serde(default)]
    pub trigger: Option<String>,
    /// State machine definition, read when `trigger` is `"state_machine"`.
    #[serde(default)]
    pub state_machine: Option<StateMachineSpec>,
}
127
/// State machine definition nested under a message's `auto_produce:` block.
///
/// This file only parses and forwards the definition; it does not check that
/// `initial_state` names one of `states`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateMachineSpec {
    /// Name of the starting state (required).
    pub initial_state: String,
    /// Declared states; empty when omitted.
    #[serde(default)]
    pub states: Vec<StateSpec>,
}
135
/// A single state inside a [`StateMachineSpec`].
///
/// All list fields default to empty, and this file does not validate them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateSpec {
    /// State name (required).
    pub name: String,
    /// Names of the states reachable from this one; empty when omitted.
    #[serde(default)]
    pub next_states: Vec<String>,
    /// Transition probabilities.
    // NOTE(review): presumably parallel to `next_states` (the test data pairs
    // two next states with `[0.9, 0.1]`) — confirm against the consumer.
    #[serde(default)]
    pub probability: Vec<f64>,
    /// Delay values in milliseconds.
    // NOTE(review): the test data uses two-element lists such as
    // `[1000, 2000]`, which looks like a [min, max] range — confirm.
    #[serde(default)]
    pub delay_ms: Vec<u64>,
}
153
/// One entry of the top-level `scenarios:` list.
///
/// Scenarios are parsed and passed through `KafkaFixtureFile::flatten`
/// unchanged; this file does not execute them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioSpec {
    /// Scenario name (required).
    pub name: String,
    /// Whether the scenario is active; defaults to `true` when omitted.
    #[serde(default = "yes")]
    pub enabled: bool,
    /// Optional probability value (e.g. `0.85` in the tests).
    // NOTE(review): presumably the chance of selecting this scenario — confirm.
    #[serde(default)]
    pub probability: Option<f64>,
    /// Ordered steps of the scenario; empty when omitted.
    #[serde(default)]
    pub sequence: Vec<ScenarioStep>,
}
167
/// Serde fallback for `ScenarioSpec::enabled`: scenarios are on unless the
/// document says otherwise.
fn yes() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
171
/// One step in a scenario's `sequence:` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioStep {
    /// Name of the topic this step refers to (required). This file does not
    /// check that the topic is declared under `topics:`.
    pub topic: String,
    /// Optional message reference.
    // NOTE(review): the format of this string is not shown in this file —
    // confirm against the consumer.
    #[serde(default)]
    pub message: Option<String>,
    /// Delay values in milliseconds; empty when omitted.
    // NOTE(review): the test data uses `[1000, 3000]`, which looks like a
    // [min, max] range — confirm.
    #[serde(default)]
    pub delay_ms: Vec<u64>,
}
188
/// One entry of the top-level `relationships:` list, linking two topics.
///
/// Relationships are parsed and passed through `KafkaFixtureFile::flatten`
/// unchanged; this file does not interpret them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelationshipSpec {
    /// Source topic name (required).
    pub from_topic: String,
    /// Destination topic name (required).
    pub to_topic: String,
    /// Optional relationship kind label (the tests use `one_to_one`).
    #[serde(default)]
    pub relationship: Option<String>,
    /// String-to-string key mapping; empty when omitted.
    // NOTE(review): presumably maps a field of the source message to a field
    // of the destination message — confirm.
    #[serde(default)]
    pub key_mapping: HashMap<String, String>,
}
209
/// Result of [`KafkaFixtureFile::flatten`]: the nested document broken out
/// into flat lists.
#[derive(Debug, Default)]
pub struct FlattenedFixtures {
    /// The document's topics, moved over unchanged (messages included).
    pub topics: Vec<KafkaTopicSpec>,
    /// One fixture per declared message, in topic order and then message order.
    pub fixtures: Vec<KafkaFixture>,
    /// The document's scenarios, moved over unchanged.
    pub scenarios: Vec<ScenarioSpec>,
    /// The document's relationships, moved over unchanged.
    pub relationships: Vec<RelationshipSpec>,
    /// `(fixture identifier, state machine)` pairs, one for every message
    /// whose `auto_produce` is enabled, has trigger `"state_machine"`, and
    /// carries a `state_machine` definition. The identifier has the form
    /// `<topic name>#<message index>`.
    pub state_machines: Vec<(String, StateMachineSpec)>,
}
228
229impl KafkaFixtureFile {
230 pub fn flatten(self) -> FlattenedFixtures {
237 let mut fixtures = Vec::new();
238 let mut state_machines = Vec::new();
239 for topic in &self.topics {
240 for (i, msg) in topic.messages.iter().enumerate() {
241 fixtures.push(message_to_fixture(topic, i, msg));
242 if let Some(ap) = &msg.auto_produce {
243 if ap.enabled && ap.trigger.as_deref() == Some("state_machine") {
244 if let Some(sm) = &ap.state_machine {
245 state_machines.push((format!("{}#{}", topic.name, i), sm.clone()));
246 }
247 }
248 }
249 }
250 }
251 FlattenedFixtures {
252 topics: self.topics,
253 fixtures,
254 scenarios: self.scenarios,
255 relationships: self.relationships,
256 state_machines,
257 }
258 }
259}
260
261fn message_to_fixture(
262 topic: &KafkaTopicSpec,
263 index: usize,
264 msg: &KafkaMessageSpec,
265) -> KafkaFixture {
266 let auto = msg.auto_produce.as_ref().and_then(|ap| {
267 match ap.trigger.as_deref() {
270 None | Some("rate") => ap.rate_per_second.map(|rate| AutoProduceConfig {
271 enabled: ap.enabled,
272 rate_per_second: rate,
273 duration_seconds: ap.duration_seconds,
274 total_count: ap.total_count,
275 }),
276 _ => None,
277 }
278 });
279
280 KafkaFixture {
281 identifier: format!("{}#{}", topic.name, index),
282 name: format!("{} message {}", topic.name, index),
283 topic: topic.name.clone(),
284 partition: msg.partition.or_else(|| msg.auto_produce.as_ref().and_then(|a| a.partition)),
285 key_pattern: msg.key_template.clone(),
286 value_template: msg.value.clone(),
287 headers: msg.headers.clone(),
288 auto_produce: auto,
289 }
290}
291
// Unit tests covering YAML parsing of the fixture schema and the behaviour
// of `KafkaFixtureFile::flatten`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-topic sample document: the first topic has a rate-based
    /// auto-produced message, the second a state-machine-triggered one.
    fn sample_yaml() -> &'static str {
        r#"
fixture:
  name: "E-commerce Order Events"
  description: "demo"
  protocol: kafka

cluster:
  bootstrap_servers: "localhost:9092"
  cluster_id: "mockforge-cluster"

topics:
  - name: "orders.created"
    partitions: 3
    replication_factor: 1
    partitioning:
      strategy: "key_hash"
    config:
      retention_ms: 604800000
    messages:
      - key_template: "order-{{uuid}}"
        value:
          event_type: "order.created"
          order_id: "{{uuid}}"
        headers:
          event_version: "1.0"
        auto_produce:
          enabled: true
          rate_per_second: 10
          partition: null

  - name: "orders.status-updated"
    partitions: 3
    messages:
      - key_template: "{{context.order_id}}"
        value:
          event_type: "order.status_updated"
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
"#
    }

    /// The nested fixture/cluster/topics sections deserialize into the
    /// expected fields.
    #[test]
    fn parses_nested_topology() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        assert_eq!(file.fixture.as_ref().unwrap().name, "E-commerce Order Events");
        assert_eq!(file.cluster.as_ref().unwrap().cluster_id.as_deref(), Some("mockforge-cluster"));
        assert_eq!(file.topics.len(), 2);
        assert_eq!(file.topics[0].name, "orders.created");
        assert_eq!(file.topics[0].partitions, 3);
        assert_eq!(file.topics[0].messages.len(), 1);
    }

    /// Flattening keeps both topics and yields one fixture per message, with
    /// the rate-based auto-produce settings carried over.
    #[test]
    fn flatten_keeps_topics_and_emits_one_fixture_per_message() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.topics.len(), 2);
        assert_eq!(flat.fixtures.len(), 2);
        assert_eq!(flat.fixtures[0].identifier, "orders.created#0");
        assert_eq!(flat.fixtures[0].topic, "orders.created");
        assert!(flat.fixtures[0].auto_produce.as_ref().unwrap().enabled);
        assert_eq!(flat.fixtures[0].auto_produce.as_ref().unwrap().rate_per_second, 10);
    }

    /// A message with the `state_machine` trigger still becomes a fixture,
    /// but without a rate-based `auto_produce` config.
    #[test]
    fn state_machine_trigger_loads_without_auto_produce() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        // The second topic's only message is the state-machine one.
        let sm = &flat.fixtures[1];
        assert_eq!(sm.topic, "orders.status-updated");
        assert!(sm.auto_produce.is_none());
    }

    /// Omitted optional fields fall back to their serde defaults.
    #[test]
    fn missing_optional_fields_parse_with_defaults() {
        let yaml = r#"
topics:
  - name: "plain"
    messages:
      - value: { k: "v" }
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics[0].partitions, 1);
        assert_eq!(file.topics[0].replication_factor, 1);
        assert!(file.topics[0].messages[0].key_template.is_none());
    }

    /// An enabled state-machine message is recorded in `state_machines`
    /// under its `<topic>#<index>` identifier, with all states preserved.
    #[test]
    fn flattens_state_machine_spec_into_index() {
        let yaml = r#"
topics:
  - name: "orders.status-updated"
    messages:
      - value: { event_type: "order.status_updated" }
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
            states:
              - name: "pending"
                next_states: ["processing"]
                probability: [1.0]
                delay_ms: [1000, 2000]
              - name: "processing"
                next_states: ["shipped", "cancelled"]
                probability: [0.9, 0.1]
                delay_ms: [2000, 5000]
              - name: "shipped"
                next_states: []
              - name: "cancelled"
                next_states: []
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.state_machines.len(), 1);
        let (id, sm) = &flat.state_machines[0];
        assert_eq!(id, "orders.status-updated#0");
        assert_eq!(sm.initial_state, "pending");
        assert_eq!(sm.states.len(), 4);
        assert_eq!(sm.states[1].next_states, vec!["shipped", "cancelled"]);
        assert_eq!(sm.states[1].probability, vec![0.9, 0.1]);
        assert_eq!(sm.states[2].next_states, Vec::<String>::new());
    }

    /// The `scenarios` and `relationships` sections parse and survive
    /// flattening unchanged.
    #[test]
    fn parses_scenarios_and_relationships_sections() {
        let yaml = r#"
topics:
  - name: "orders.created"
    messages:
      - value: { k: "v" }
scenarios:
  - name: "Successful Order"
    enabled: true
    probability: 0.85
    sequence:
      - topic: "orders.created"
      - topic: "payments.processed"
        delay_ms: [1000, 3000]
relationships:
  - from_topic: "orders.created"
    to_topic: "payments.processed"
    relationship: "one_to_one"
    key_mapping:
      order_id: "order_id"
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.scenarios.len(), 1);
        assert_eq!(flat.scenarios[0].name, "Successful Order");
        assert_eq!(flat.scenarios[0].probability, Some(0.85));
        assert_eq!(flat.scenarios[0].sequence.len(), 2);
        assert_eq!(flat.scenarios[0].sequence[1].topic, "payments.processed");
        assert_eq!(flat.scenarios[0].sequence[1].delay_ms, vec![1000, 3000]);

        assert_eq!(flat.relationships.len(), 1);
        assert_eq!(flat.relationships[0].from_topic, "orders.created");
        assert_eq!(flat.relationships[0].to_topic, "payments.processed");
        assert_eq!(
            flat.relationships[0].key_mapping.get("order_id"),
            Some(&"order_id".to_string())
        );
    }

    /// Top-level keys the schema does not model (`failure_simulation`,
    /// `monitoring`) must not make parsing fail.
    #[test]
    fn unknown_top_level_sections_are_ignored() {
        let yaml = r#"
topics:
  - name: "t"
    messages:
      - value: {}
scenarios:
  - name: "Ignored"
failure_simulation:
  broker_failures:
    enabled: true
monitoring:
  prometheus:
    enabled: true
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics.len(), 1);
    }
}