mockforge_kafka/
fixtures.rs

1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use serde_yaml;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tokio::time::{interval, Duration};
10
11/// Kafka fixture for message generation
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct KafkaFixture {
14    pub identifier: String,
15    pub name: String,
16    pub topic: String,
17    pub partition: Option<i32>,      // None = all partitions
18    pub key_pattern: Option<String>, // Template
19    pub value_template: serde_json::Value,
20    pub headers: std::collections::HashMap<String, String>,
21    pub auto_produce: Option<AutoProduceConfig>,
22}
23
24/// Configuration for auto-producing messages
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct AutoProduceConfig {
27    pub enabled: bool,
28    pub rate_per_second: u64,
29    pub duration_seconds: Option<u64>,
30    pub total_count: Option<usize>,
31}
32
33/// Auto-producer for fixtures
34pub struct AutoProducer {
35    fixtures: Arc<RwLock<HashMap<String, KafkaFixture>>>,
36    template_engine: mockforge_core::templating::TemplateEngine,
37    broker: Arc<super::broker::KafkaMockBroker>,
38}
39
40impl AutoProducer {
41    /// Create a new auto-producer
42    pub fn new(
43        broker: Arc<super::broker::KafkaMockBroker>,
44        template_engine: mockforge_core::templating::TemplateEngine,
45    ) -> Self {
46        Self {
47            fixtures: Arc::new(RwLock::new(HashMap::new())),
48            template_engine,
49            broker,
50        }
51    }
52
53    /// Add a fixture for auto-production
54    pub async fn add_fixture(&self, fixture: KafkaFixture) {
55        if fixture.auto_produce.as_ref().is_some_and(|ap| ap.enabled) {
56            let fixture_id = fixture.identifier.clone();
57            self.fixtures.write().await.insert(fixture_id, fixture);
58        }
59    }
60
61    /// Start auto-producing messages
62    pub async fn start(&self) -> anyhow::Result<()> {
63        let fixtures = self.fixtures.clone();
64        let _template_engine = self.template_engine.clone();
65        let _broker = self.broker.clone();
66
67        tokio::spawn(async move {
68            let mut interval = interval(Duration::from_secs(1));
69
70            loop {
71                interval.tick().await;
72
73                let fixtures_read = fixtures.read().await.clone();
74                for fixture in fixtures_read.values() {
75                    if let Some(auto_produce) = &fixture.auto_produce {
76                        if auto_produce.enabled {
77                            // Generate and produce messages
78                            for _ in 0..auto_produce.rate_per_second {
79                                if let Ok(message) = fixture.generate_message(&HashMap::new()) {
80                                    // Produce the message to the broker
81                                    let mut topics = _broker.topics.write().await;
82                                    if let Some(topic) = topics.get_mut(&fixture.topic) {
83                                        let partition = fixture.partition.unwrap_or_else(|| {
84                                            topic.assign_partition(message.key.as_deref())
85                                        });
86
87                                        if let Err(e) = topic.produce(partition, message).await {
88                                            tracing::error!(
89                                                "Failed to produce message to topic {}: {}",
90                                                fixture.topic,
91                                                e
92                                            );
93                                        } else {
94                                            tracing::debug!(
95                                                "Auto-produced message to topic {} partition {}",
96                                                fixture.topic,
97                                                partition
98                                            );
99                                        }
100                                    } else {
101                                        tracing::warn!(
102                                            "Topic {} not found for auto-production",
103                                            fixture.topic
104                                        );
105                                    }
106                                }
107                            }
108                        }
109                    }
110                }
111            }
112        });
113
114        Ok(())
115    }
116
117    /// Stop auto-producing for a specific fixture
118    pub async fn stop_fixture(&self, fixture_id: &str) {
119        if let Some(fixture) = self.fixtures.write().await.get_mut(fixture_id) {
120            if let Some(auto_produce) = &mut fixture.auto_produce {
121                auto_produce.enabled = false;
122            }
123        }
124    }
125}
126
127impl KafkaFixture {
128    /// Load fixtures from a directory
129    pub fn load_from_dir(dir: &PathBuf) -> mockforge_core::Result<Vec<Self>> {
130        let mut fixtures = Vec::new();
131        for entry in fs::read_dir(dir)? {
132            let entry = entry?;
133            let path = entry.path();
134            if path.extension().and_then(|s| s.to_str()) == Some("yaml")
135                || path.extension().and_then(|s| s.to_str()) == Some("yml")
136            {
137                let file = fs::File::open(&path)?;
138                let file_fixtures: Vec<Self> = serde_yaml::from_reader(file)?;
139                fixtures.extend(file_fixtures);
140            }
141        }
142        Ok(fixtures)
143    }
144
145    /// Generate a message using the fixture
146    pub fn generate_message(
147        &self,
148        context: &std::collections::HashMap<String, String>,
149    ) -> mockforge_core::Result<crate::partitions::KafkaMessage> {
150        // Render key if pattern provided
151        let key = self.key_pattern.as_ref().map(|pattern| self.render_template(pattern, context));
152
153        // Render value template
154        let value_str = serde_json::to_string(&self.value_template)?;
155        let value_rendered = self.render_template(&value_str, context);
156        let value = value_rendered.into_bytes();
157
158        // Render headers
159        let headers = self
160            .headers
161            .iter()
162            .map(|(k, v)| (k.clone(), self.render_template(v, context).into_bytes()))
163            .collect();
164
165        Ok(crate::partitions::KafkaMessage {
166            offset: 0,
167            timestamp: Utc::now().timestamp_millis(),
168            key: key.map(|k| k.into_bytes()),
169            value,
170            headers,
171        })
172    }
173
174    fn render_template(
175        &self,
176        template: &str,
177        context: &std::collections::HashMap<String, String>,
178    ) -> String {
179        let mut result = template.to_string();
180        for (key, value) in context {
181            result = result.replace(&format!("{{{{{}}}}}", key), value);
182        }
183        result
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190    use std::sync::Arc;
191    use tempfile::TempDir;
192
193    // ==================== KafkaFixture Tests ====================
194
195    #[test]
196    fn test_kafka_fixture_creation() {
197        let fixture = KafkaFixture {
198            identifier: "test-fixture".to_string(),
199            name: "Test Fixture".to_string(),
200            topic: "test-topic".to_string(),
201            partition: Some(0),
202            key_pattern: Some("key-{{id}}".to_string()),
203            value_template: serde_json::json!({"message": "test"}),
204            headers: HashMap::new(),
205            auto_produce: None,
206        };
207
208        assert_eq!(fixture.identifier, "test-fixture");
209        assert_eq!(fixture.topic, "test-topic");
210        assert_eq!(fixture.partition, Some(0));
211        assert!(fixture.auto_produce.is_none());
212    }
213
214    #[test]
215    fn test_kafka_fixture_with_auto_produce() {
216        let auto_produce = AutoProduceConfig {
217            enabled: true,
218            rate_per_second: 10,
219            duration_seconds: Some(60),
220            total_count: Some(100),
221        };
222
223        let fixture = KafkaFixture {
224            identifier: "auto-fixture".to_string(),
225            name: "Auto Fixture".to_string(),
226            topic: "auto-topic".to_string(),
227            partition: None,
228            key_pattern: None,
229            value_template: serde_json::json!({"auto": true}),
230            headers: HashMap::new(),
231            auto_produce: Some(auto_produce),
232        };
233
234        assert!(fixture.auto_produce.is_some());
235        let ap = fixture.auto_produce.as_ref().unwrap();
236        assert!(ap.enabled);
237        assert_eq!(ap.rate_per_second, 10);
238        assert_eq!(ap.duration_seconds, Some(60));
239        assert_eq!(ap.total_count, Some(100));
240    }
241
242    #[test]
243    fn test_kafka_fixture_clone() {
244        let fixture = KafkaFixture {
245            identifier: "clone-test".to_string(),
246            name: "Clone Test".to_string(),
247            topic: "clone-topic".to_string(),
248            partition: Some(1),
249            key_pattern: Some("key".to_string()),
250            value_template: serde_json::json!({"data": "value"}),
251            headers: HashMap::new(),
252            auto_produce: None,
253        };
254
255        let cloned = fixture.clone();
256        assert_eq!(fixture.identifier, cloned.identifier);
257        assert_eq!(fixture.topic, cloned.topic);
258        assert_eq!(fixture.partition, cloned.partition);
259    }
260
261    #[test]
262    fn test_kafka_fixture_serialize_deserialize() {
263        let fixture = KafkaFixture {
264            identifier: "serde-test".to_string(),
265            name: "Serde Test".to_string(),
266            topic: "serde-topic".to_string(),
267            partition: Some(0),
268            key_pattern: Some("key-pattern".to_string()),
269            value_template: serde_json::json!({"test": "data"}),
270            headers: HashMap::new(),
271            auto_produce: None,
272        };
273
274        let yaml = serde_yaml::to_string(&fixture).unwrap();
275        let deserialized: KafkaFixture = serde_yaml::from_str(&yaml).unwrap();
276
277        assert_eq!(fixture.identifier, deserialized.identifier);
278        assert_eq!(fixture.topic, deserialized.topic);
279    }
280
281    // ==================== AutoProduceConfig Tests ====================
282
283    #[test]
284    fn test_auto_produce_config_enabled() {
285        let config = AutoProduceConfig {
286            enabled: true,
287            rate_per_second: 5,
288            duration_seconds: None,
289            total_count: None,
290        };
291
292        assert!(config.enabled);
293        assert_eq!(config.rate_per_second, 5);
294        assert!(config.duration_seconds.is_none());
295        assert!(config.total_count.is_none());
296    }
297
298    #[test]
299    fn test_auto_produce_config_disabled() {
300        let config = AutoProduceConfig {
301            enabled: false,
302            rate_per_second: 0,
303            duration_seconds: None,
304            total_count: None,
305        };
306
307        assert!(!config.enabled);
308    }
309
310    #[test]
311    fn test_auto_produce_config_with_limits() {
312        let config = AutoProduceConfig {
313            enabled: true,
314            rate_per_second: 100,
315            duration_seconds: Some(300),
316            total_count: Some(10000),
317        };
318
319        assert_eq!(config.rate_per_second, 100);
320        assert_eq!(config.duration_seconds, Some(300));
321        assert_eq!(config.total_count, Some(10000));
322    }
323
324    #[test]
325    fn test_auto_produce_config_clone() {
326        let config = AutoProduceConfig {
327            enabled: true,
328            rate_per_second: 10,
329            duration_seconds: Some(60),
330            total_count: Some(100),
331        };
332
333        let cloned = config.clone();
334        assert_eq!(config.enabled, cloned.enabled);
335        assert_eq!(config.rate_per_second, cloned.rate_per_second);
336        assert_eq!(config.duration_seconds, cloned.duration_seconds);
337        assert_eq!(config.total_count, cloned.total_count);
338    }
339
340    // ==================== KafkaFixture::generate_message Tests ====================
341
342    #[test]
343    fn test_generate_message_basic() {
344        let fixture = KafkaFixture {
345            identifier: "msg-test".to_string(),
346            name: "Message Test".to_string(),
347            topic: "test-topic".to_string(),
348            partition: Some(0),
349            key_pattern: None,
350            value_template: serde_json::json!({"message": "hello"}),
351            headers: HashMap::new(),
352            auto_produce: None,
353        };
354
355        let context = HashMap::new();
356        let message = fixture.generate_message(&context).unwrap();
357
358        assert!(message.key.is_none());
359        assert!(!message.value.is_empty());
360        assert_eq!(message.offset, 0);
361        assert!(message.timestamp > 0);
362    }
363
364    #[test]
365    fn test_generate_message_with_key() {
366        let fixture = KafkaFixture {
367            identifier: "key-test".to_string(),
368            name: "Key Test".to_string(),
369            topic: "test-topic".to_string(),
370            partition: Some(0),
371            key_pattern: Some("order-12345".to_string()),
372            value_template: serde_json::json!({"order": "data"}),
373            headers: HashMap::new(),
374            auto_produce: None,
375        };
376
377        let context = HashMap::new();
378        let message = fixture.generate_message(&context).unwrap();
379
380        assert!(message.key.is_some());
381        assert_eq!(message.key.unwrap(), b"order-12345".to_vec());
382    }
383
384    #[test]
385    fn test_generate_message_with_template_substitution() {
386        let fixture = KafkaFixture {
387            identifier: "template-test".to_string(),
388            name: "Template Test".to_string(),
389            topic: "test-topic".to_string(),
390            partition: Some(0),
391            key_pattern: Some("user-{{userId}}".to_string()),
392            value_template: serde_json::json!({"userId": "{{userId}}", "action": "login"}),
393            headers: HashMap::new(),
394            auto_produce: None,
395        };
396
397        let mut context = HashMap::new();
398        context.insert("userId".to_string(), "123".to_string());
399
400        let message = fixture.generate_message(&context).unwrap();
401
402        assert!(message.key.is_some());
403        assert_eq!(message.key.unwrap(), b"user-123".to_vec());
404
405        let value_str = String::from_utf8(message.value).unwrap();
406        assert!(value_str.contains("123"));
407    }
408
409    #[test]
410    fn test_generate_message_with_headers() {
411        let mut headers = HashMap::new();
412        headers.insert("correlation-id".to_string(), "abc-123".to_string());
413        headers.insert("source".to_string(), "test-service".to_string());
414
415        let fixture = KafkaFixture {
416            identifier: "header-test".to_string(),
417            name: "Header Test".to_string(),
418            topic: "test-topic".to_string(),
419            partition: Some(0),
420            key_pattern: None,
421            value_template: serde_json::json!({"data": "test"}),
422            headers,
423            auto_produce: None,
424        };
425
426        let context = HashMap::new();
427        let message = fixture.generate_message(&context).unwrap();
428
429        assert_eq!(message.headers.len(), 2);
430        assert!(message.headers.iter().any(|(k, _)| k == "correlation-id"));
431        assert!(message.headers.iter().any(|(k, _)| k == "source"));
432    }
433
434    #[test]
435    fn test_generate_message_with_template_headers() {
436        let mut headers = HashMap::new();
437        headers.insert("trace-id".to_string(), "trace-{{traceId}}".to_string());
438
439        let fixture = KafkaFixture {
440            identifier: "header-template-test".to_string(),
441            name: "Header Template Test".to_string(),
442            topic: "test-topic".to_string(),
443            partition: Some(0),
444            key_pattern: None,
445            value_template: serde_json::json!({"data": "test"}),
446            headers,
447            auto_produce: None,
448        };
449
450        let mut context = HashMap::new();
451        context.insert("traceId".to_string(), "xyz789".to_string());
452
453        let message = fixture.generate_message(&context).unwrap();
454
455        let trace_header = message.headers.iter().find(|(k, _)| k == "trace-id");
456        assert!(trace_header.is_some());
457        assert_eq!(trace_header.unwrap().1, b"trace-xyz789".to_vec());
458    }
459
460    #[test]
461    fn test_generate_message_empty_context() {
462        let fixture = KafkaFixture {
463            identifier: "empty-context".to_string(),
464            name: "Empty Context".to_string(),
465            topic: "test-topic".to_string(),
466            partition: Some(0),
467            key_pattern: Some("static-key".to_string()),
468            value_template: serde_json::json!({"static": "value"}),
469            headers: HashMap::new(),
470            auto_produce: None,
471        };
472
473        let context = HashMap::new();
474        let message = fixture.generate_message(&context).unwrap();
475
476        assert!(message.key.is_some());
477        assert_eq!(message.key.unwrap(), b"static-key".to_vec());
478    }
479
480    // ==================== KafkaFixture::render_template Tests ====================
481
482    #[test]
483    fn test_render_template_no_substitution() {
484        let fixture = KafkaFixture {
485            identifier: "render-test".to_string(),
486            name: "Render Test".to_string(),
487            topic: "test-topic".to_string(),
488            partition: Some(0),
489            key_pattern: None,
490            value_template: serde_json::json!({}),
491            headers: HashMap::new(),
492            auto_produce: None,
493        };
494
495        let context = HashMap::new();
496        let result = fixture.render_template("static text", &context);
497        assert_eq!(result, "static text");
498    }
499
500    #[test]
501    fn test_render_template_single_substitution() {
502        let fixture = KafkaFixture {
503            identifier: "render-test".to_string(),
504            name: "Render Test".to_string(),
505            topic: "test-topic".to_string(),
506            partition: Some(0),
507            key_pattern: None,
508            value_template: serde_json::json!({}),
509            headers: HashMap::new(),
510            auto_produce: None,
511        };
512
513        let mut context = HashMap::new();
514        context.insert("name".to_string(), "Alice".to_string());
515
516        let result = fixture.render_template("Hello {{name}}", &context);
517        assert_eq!(result, "Hello Alice");
518    }
519
520    #[test]
521    fn test_render_template_multiple_substitutions() {
522        let fixture = KafkaFixture {
523            identifier: "render-test".to_string(),
524            name: "Render Test".to_string(),
525            topic: "test-topic".to_string(),
526            partition: Some(0),
527            key_pattern: None,
528            value_template: serde_json::json!({}),
529            headers: HashMap::new(),
530            auto_produce: None,
531        };
532
533        let mut context = HashMap::new();
534        context.insert("first".to_string(), "John".to_string());
535        context.insert("last".to_string(), "Doe".to_string());
536
537        let result = fixture.render_template("{{first}} {{last}}", &context);
538        assert_eq!(result, "John Doe");
539    }
540
541    #[test]
542    fn test_render_template_missing_variable() {
543        let fixture = KafkaFixture {
544            identifier: "render-test".to_string(),
545            name: "Render Test".to_string(),
546            topic: "test-topic".to_string(),
547            partition: Some(0),
548            key_pattern: None,
549            value_template: serde_json::json!({}),
550            headers: HashMap::new(),
551            auto_produce: None,
552        };
553
554        let context = HashMap::new();
555        let result = fixture.render_template("Hello {{name}}", &context);
556        // Missing variables are left as-is
557        assert_eq!(result, "Hello {{name}}");
558    }
559
560    // ==================== KafkaFixture::load_from_dir Tests ====================
561
562    #[test]
563    fn test_load_from_dir_empty_directory() {
564        let temp_dir = TempDir::new().unwrap();
565        let result = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
566        assert!(result.is_empty());
567    }
568
569    #[test]
570    fn test_load_from_dir_with_yaml_files() {
571        let temp_dir = TempDir::new().unwrap();
572        let fixture_path = temp_dir.path().join("fixtures.yaml");
573
574        let fixtures = vec![KafkaFixture {
575            identifier: "test-fixture".to_string(),
576            name: "Test Fixture".to_string(),
577            topic: "test-topic".to_string(),
578            partition: Some(0),
579            key_pattern: None,
580            value_template: serde_json::json!({"test": "data"}),
581            headers: HashMap::new(),
582            auto_produce: None,
583        }];
584
585        let yaml_content = serde_yaml::to_string(&fixtures).unwrap();
586        fs::write(&fixture_path, yaml_content).unwrap();
587
588        let loaded = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
589        assert_eq!(loaded.len(), 1);
590        assert_eq!(loaded[0].identifier, "test-fixture");
591    }
592
593    #[test]
594    fn test_load_from_dir_with_yml_extension() {
595        let temp_dir = TempDir::new().unwrap();
596        let fixture_path = temp_dir.path().join("fixtures.yml");
597
598        let fixtures = vec![KafkaFixture {
599            identifier: "yml-test".to_string(),
600            name: "YML Test".to_string(),
601            topic: "yml-topic".to_string(),
602            partition: None,
603            key_pattern: None,
604            value_template: serde_json::json!({"yml": true}),
605            headers: HashMap::new(),
606            auto_produce: None,
607        }];
608
609        let yaml_content = serde_yaml::to_string(&fixtures).unwrap();
610        fs::write(&fixture_path, yaml_content).unwrap();
611
612        let loaded = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
613        assert_eq!(loaded.len(), 1);
614        assert_eq!(loaded[0].identifier, "yml-test");
615    }
616
617    #[test]
618    fn test_load_from_dir_ignores_non_yaml_files() {
619        let temp_dir = TempDir::new().unwrap();
620        let txt_path = temp_dir.path().join("readme.txt");
621        fs::write(&txt_path, "This is not a YAML file").unwrap();
622
623        let loaded = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
624        assert!(loaded.is_empty());
625    }
626
627    #[test]
628    fn test_load_from_dir_multiple_files() {
629        let temp_dir = TempDir::new().unwrap();
630
631        let fixtures1 = vec![KafkaFixture {
632            identifier: "fixture-1".to_string(),
633            name: "Fixture 1".to_string(),
634            topic: "topic-1".to_string(),
635            partition: None,
636            key_pattern: None,
637            value_template: serde_json::json!({"id": 1}),
638            headers: HashMap::new(),
639            auto_produce: None,
640        }];
641
642        let fixtures2 = vec![KafkaFixture {
643            identifier: "fixture-2".to_string(),
644            name: "Fixture 2".to_string(),
645            topic: "topic-2".to_string(),
646            partition: None,
647            key_pattern: None,
648            value_template: serde_json::json!({"id": 2}),
649            headers: HashMap::new(),
650            auto_produce: None,
651        }];
652
653        fs::write(
654            temp_dir.path().join("fixtures1.yaml"),
655            serde_yaml::to_string(&fixtures1).unwrap(),
656        )
657        .unwrap();
658
659        fs::write(
660            temp_dir.path().join("fixtures2.yaml"),
661            serde_yaml::to_string(&fixtures2).unwrap(),
662        )
663        .unwrap();
664
665        let loaded = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
666        assert_eq!(loaded.len(), 2);
667    }
668
669    #[test]
670    fn test_load_from_dir_nonexistent() {
671        let result = KafkaFixture::load_from_dir(&PathBuf::from("/nonexistent/path"));
672        assert!(result.is_err());
673    }
674
675    // ==================== AutoProducer Tests ====================
676
677    #[tokio::test]
678    async fn test_auto_producer_creation() {
679        let config = mockforge_core::config::KafkaConfig::default();
680        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
681        let template_engine = mockforge_core::templating::TemplateEngine::new();
682
683        let producer = AutoProducer::new(broker, template_engine);
684        let fixtures = producer.fixtures.read().await;
685        assert!(fixtures.is_empty());
686    }
687
688    #[tokio::test]
689    async fn test_auto_producer_add_fixture_enabled() {
690        let config = mockforge_core::config::KafkaConfig::default();
691        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
692        let template_engine = mockforge_core::templating::TemplateEngine::new();
693
694        let producer = AutoProducer::new(broker, template_engine);
695
696        let fixture = KafkaFixture {
697            identifier: "auto-enabled".to_string(),
698            name: "Auto Enabled".to_string(),
699            topic: "auto-topic".to_string(),
700            partition: None,
701            key_pattern: None,
702            value_template: serde_json::json!({"auto": true}),
703            headers: HashMap::new(),
704            auto_produce: Some(AutoProduceConfig {
705                enabled: true,
706                rate_per_second: 1,
707                duration_seconds: None,
708                total_count: None,
709            }),
710        };
711
712        producer.add_fixture(fixture).await;
713
714        let fixtures = producer.fixtures.read().await;
715        assert_eq!(fixtures.len(), 1);
716        assert!(fixtures.contains_key("auto-enabled"));
717    }
718
719    #[tokio::test]
720    async fn test_auto_producer_add_fixture_disabled() {
721        let config = mockforge_core::config::KafkaConfig::default();
722        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
723        let template_engine = mockforge_core::templating::TemplateEngine::new();
724
725        let producer = AutoProducer::new(broker, template_engine);
726
727        let fixture = KafkaFixture {
728            identifier: "auto-disabled".to_string(),
729            name: "Auto Disabled".to_string(),
730            topic: "disabled-topic".to_string(),
731            partition: None,
732            key_pattern: None,
733            value_template: serde_json::json!({"auto": false}),
734            headers: HashMap::new(),
735            auto_produce: Some(AutoProduceConfig {
736                enabled: false,
737                rate_per_second: 1,
738                duration_seconds: None,
739                total_count: None,
740            }),
741        };
742
743        producer.add_fixture(fixture).await;
744
745        let fixtures = producer.fixtures.read().await;
746        assert!(fixtures.is_empty());
747    }
748
749    #[tokio::test]
750    async fn test_auto_producer_add_fixture_no_auto_produce() {
751        let config = mockforge_core::config::KafkaConfig::default();
752        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
753        let template_engine = mockforge_core::templating::TemplateEngine::new();
754
755        let producer = AutoProducer::new(broker, template_engine);
756
757        let fixture = KafkaFixture {
758            identifier: "no-auto".to_string(),
759            name: "No Auto".to_string(),
760            topic: "manual-topic".to_string(),
761            partition: None,
762            key_pattern: None,
763            value_template: serde_json::json!({"manual": true}),
764            headers: HashMap::new(),
765            auto_produce: None,
766        };
767
768        producer.add_fixture(fixture).await;
769
770        let fixtures = producer.fixtures.read().await;
771        assert!(fixtures.is_empty());
772    }
773
774    #[tokio::test]
775    async fn test_auto_producer_stop_fixture() {
776        let config = mockforge_core::config::KafkaConfig::default();
777        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
778        let template_engine = mockforge_core::templating::TemplateEngine::new();
779
780        let producer = AutoProducer::new(broker, template_engine);
781
782        let fixture = KafkaFixture {
783            identifier: "stop-test".to_string(),
784            name: "Stop Test".to_string(),
785            topic: "stop-topic".to_string(),
786            partition: None,
787            key_pattern: None,
788            value_template: serde_json::json!({"test": true}),
789            headers: HashMap::new(),
790            auto_produce: Some(AutoProduceConfig {
791                enabled: true,
792                rate_per_second: 1,
793                duration_seconds: None,
794                total_count: None,
795            }),
796        };
797
798        producer.add_fixture(fixture).await;
799        producer.stop_fixture("stop-test").await;
800
801        let fixtures = producer.fixtures.read().await;
802        let fixture = fixtures.get("stop-test");
803        assert!(fixture.is_some());
804        assert_eq!(fixture.unwrap().auto_produce.as_ref().unwrap().enabled, false);
805    }
806
807    #[tokio::test]
808    async fn test_auto_producer_stop_nonexistent_fixture() {
809        let config = mockforge_core::config::KafkaConfig::default();
810        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
811        let template_engine = mockforge_core::templating::TemplateEngine::new();
812
813        let producer = AutoProducer::new(broker, template_engine);
814        producer.stop_fixture("nonexistent").await;
815
816        // Should not panic
817        let fixtures = producer.fixtures.read().await;
818        assert!(fixtures.is_empty());
819    }
820}