Skip to main content

mockforge_kafka/
fixture_file.rs

1//! Nested "topology" YAML fixture format for Kafka.
2//!
3//! The older `KafkaFixture` shape is a flat record describing one message
4//! template. Realistic fixture files want to describe a whole cluster:
5//! multiple topics with partition counts, per-topic configs, and a list of
6//! message templates under each topic. This module defines that richer
7//! structure and a `flatten()` method that expands it into:
8//!
9//!   * `Vec<KafkaTopicSpec>` — one entry per topic, used by the Metadata
10//!     response to advertise real topics/partitions.
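//!   * `Vec<KafkaFixture>` — one entry per message template, fed into the
//!     existing `KafkaSpecRegistry` so nothing else has to change to keep
//!     working.
//!   * the scenarios, relationships, and state-machine graphs declared in
//!     the file, handed to `fixture_executor`.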
14//!
//! Unknown fields are silently ignored, so advanced YAML sections
//! (`failure_simulation`, `monitoring`) don't break the load. The three
//! trigger sections — `state_machine`, `scenarios`, and `relationships` —
//! are first-class: they deserialize into concrete structs that
//! `fixture_executor` consumes at broker startup.
20
21use crate::fixtures::{AutoProduceConfig, KafkaFixture};
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
25/// Top-level shape of a Kafka fixture YAML file.
26#[derive(Debug, Clone, Serialize, Deserialize, Default)]
27pub struct KafkaFixtureFile {
28    /// Human-readable metadata about the fixture file. Optional.
29    #[serde(default)]
30    pub fixture: Option<FixtureMeta>,
31    /// Cluster-level configuration. Optional.
32    #[serde(default)]
33    pub cluster: Option<ClusterSpec>,
34    /// Topics described by this file.
35    #[serde(default)]
36    pub topics: Vec<KafkaTopicSpec>,
37    /// Scenario-based sequences of topic emissions. Each scenario fires
38    /// once on startup if `enabled` and survives random sampling against
39    /// `probability`.
40    #[serde(default)]
41    pub scenarios: Vec<ScenarioSpec>,
42    /// Causal links — producing to `from_topic` triggers a dependent
43    /// emission on `to_topic` with correlated keys.
44    #[serde(default)]
45    pub relationships: Vec<RelationshipSpec>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize, Default)]
49pub struct FixtureMeta {
50    pub name: String,
51    #[serde(default)]
52    pub description: Option<String>,
53    #[serde(default)]
54    pub protocol: Option<String>,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize, Default)]
58pub struct ClusterSpec {
59    #[serde(default)]
60    pub bootstrap_servers: Option<String>,
61    #[serde(default)]
62    pub cluster_id: Option<String>,
63}
64
65/// One topic described in the YAML file. `partitions` / `replication_factor`
66/// flow into the Metadata response; `messages` flow into `KafkaFixture`s.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct KafkaTopicSpec {
69    pub name: String,
70    #[serde(default = "default_partitions")]
71    pub partitions: i32,
72    #[serde(default = "default_replication_factor")]
73    pub replication_factor: i16,
74    #[serde(default)]
75    pub partitioning: Option<serde_json::Value>,
76    #[serde(default)]
77    pub config: Option<serde_json::Value>,
78    #[serde(default)]
79    pub messages: Vec<KafkaMessageSpec>,
80}
81
/// Serde default for `KafkaTopicSpec::partitions`: a single partition.
fn default_partitions() -> i32 {
    const SINGLE_PARTITION: i32 = 1;
    SINGLE_PARTITION
}

/// Serde default for `KafkaTopicSpec::replication_factor`: a single replica.
fn default_replication_factor() -> i16 {
    const SINGLE_REPLICA: i16 = 1;
    SINGLE_REPLICA
}
88
89/// One message template under a topic.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct KafkaMessageSpec {
92    #[serde(default)]
93    pub key_template: Option<String>,
94    pub value: serde_json::Value,
95    #[serde(default)]
96    pub headers: HashMap<String, String>,
97    #[serde(default)]
98    pub partition: Option<i32>,
99    #[serde(default)]
100    pub auto_produce: Option<MessageAutoProduce>,
101}
102
103/// A superset of `AutoProduceConfig`. Simple rate-limited auto-produce
104/// fields are first-class; `state_machine` drives the probabilistic state
105/// graph executor when `trigger == "state_machine"`.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct MessageAutoProduce {
108    #[serde(default)]
109    pub enabled: bool,
110    #[serde(default)]
111    pub rate_per_second: Option<u64>,
112    #[serde(default)]
113    pub duration_seconds: Option<u64>,
114    #[serde(default)]
115    pub total_count: Option<usize>,
116    #[serde(default)]
117    pub partition: Option<i32>,
118    /// `"rate"` (default) drives the rate-based `AutoProducer`.
119    /// `"state_machine"` drives `fixture_executor::StateMachineExecutor`.
120    /// Other values deserialize cleanly but don't hook anything up.
121    #[serde(default)]
122    pub trigger: Option<String>,
123    /// Graph definition used by the `"state_machine"` trigger.
124    #[serde(default)]
125    pub state_machine: Option<StateMachineSpec>,
126}
127
128/// A probabilistic state graph that emits one message per state visit.
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct StateMachineSpec {
131    pub initial_state: String,
132    #[serde(default)]
133    pub states: Vec<StateSpec>,
134}
135
136/// One node in a `StateMachineSpec`.
137///
138/// Terminal states have `next_states` empty — the executor stops when it
139/// reaches one. When `next_states` has entries, `probability` (same length)
140/// selects which one to visit next, and `delay_ms` (a `[min, max]` pair)
141/// controls how long before the transition fires.
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct StateSpec {
144    pub name: String,
145    #[serde(default)]
146    pub next_states: Vec<String>,
147    #[serde(default)]
148    pub probability: Vec<f64>,
149    /// `[min_ms, max_ms]` — sampled uniformly. Absent = fire immediately.
150    #[serde(default)]
151    pub delay_ms: Vec<u64>,
152}
153
154/// One top-level scenario: a sequence of topic emissions fired once when
155/// the broker starts, gated by `enabled` and `probability`.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct ScenarioSpec {
158    pub name: String,
159    #[serde(default = "yes")]
160    pub enabled: bool,
161    /// 0.0–1.0 chance the scenario actually runs. Absent = always run.
162    #[serde(default)]
163    pub probability: Option<f64>,
164    #[serde(default)]
165    pub sequence: Vec<ScenarioStep>,
166}
167
/// Serde default that switches a missing flag on (used by `ScenarioSpec::enabled`).
fn yes() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
171
172/// One step inside a scenario: emit a message on `topic` after
173/// `delay_ms[0]..=delay_ms[1]` ms (absent = fire immediately).
174///
175/// `message` is a free-form identifier in the fixture file (e.g.
176/// `"order_created_template"`). Today it's informational — the executor
177/// emits the first known message template for the referenced topic. A
178/// later PR can make `message` select among the topic's templates.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180pub struct ScenarioStep {
181    pub topic: String,
182    #[serde(default)]
183    pub message: Option<String>,
184    /// `[min_ms, max_ms]` — sampled uniformly. Absent = fire immediately.
185    #[serde(default)]
186    pub delay_ms: Vec<u64>,
187}
188
189/// A causal link: when a record lands on `from_topic`, emit a record on
190/// `to_topic` using `key_mapping` to correlate identifiers.
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct RelationshipSpec {
193    pub from_topic: String,
194    pub to_topic: String,
195    /// `"one_to_one"` / `"one_to_many"` — informational; the executor
196    /// always emits exactly one `to_topic` record per `from_topic` record,
197    /// and 1-to-many fan-out is expressed through multiple relationships
198    /// or through the rate/state-machine triggers on `to_topic`.
199    #[serde(default)]
200    pub relationship: Option<String>,
201    /// Map of `source_field -> target_field`. For each entry, the
202    /// executor pulls `source_field` out of the source record's JSON
203    /// value and puts it in the rendering context under `target_field`.
204    /// If the source value isn't valid JSON, the raw message key is used
205    /// as the value for every mapping entry.
206    #[serde(default)]
207    pub key_mapping: HashMap<String, String>,
208}
209
210/// Result of expanding a `KafkaFixtureFile` for consumption downstream.
211#[derive(Debug, Default)]
212pub struct FlattenedFixtures {
213    /// Topic definitions — used by the Metadata response.
214    pub topics: Vec<KafkaTopicSpec>,
215    /// Message-level fixtures — stored in `KafkaSpecRegistry` keyed by topic.
216    pub fixtures: Vec<KafkaFixture>,
217    /// Scenarios aggregated from every fixture file. Executed once on
218    /// broker startup by `fixture_executor`.
219    pub scenarios: Vec<ScenarioSpec>,
220    /// Relationships aggregated from every fixture file. Fire on every
221    /// successful produce.
222    pub relationships: Vec<RelationshipSpec>,
223    /// State-machine definitions keyed by the fixture identifier
224    /// (`{topic}#{index}`) that owns them. Drives the state-machine
225    /// executor.
226    pub state_machines: Vec<(String, StateMachineSpec)>,
227}
228
229impl KafkaFixtureFile {
230    /// Expand this file into `(topic specs, flat fixtures)`.
231    ///
232    /// Each `KafkaMessageSpec` becomes exactly one `KafkaFixture` with a
233    /// synthetic identifier of the form `{topic}#{index}`. Only the
234    /// rate-based auto_produce branch is forwarded; state-machine-driven
235    /// messages load cleanly but don't emit an `AutoProduceConfig`.
236    pub fn flatten(self) -> FlattenedFixtures {
237        let mut fixtures = Vec::new();
238        let mut state_machines = Vec::new();
239        for topic in &self.topics {
240            for (i, msg) in topic.messages.iter().enumerate() {
241                fixtures.push(message_to_fixture(topic, i, msg));
242                if let Some(ap) = &msg.auto_produce {
243                    if ap.enabled && ap.trigger.as_deref() == Some("state_machine") {
244                        if let Some(sm) = &ap.state_machine {
245                            state_machines.push((format!("{}#{}", topic.name, i), sm.clone()));
246                        }
247                    }
248                }
249            }
250        }
251        FlattenedFixtures {
252            topics: self.topics,
253            fixtures,
254            scenarios: self.scenarios,
255            relationships: self.relationships,
256            state_machines,
257        }
258    }
259}
260
261fn message_to_fixture(
262    topic: &KafkaTopicSpec,
263    index: usize,
264    msg: &KafkaMessageSpec,
265) -> KafkaFixture {
266    let auto = msg.auto_produce.as_ref().and_then(|ap| {
267        // Only forward rate-based triggers into the existing AutoProducer.
268        // Anything else is preserved at the file level but not yet honored.
269        match ap.trigger.as_deref() {
270            None | Some("rate") => ap.rate_per_second.map(|rate| AutoProduceConfig {
271                enabled: ap.enabled,
272                rate_per_second: rate,
273                duration_seconds: ap.duration_seconds,
274                total_count: ap.total_count,
275            }),
276            _ => None,
277        }
278    });
279
280    KafkaFixture {
281        identifier: format!("{}#{}", topic.name, index),
282        name: format!("{} message {}", topic.name, index),
283        topic: topic.name.clone(),
284        partition: msg.partition.or_else(|| msg.auto_produce.as_ref().and_then(|a| a.partition)),
285        key_pattern: msg.key_template.clone(),
286        value_template: msg.value.clone(),
287        headers: msg.headers.clone(),
288        auto_produce: auto,
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    fn sample_yaml() -> &'static str {
        r#"
fixture:
  name: "E-commerce Order Events"
  description: "demo"
  protocol: kafka

cluster:
  bootstrap_servers: "localhost:9092"
  cluster_id: "mockforge-cluster"

topics:
  - name: "orders.created"
    partitions: 3
    replication_factor: 1
    partitioning:
      strategy: "key_hash"
    config:
      retention_ms: 604800000
    messages:
      - key_template: "order-{{uuid}}"
        value:
          event_type: "order.created"
          order_id: "{{uuid}}"
        headers:
          event_version: "1.0"
        auto_produce:
          enabled: true
          rate_per_second: 10
          partition: null

  - name: "orders.status-updated"
    partitions: 3
    messages:
      - key_template: "{{context.order_id}}"
        value:
          event_type: "order.status_updated"
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
"#
    }

    #[test]
    fn parses_nested_topology() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        assert_eq!(file.fixture.as_ref().unwrap().name, "E-commerce Order Events");
        assert_eq!(
            file.cluster.as_ref().unwrap().cluster_id.as_deref(),
            Some("mockforge-cluster")
        );
        assert_eq!(file.topics.len(), 2);
        assert_eq!(file.topics[0].name, "orders.created");
        assert_eq!(file.topics[0].partitions, 3);
        assert_eq!(file.topics[0].messages.len(), 1);
    }

    #[test]
    fn flatten_keeps_topics_and_emits_one_fixture_per_message() {
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.topics.len(), 2);
        assert_eq!(flat.fixtures.len(), 2);
        assert_eq!(flat.fixtures[0].identifier, "orders.created#0");
        assert_eq!(flat.fixtures[0].topic, "orders.created");
        assert!(flat.fixtures[0].auto_produce.as_ref().unwrap().enabled);
        assert_eq!(flat.fixtures[0].auto_produce.as_ref().unwrap().rate_per_second, 10);
    }

    #[test]
    fn state_machine_trigger_loads_without_auto_produce() {
        // Advanced triggers must parse without emitting an AutoProduceConfig
        // (the rate-based executor would misinterpret them). The graph itself
        // reaches the state-machine executor through `state_machines`.
        let file: KafkaFixtureFile = serde_yaml::from_str(sample_yaml()).unwrap();
        let flat = file.flatten();
        let sm = &flat.fixtures[1];
        assert_eq!(sm.topic, "orders.status-updated");
        assert!(sm.auto_produce.is_none());
        assert_eq!(flat.state_machines.len(), 1);
        assert_eq!(flat.state_machines[0].0, sm.identifier);
    }

    #[test]
    fn disabled_state_machine_is_not_indexed() {
        let yaml = r#"
topics:
  - name: "orders.status-updated"
    messages:
      - value: { event_type: "order.status_updated" }
        auto_produce:
          enabled: false
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
"#;
        let flat = serde_yaml::from_str::<KafkaFixtureFile>(yaml).unwrap().flatten();
        assert!(flat.state_machines.is_empty());
        assert!(flat.fixtures[0].auto_produce.is_none());
    }

    #[test]
    fn partition_falls_back_to_auto_produce_partition() {
        let yaml = r#"
topics:
  - name: "t"
    partitions: 4
    messages:
      - value: {}
        auto_produce:
          enabled: true
          rate_per_second: 1
          partition: 2
      - value: {}
        partition: 3
        auto_produce:
          enabled: true
          rate_per_second: 1
          partition: 2
"#;
        let flat = serde_yaml::from_str::<KafkaFixtureFile>(yaml).unwrap().flatten();
        // No message-level partition: fall back to auto_produce.partition.
        assert_eq!(flat.fixtures[0].partition, Some(2));
        // Message-level partition takes precedence.
        assert_eq!(flat.fixtures[1].partition, Some(3));
    }

    #[test]
    fn missing_optional_fields_parse_with_defaults() {
        let yaml = r#"
topics:
  - name: "plain"
    messages:
      - value: { k: "v" }
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics[0].partitions, 1);
        assert_eq!(file.topics[0].replication_factor, 1);
        assert!(file.topics[0].messages[0].key_template.is_none());
    }

    #[test]
    fn flattens_state_machine_spec_into_index() {
        let yaml = r#"
topics:
  - name: "orders.status-updated"
    messages:
      - value: { event_type: "order.status_updated" }
        auto_produce:
          enabled: true
          trigger: "state_machine"
          state_machine:
            initial_state: "pending"
            states:
              - name: "pending"
                next_states: ["processing"]
                probability: [1.0]
                delay_ms: [1000, 2000]
              - name: "processing"
                next_states: ["shipped", "cancelled"]
                probability: [0.9, 0.1]
                delay_ms: [2000, 5000]
              - name: "shipped"
                next_states: []
              - name: "cancelled"
                next_states: []
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.state_machines.len(), 1);
        let (id, sm) = &flat.state_machines[0];
        assert_eq!(id, "orders.status-updated#0");
        assert_eq!(sm.initial_state, "pending");
        assert_eq!(sm.states.len(), 4);
        assert_eq!(sm.states[1].next_states, vec!["shipped", "cancelled"]);
        assert_eq!(sm.states[1].probability, vec![0.9, 0.1]);
        assert_eq!(sm.states[2].next_states, Vec::<String>::new());
    }

    #[test]
    fn parses_scenarios_and_relationships_sections() {
        let yaml = r#"
topics:
  - name: "orders.created"
    messages:
      - value: { k: "v" }
scenarios:
  - name: "Successful Order"
    enabled: true
    probability: 0.85
    sequence:
      - topic: "orders.created"
      - topic: "payments.processed"
        delay_ms: [1000, 3000]
relationships:
  - from_topic: "orders.created"
    to_topic: "payments.processed"
    relationship: "one_to_one"
    key_mapping:
      order_id: "order_id"
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        let flat = file.flatten();
        assert_eq!(flat.scenarios.len(), 1);
        assert_eq!(flat.scenarios[0].name, "Successful Order");
        assert_eq!(flat.scenarios[0].probability, Some(0.85));
        assert_eq!(flat.scenarios[0].sequence.len(), 2);
        assert_eq!(flat.scenarios[0].sequence[1].topic, "payments.processed");
        assert_eq!(flat.scenarios[0].sequence[1].delay_ms, vec![1000, 3000]);

        assert_eq!(flat.relationships.len(), 1);
        assert_eq!(flat.relationships[0].from_topic, "orders.created");
        assert_eq!(flat.relationships[0].to_topic, "payments.processed");
        assert_eq!(
            flat.relationships[0].key_mapping.get("order_id"),
            Some(&"order_id".to_string())
        );
    }

    #[test]
    fn unknown_top_level_sections_are_ignored() {
        // Real fixtures also carry sections this module doesn't model
        // (failure_simulation, monitoring). Those must not break the load,
        // and known sections next to them (scenarios) must still parse.
        let yaml = r#"
topics:
  - name: "t"
    messages:
      - value: {}
scenarios:
  - name: "Minimal"
failure_simulation:
  broker_failures:
    enabled: true
monitoring:
  prometheus:
    enabled: true
"#;
        let file: KafkaFixtureFile = serde_yaml::from_str(yaml).unwrap();
        assert_eq!(file.topics.len(), 1);
        assert_eq!(file.scenarios.len(), 1);
        assert_eq!(file.scenarios[0].name, "Minimal");
        // `enabled` defaults to true; absent `probability` means "always run".
        assert!(file.scenarios[0].enabled);
        assert!(file.scenarios[0].probability.is_none());
        assert!(file.scenarios[0].sequence.is_empty());
    }
}