1use chrono::Utc;
2use serde::{Deserialize, Serialize};
3use serde_yaml;
4use std::collections::HashMap;
5use std::fs;
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tokio::time::{interval, Duration};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct KafkaFixture {
14 pub identifier: String,
15 pub name: String,
16 pub topic: String,
17 pub partition: Option<i32>, pub key_pattern: Option<String>, pub value_template: serde_json::Value,
20 pub headers: std::collections::HashMap<String, String>,
21 pub auto_produce: Option<AutoProduceConfig>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct AutoProduceConfig {
27 pub enabled: bool,
28 pub rate_per_second: u64,
29 pub duration_seconds: Option<u64>,
30 pub total_count: Option<usize>,
31}
32
33pub struct AutoProducer {
35 fixtures: Arc<RwLock<HashMap<String, KafkaFixture>>>,
36 template_engine: mockforge_core::templating::TemplateEngine,
37 broker: Arc<super::broker::KafkaMockBroker>,
38}
39
40impl AutoProducer {
41 pub fn new(
43 broker: Arc<super::broker::KafkaMockBroker>,
44 template_engine: mockforge_core::templating::TemplateEngine,
45 ) -> Self {
46 Self {
47 fixtures: Arc::new(RwLock::new(HashMap::new())),
48 template_engine,
49 broker,
50 }
51 }
52
53 pub async fn add_fixture(&self, fixture: KafkaFixture) {
55 if fixture.auto_produce.as_ref().is_some_and(|ap| ap.enabled) {
56 let fixture_id = fixture.identifier.clone();
57 self.fixtures.write().await.insert(fixture_id, fixture);
58 }
59 }
60
61 pub async fn start(&self) -> anyhow::Result<()> {
63 let fixtures = self.fixtures.clone();
64 let _template_engine = self.template_engine.clone();
65 let _broker = self.broker.clone();
66
67 tokio::spawn(async move {
68 let mut interval = interval(Duration::from_secs(1));
69
70 loop {
71 interval.tick().await;
72
73 let fixtures_read = fixtures.read().await.clone();
74 for fixture in fixtures_read.values() {
75 if let Some(auto_produce) = &fixture.auto_produce {
76 if auto_produce.enabled {
77 for _ in 0..auto_produce.rate_per_second {
79 if let Ok(message) = fixture.generate_message(&HashMap::new()) {
80 let mut topics = _broker.topics.write().await;
82 if let Some(topic) = topics.get_mut(&fixture.topic) {
83 let partition = fixture.partition.unwrap_or_else(|| {
84 topic.assign_partition(message.key.as_deref())
85 });
86
87 if let Err(e) = topic.produce(partition, message).await {
88 tracing::error!(
89 "Failed to produce message to topic {}: {}",
90 fixture.topic,
91 e
92 );
93 } else {
94 tracing::debug!(
95 "Auto-produced message to topic {} partition {}",
96 fixture.topic,
97 partition
98 );
99 }
100 } else {
101 tracing::warn!(
102 "Topic {} not found for auto-production",
103 fixture.topic
104 );
105 }
106 }
107 }
108 }
109 }
110 }
111 }
112 });
113
114 Ok(())
115 }
116
117 pub async fn stop_fixture(&self, fixture_id: &str) {
119 if let Some(fixture) = self.fixtures.write().await.get_mut(fixture_id) {
120 if let Some(auto_produce) = &mut fixture.auto_produce {
121 auto_produce.enabled = false;
122 }
123 }
124 }
125}
126
127impl KafkaFixture {
128 pub fn load_from_dir(dir: &PathBuf) -> mockforge_core::Result<Vec<Self>> {
130 let mut fixtures = Vec::new();
131 for entry in fs::read_dir(dir)? {
132 let entry = entry?;
133 let path = entry.path();
134 if path.extension().and_then(|s| s.to_str()) == Some("yaml")
135 || path.extension().and_then(|s| s.to_str()) == Some("yml")
136 {
137 let file = fs::File::open(&path)?;
138 let file_fixtures: Vec<Self> = serde_yaml::from_reader(file)?;
139 fixtures.extend(file_fixtures);
140 }
141 }
142 Ok(fixtures)
143 }
144
145 pub fn generate_message(
147 &self,
148 context: &std::collections::HashMap<String, String>,
149 ) -> mockforge_core::Result<crate::partitions::KafkaMessage> {
150 let key = self.key_pattern.as_ref().map(|pattern| self.render_template(pattern, context));
152
153 let value_str = serde_json::to_string(&self.value_template)?;
155 let value_rendered = self.render_template(&value_str, context);
156 let value = value_rendered.into_bytes();
157
158 let headers = self
160 .headers
161 .iter()
162 .map(|(k, v)| (k.clone(), self.render_template(v, context).into_bytes()))
163 .collect();
164
165 Ok(crate::partitions::KafkaMessage {
166 offset: 0,
167 timestamp: Utc::now().timestamp_millis(),
168 key: key.map(|k| k.into_bytes()),
169 value,
170 headers,
171 })
172 }
173
174 fn render_template(
175 &self,
176 template: &str,
177 context: &std::collections::HashMap<String, String>,
178 ) -> String {
179 let mut result = template.to_string();
180 for (key, value) in context {
181 result = result.replace(&format!("{{{{{}}}}}", key), value);
182 }
183 result
184 }
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tempfile::TempDir;

    // ---------------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------------

    /// Minimal fixture: no partition, key, headers or auto-production.
    /// Tests override individual fields with struct-update syntax.
    fn make_fixture(
        identifier: &str,
        name: &str,
        topic: &str,
        value_template: serde_json::Value,
    ) -> KafkaFixture {
        KafkaFixture {
            identifier: identifier.to_string(),
            name: name.to_string(),
            topic: topic.to_string(),
            partition: None,
            key_pattern: None,
            value_template,
            headers: HashMap::new(),
            auto_produce: None,
        }
    }

    /// Fixture used by the `render_template` tests.
    fn render_fixture() -> KafkaFixture {
        KafkaFixture {
            partition: Some(0),
            ..make_fixture("render-test", "Render Test", "test-topic", serde_json::json!({}))
        }
    }

    /// Shorthand for building an `AutoProduceConfig`.
    fn auto_config(
        enabled: bool,
        rate_per_second: u64,
        duration_seconds: Option<u64>,
        total_count: Option<usize>,
    ) -> AutoProduceConfig {
        AutoProduceConfig {
            enabled,
            rate_per_second,
            duration_seconds,
            total_count,
        }
    }

    /// Builds a template context from string pairs.
    fn context_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Serializes `fixtures` as YAML into `file_name` inside `dir`.
    fn write_fixtures(dir: &TempDir, file_name: &str, fixtures: &[KafkaFixture]) {
        let yaml = serde_yaml::to_string(fixtures).unwrap();
        fs::write(dir.path().join(file_name), yaml).unwrap();
    }

    /// Runs `KafkaFixture::load_from_dir` on the temporary directory.
    fn load(dir: &TempDir) -> mockforge_core::Result<Vec<KafkaFixture>> {
        KafkaFixture::load_from_dir(&dir.path().to_path_buf())
    }

    /// Auto-producer backed by a default-configured mock broker.
    async fn new_producer() -> AutoProducer {
        let config = mockforge_core::config::KafkaConfig::default();
        let broker = Arc::new(crate::broker::KafkaMockBroker::new(config).await.unwrap());
        let template_engine = mockforge_core::templating::TemplateEngine::new();
        AutoProducer::new(broker, template_engine)
    }

    // ---------------------------------------------------------------------
    // KafkaFixture construction
    // ---------------------------------------------------------------------

    #[test]
    fn test_kafka_fixture_creation() {
        let fixture = KafkaFixture {
            partition: Some(0),
            key_pattern: Some("key-{{id}}".to_string()),
            ..make_fixture(
                "test-fixture",
                "Test Fixture",
                "test-topic",
                serde_json::json!({"message": "test"}),
            )
        };

        assert_eq!(fixture.identifier, "test-fixture");
        assert_eq!(fixture.topic, "test-topic");
        assert_eq!(fixture.partition, Some(0));
        assert!(fixture.auto_produce.is_none());
    }

    #[test]
    fn test_kafka_fixture_with_auto_produce() {
        let fixture = KafkaFixture {
            auto_produce: Some(auto_config(true, 10, Some(60), Some(100))),
            ..make_fixture(
                "auto-fixture",
                "Auto Fixture",
                "auto-topic",
                serde_json::json!({"auto": true}),
            )
        };

        assert!(fixture.auto_produce.is_some());
        let ap = fixture.auto_produce.as_ref().unwrap();
        assert!(ap.enabled);
        assert_eq!(ap.rate_per_second, 10);
        assert_eq!(ap.duration_seconds, Some(60));
        assert_eq!(ap.total_count, Some(100));
    }

    #[test]
    fn test_kafka_fixture_clone() {
        let fixture = KafkaFixture {
            partition: Some(1),
            key_pattern: Some("key".to_string()),
            ..make_fixture(
                "clone-test",
                "Clone Test",
                "clone-topic",
                serde_json::json!({"data": "value"}),
            )
        };

        let cloned = fixture.clone();
        assert_eq!(fixture.identifier, cloned.identifier);
        assert_eq!(fixture.topic, cloned.topic);
        assert_eq!(fixture.partition, cloned.partition);
    }

    #[test]
    fn test_kafka_fixture_serialize_deserialize() {
        let fixture = KafkaFixture {
            partition: Some(0),
            key_pattern: Some("key-pattern".to_string()),
            ..make_fixture(
                "serde-test",
                "Serde Test",
                "serde-topic",
                serde_json::json!({"test": "data"}),
            )
        };

        let yaml = serde_yaml::to_string(&fixture).unwrap();
        let deserialized: KafkaFixture = serde_yaml::from_str(&yaml).unwrap();

        assert_eq!(fixture.identifier, deserialized.identifier);
        assert_eq!(fixture.topic, deserialized.topic);
    }

    // ---------------------------------------------------------------------
    // AutoProduceConfig
    // ---------------------------------------------------------------------

    #[test]
    fn test_auto_produce_config_enabled() {
        let config = auto_config(true, 5, None, None);

        assert!(config.enabled);
        assert_eq!(config.rate_per_second, 5);
        assert!(config.duration_seconds.is_none());
        assert!(config.total_count.is_none());
    }

    #[test]
    fn test_auto_produce_config_disabled() {
        let config = auto_config(false, 0, None, None);

        assert!(!config.enabled);
    }

    #[test]
    fn test_auto_produce_config_with_limits() {
        let config = auto_config(true, 100, Some(300), Some(10000));

        assert_eq!(config.rate_per_second, 100);
        assert_eq!(config.duration_seconds, Some(300));
        assert_eq!(config.total_count, Some(10000));
    }

    #[test]
    fn test_auto_produce_config_clone() {
        let config = auto_config(true, 10, Some(60), Some(100));

        let cloned = config.clone();
        assert_eq!(config.enabled, cloned.enabled);
        assert_eq!(config.rate_per_second, cloned.rate_per_second);
        assert_eq!(config.duration_seconds, cloned.duration_seconds);
        assert_eq!(config.total_count, cloned.total_count);
    }

    // ---------------------------------------------------------------------
    // generate_message
    // ---------------------------------------------------------------------

    #[test]
    fn test_generate_message_basic() {
        let fixture = KafkaFixture {
            partition: Some(0),
            ..make_fixture(
                "msg-test",
                "Message Test",
                "test-topic",
                serde_json::json!({"message": "hello"}),
            )
        };

        let message = fixture.generate_message(&HashMap::new()).unwrap();

        assert!(message.key.is_none());
        assert!(!message.value.is_empty());
        assert_eq!(message.offset, 0);
        assert!(message.timestamp > 0);
    }

    #[test]
    fn test_generate_message_with_key() {
        let fixture = KafkaFixture {
            partition: Some(0),
            key_pattern: Some("order-12345".to_string()),
            ..make_fixture(
                "key-test",
                "Key Test",
                "test-topic",
                serde_json::json!({"order": "data"}),
            )
        };

        let message = fixture.generate_message(&HashMap::new()).unwrap();

        assert!(message.key.is_some());
        assert_eq!(message.key.unwrap(), b"order-12345".to_vec());
    }

    #[test]
    fn test_generate_message_with_template_substitution() {
        let fixture = KafkaFixture {
            partition: Some(0),
            key_pattern: Some("user-{{userId}}".to_string()),
            ..make_fixture(
                "template-test",
                "Template Test",
                "test-topic",
                serde_json::json!({"userId": "{{userId}}", "action": "login"}),
            )
        };

        let context = context_of(&[("userId", "123")]);
        let message = fixture.generate_message(&context).unwrap();

        assert!(message.key.is_some());
        assert_eq!(message.key.unwrap(), b"user-123".to_vec());

        let value_str = String::from_utf8(message.value).unwrap();
        assert!(value_str.contains("123"));
    }

    #[test]
    fn test_generate_message_with_headers() {
        let fixture = KafkaFixture {
            partition: Some(0),
            headers: context_of(&[("correlation-id", "abc-123"), ("source", "test-service")]),
            ..make_fixture(
                "header-test",
                "Header Test",
                "test-topic",
                serde_json::json!({"data": "test"}),
            )
        };

        let message = fixture.generate_message(&HashMap::new()).unwrap();

        assert_eq!(message.headers.len(), 2);
        assert!(message.headers.iter().any(|(k, _)| k == "correlation-id"));
        assert!(message.headers.iter().any(|(k, _)| k == "source"));
    }

    #[test]
    fn test_generate_message_with_template_headers() {
        let fixture = KafkaFixture {
            partition: Some(0),
            headers: context_of(&[("trace-id", "trace-{{traceId}}")]),
            ..make_fixture(
                "header-template-test",
                "Header Template Test",
                "test-topic",
                serde_json::json!({"data": "test"}),
            )
        };

        let context = context_of(&[("traceId", "xyz789")]);
        let message = fixture.generate_message(&context).unwrap();

        let trace_header = message.headers.iter().find(|(k, _)| k == "trace-id");
        assert!(trace_header.is_some());
        assert_eq!(trace_header.unwrap().1, b"trace-xyz789".to_vec());
    }

    #[test]
    fn test_generate_message_empty_context() {
        let fixture = KafkaFixture {
            partition: Some(0),
            key_pattern: Some("static-key".to_string()),
            ..make_fixture(
                "empty-context",
                "Empty Context",
                "test-topic",
                serde_json::json!({"static": "value"}),
            )
        };

        let message = fixture.generate_message(&HashMap::new()).unwrap();

        assert!(message.key.is_some());
        assert_eq!(message.key.unwrap(), b"static-key".to_vec());
    }

    // ---------------------------------------------------------------------
    // render_template
    // ---------------------------------------------------------------------

    #[test]
    fn test_render_template_no_substitution() {
        let rendered = render_fixture().render_template("static text", &HashMap::new());
        assert_eq!(rendered, "static text");
    }

    #[test]
    fn test_render_template_single_substitution() {
        let context = context_of(&[("name", "Alice")]);

        let rendered = render_fixture().render_template("Hello {{name}}", &context);
        assert_eq!(rendered, "Hello Alice");
    }

    #[test]
    fn test_render_template_multiple_substitutions() {
        let context = context_of(&[("first", "John"), ("last", "Doe")]);

        let rendered = render_fixture().render_template("{{first}} {{last}}", &context);
        assert_eq!(rendered, "John Doe");
    }

    #[test]
    fn test_render_template_missing_variable() {
        // Unknown placeholders are left in the output verbatim.
        let rendered = render_fixture().render_template("Hello {{name}}", &HashMap::new());
        assert_eq!(rendered, "Hello {{name}}");
    }

    // ---------------------------------------------------------------------
    // load_from_dir
    // ---------------------------------------------------------------------

    #[test]
    fn test_load_from_dir_empty_directory() {
        let temp_dir = TempDir::new().unwrap();

        let loaded = load(&temp_dir).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_load_from_dir_with_yaml_files() {
        let temp_dir = TempDir::new().unwrap();
        let fixture = KafkaFixture {
            partition: Some(0),
            ..make_fixture(
                "test-fixture",
                "Test Fixture",
                "test-topic",
                serde_json::json!({"test": "data"}),
            )
        };
        write_fixtures(&temp_dir, "fixtures.yaml", &[fixture]);

        let loaded = load(&temp_dir).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].identifier, "test-fixture");
    }

    #[test]
    fn test_load_from_dir_with_yml_extension() {
        let temp_dir = TempDir::new().unwrap();
        let fixture =
            make_fixture("yml-test", "YML Test", "yml-topic", serde_json::json!({"yml": true}));
        write_fixtures(&temp_dir, "fixtures.yml", &[fixture]);

        let loaded = load(&temp_dir).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].identifier, "yml-test");
    }

    #[test]
    fn test_load_from_dir_ignores_non_yaml_files() {
        let temp_dir = TempDir::new().unwrap();
        fs::write(temp_dir.path().join("readme.txt"), "This is not a YAML file").unwrap();

        let loaded = load(&temp_dir).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_load_from_dir_multiple_files() {
        let temp_dir = TempDir::new().unwrap();

        let first = make_fixture("fixture-1", "Fixture 1", "topic-1", serde_json::json!({"id": 1}));
        let second =
            make_fixture("fixture-2", "Fixture 2", "topic-2", serde_json::json!({"id": 2}));
        write_fixtures(&temp_dir, "fixtures1.yaml", &[first]);
        write_fixtures(&temp_dir, "fixtures2.yaml", &[second]);

        let loaded = load(&temp_dir).unwrap();
        assert_eq!(loaded.len(), 2);
    }

    #[test]
    fn test_load_from_dir_nonexistent() {
        let result = KafkaFixture::load_from_dir(&PathBuf::from("/nonexistent/path"));
        assert!(result.is_err());
    }

    // ---------------------------------------------------------------------
    // AutoProducer
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_auto_producer_creation() {
        let producer = new_producer().await;

        let fixtures = producer.fixtures.read().await;
        assert!(fixtures.is_empty());
    }

    #[tokio::test]
    async fn test_auto_producer_add_fixture_enabled() {
        let producer = new_producer().await;
        let fixture = KafkaFixture {
            auto_produce: Some(auto_config(true, 1, None, None)),
            ..make_fixture(
                "auto-enabled",
                "Auto Enabled",
                "auto-topic",
                serde_json::json!({"auto": true}),
            )
        };

        producer.add_fixture(fixture).await;

        let fixtures = producer.fixtures.read().await;
        assert_eq!(fixtures.len(), 1);
        assert!(fixtures.contains_key("auto-enabled"));
    }

    #[tokio::test]
    async fn test_auto_producer_add_fixture_disabled() {
        let producer = new_producer().await;
        let fixture = KafkaFixture {
            auto_produce: Some(auto_config(false, 1, None, None)),
            ..make_fixture(
                "auto-disabled",
                "Auto Disabled",
                "disabled-topic",
                serde_json::json!({"auto": false}),
            )
        };

        producer.add_fixture(fixture).await;

        let fixtures = producer.fixtures.read().await;
        assert!(fixtures.is_empty());
    }

    #[tokio::test]
    async fn test_auto_producer_add_fixture_no_auto_produce() {
        let producer = new_producer().await;
        let fixture =
            make_fixture("no-auto", "No Auto", "manual-topic", serde_json::json!({"manual": true}));

        producer.add_fixture(fixture).await;

        let fixtures = producer.fixtures.read().await;
        assert!(fixtures.is_empty());
    }

    #[tokio::test]
    async fn test_auto_producer_stop_fixture() {
        let producer = new_producer().await;
        let fixture = KafkaFixture {
            auto_produce: Some(auto_config(true, 1, None, None)),
            ..make_fixture("stop-test", "Stop Test", "stop-topic", serde_json::json!({"test": true}))
        };

        producer.add_fixture(fixture).await;
        producer.stop_fixture("stop-test").await;

        // Stopping disables the fixture but keeps it registered.
        let fixtures = producer.fixtures.read().await;
        let stopped = fixtures.get("stop-test");
        assert!(stopped.is_some());
        assert!(!stopped.unwrap().auto_produce.as_ref().unwrap().enabled);
    }

    #[tokio::test]
    async fn test_auto_producer_stop_nonexistent_fixture() {
        let producer = new_producer().await;
        producer.stop_fixture("nonexistent").await;

        // Stopping an unknown id must neither panic nor create an entry.
        let fixtures = producer.fixtures.read().await;
        assert!(fixtures.is_empty());
    }
}