mockforge_kafka/
spec_registry.rs

1use async_trait::async_trait;
2use chrono;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use mockforge_core::protocol_abstraction::{
7    ProtocolRequest, ProtocolResponse, ResponseStatus, SpecOperation, SpecRegistry,
8    ValidationError, ValidationResult,
9};
10use mockforge_core::{Protocol, Result};
11
12/// Kafka-specific spec registry implementation
13#[derive(Debug)]
14pub struct KafkaSpecRegistry {
15    fixtures: Vec<Arc<crate::fixtures::KafkaFixture>>,
16    template_engine: mockforge_core::templating::TemplateEngine,
17    topics: std::sync::Arc<
18        tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
19    >,
20}
21
22impl KafkaSpecRegistry {
23    /// Create a new Kafka spec registry
24    pub async fn new(
25        config: mockforge_core::config::KafkaConfig,
26        topics: std::sync::Arc<
27            tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
28        >,
29    ) -> Result<Self> {
30        let fixtures = if let Some(fixtures_dir) = &config.fixtures_dir {
31            crate::fixtures::KafkaFixture::load_from_dir(fixtures_dir)?
32                .into_iter()
33                .map(Arc::new)
34                .collect()
35        } else {
36            vec![]
37        };
38
39        let template_engine = mockforge_core::templating::TemplateEngine::new();
40
41        Ok(Self {
42            fixtures,
43            template_engine,
44            topics,
45        })
46    }
47
48    /// Find fixture by topic
49    pub fn find_fixture_by_topic(&self, topic: &str) -> Option<Arc<crate::fixtures::KafkaFixture>> {
50        self.fixtures.iter().find(|f| f.topic == topic).cloned()
51    }
52
53    /// Produce a message to a topic
54    pub async fn produce(
55        &self,
56        topic: &str,
57        key: Option<&str>,
58        value: &serde_json::Value,
59    ) -> Result<i64> {
60        let mut topics = self.topics.write().await;
61
62        // Get or create the topic
63        let topic_entry = topics.entry(topic.to_string()).or_insert_with(|| {
64            crate::topics::Topic::new(topic.to_string(), crate::topics::TopicConfig::default())
65        });
66
67        // Assign partition based on key
68        let partition_id = topic_entry.assign_partition(key.map(|k| k.as_bytes()));
69
70        // Create the message
71        let message = crate::partitions::KafkaMessage {
72            offset: 0, // Will be set by partition.append
73            timestamp: chrono::Utc::now().timestamp_millis(),
74            key: key.map(|k| k.as_bytes().to_vec()),
75            value: serde_json::to_vec(value).map_err(mockforge_core::Error::Json)?,
76            headers: vec![],
77        };
78
79        // Append to partition
80        let offset = topic_entry
81            .get_partition_mut(partition_id)
82            .ok_or_else(|| {
83                mockforge_core::Error::generic(format!("Partition {} not found", partition_id))
84            })?
85            .append(message);
86
87        Ok(offset)
88    }
89
90    /// Fetch messages from a topic partition
91    pub async fn fetch(
92        &self,
93        topic: &str,
94        partition: i32,
95        offset: i64,
96    ) -> Result<Vec<crate::partitions::KafkaMessage>> {
97        let topics = self.topics.read().await;
98
99        if let Some(topic_entry) = topics.get(topic) {
100            if let Some(partition_entry) = topic_entry.get_partition(partition) {
101                // Fetch messages starting from offset
102                let messages = partition_entry.fetch(offset, 1000); // Max 1000 messages
103                Ok(messages.into_iter().cloned().collect())
104            } else {
105                Err(mockforge_core::Error::generic(format!(
106                    "Partition {} not found in topic {}",
107                    partition, topic
108                )))
109            }
110        } else {
111            Err(mockforge_core::Error::generic(format!("Topic {} not found", topic)))
112        }
113    }
114}
115
116#[async_trait]
117impl SpecRegistry for KafkaSpecRegistry {
118    fn protocol(&self) -> Protocol {
119        Protocol::Kafka
120    }
121
122    fn operations(&self) -> Vec<SpecOperation> {
123        // Return operations based on fixtures
124        self.fixtures
125            .iter()
126            .map(|fixture| SpecOperation {
127                name: fixture.identifier.clone(),
128                path: fixture.topic.clone(),
129                operation_type: "PRODUCE".to_string(),
130                input_schema: Some("KafkaMessage".to_string()),
131                output_schema: Some("ProduceResponse".to_string()),
132                metadata: std::collections::HashMap::new(),
133            })
134            .collect()
135    }
136
137    fn find_operation(&self, operation: &str, path: &str) -> Option<SpecOperation> {
138        self.fixtures
139            .iter()
140            .find(|fixture| fixture.topic == path && operation == "PRODUCE")
141            .map(|fixture| SpecOperation {
142                name: fixture.identifier.clone(),
143                path: fixture.topic.clone(),
144                operation_type: "PRODUCE".to_string(),
145                input_schema: Some("KafkaMessage".to_string()),
146                output_schema: Some("ProduceResponse".to_string()),
147                metadata: std::collections::HashMap::new(),
148            })
149    }
150
151    fn validate_request(&self, request: &ProtocolRequest) -> Result<ValidationResult> {
152        // Basic validation - check if topic exists in fixtures
153        let valid = if let Some(topic) = &request.topic {
154            self.fixtures.iter().any(|f| f.topic == *topic)
155        } else {
156            false
157        };
158
159        Ok(ValidationResult {
160            valid,
161            errors: if valid {
162                vec![]
163            } else {
164                vec![ValidationError {
165                    message: "Topic not found in fixtures".to_string(),
166                    path: Some("topic".to_string()),
167                    code: Some("TOPIC_NOT_FOUND".to_string()),
168                }]
169            },
170            warnings: vec![],
171        })
172    }
173
174    fn generate_mock_response(&self, request: &ProtocolRequest) -> Result<ProtocolResponse> {
175        let operation = &request.operation;
176        let topic = request
177            .topic
178            .as_ref()
179            .ok_or_else(|| mockforge_core::Error::generic("Missing topic"))?;
180
181        match operation.as_str() {
182            "PRODUCE" => {
183                let fixture = self.find_fixture_by_topic(topic).ok_or_else(|| {
184                    mockforge_core::Error::generic(format!("No fixture found for topic {}", topic))
185                })?;
186
187                // Generate message using template
188                let templating_context = mockforge_core::templating::TemplatingContext::with_env(
189                    request.metadata.clone(),
190                );
191                let value = self
192                    .template_engine
193                    .expand_tokens_with_context(&fixture.value_template, &templating_context);
194                let _key = fixture.key_pattern.as_ref().map(|key_pattern| {
195                    self.template_engine.expand_str_with_context(key_pattern, &templating_context)
196                });
197
198                // For now, return a mock offset since we don't have actual broker integration
199                let offset = 0i64;
200
201                Ok(ProtocolResponse {
202                    status: ResponseStatus::KafkaStatus(0), // No error
203                    metadata: HashMap::from([
204                        ("topic".to_string(), topic.clone()),
205                        ("offset".to_string(), offset.to_string()),
206                    ]),
207                    body: serde_json::to_string(&value)
208                        .map_err(mockforge_core::Error::Json)?
209                        .into_bytes(),
210                    content_type: "application/json".to_string(),
211                })
212            }
213            "FETCH" => {
214                let partition = request
215                    .partition
216                    .ok_or_else(|| mockforge_core::Error::generic("Missing partition"))?;
217                let _offset =
218                    request.metadata.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
219
220                // For now, return empty messages since we don't have actual broker integration
221                let messages: Vec<crate::partitions::KafkaMessage> = vec![];
222
223                Ok(ProtocolResponse {
224                    status: ResponseStatus::KafkaStatus(0),
225                    metadata: HashMap::from([
226                        ("topic".to_string(), topic.clone()),
227                        ("partition".to_string(), partition.to_string()),
228                        ("message_count".to_string(), messages.len().to_string()),
229                    ]),
230                    body: serde_json::to_vec(&messages).map_err(mockforge_core::Error::Json)?,
231                    content_type: "application/json".to_string(),
232                })
233            }
234            _ => {
235                Err(mockforge_core::Error::generic(format!("Unsupported operation: {}", operation)))
236            }
237        }
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use super::*;
244    use mockforge_core::protocol_abstraction::ProtocolRequest;
245    use std::collections::HashMap;
246    use tempfile::TempDir;
247
248    async fn create_test_registry() -> KafkaSpecRegistry {
249        let topics = Arc::new(tokio::sync::RwLock::new(HashMap::new()));
250        let config = mockforge_core::config::KafkaConfig::default();
251        KafkaSpecRegistry::new(config, topics).await.unwrap()
252    }
253
254    async fn create_registry_with_fixtures() -> (KafkaSpecRegistry, TempDir) {
255        let temp_dir = TempDir::new().unwrap();
256        let fixture_path = temp_dir.path().join("fixtures.yaml");
257
258        let fixtures = vec![crate::fixtures::KafkaFixture {
259            identifier: "test-produce".to_string(),
260            name: "Test Produce".to_string(),
261            topic: "test-topic".to_string(),
262            partition: Some(0),
263            key_pattern: Some("key-{{id}}".to_string()),
264            value_template: serde_json::json!({"message": "test-{{id}}"}),
265            headers: HashMap::new(),
266            auto_produce: None,
267        }];
268
269        let yaml_content = serde_yaml::to_string(&fixtures).unwrap();
270        std::fs::write(&fixture_path, yaml_content).unwrap();
271
272        let topics = Arc::new(tokio::sync::RwLock::new(HashMap::new()));
273        let config = mockforge_core::config::KafkaConfig {
274            fixtures_dir: Some(temp_dir.path().to_path_buf()),
275            ..Default::default()
276        };
277        let registry = KafkaSpecRegistry::new(config, topics).await.unwrap();
278
279        (registry, temp_dir)
280    }
281
282    // ==================== KafkaSpecRegistry::new Tests ====================
283
284    #[tokio::test]
285    async fn test_new_registry_without_fixtures() {
286        let topics = Arc::new(tokio::sync::RwLock::new(HashMap::new()));
287        let config = mockforge_core::config::KafkaConfig::default();
288
289        let registry = KafkaSpecRegistry::new(config, topics).await.unwrap();
290        assert_eq!(registry.fixtures.len(), 0);
291    }
292
293    #[tokio::test]
294    async fn test_new_registry_with_fixtures() {
295        let (registry, _temp_dir) = create_registry_with_fixtures().await;
296        assert_eq!(registry.fixtures.len(), 1);
297        assert_eq!(registry.fixtures[0].topic, "test-topic");
298    }
299
300    // ==================== KafkaSpecRegistry::find_fixture_by_topic Tests ====================
301
302    #[tokio::test]
303    async fn test_find_fixture_by_topic_exists() {
304        let (registry, _temp_dir) = create_registry_with_fixtures().await;
305
306        let fixture = registry.find_fixture_by_topic("test-topic");
307        assert!(fixture.is_some());
308        assert_eq!(fixture.unwrap().identifier, "test-produce");
309    }
310
311    #[tokio::test]
312    async fn test_find_fixture_by_topic_not_found() {
313        let (registry, _temp_dir) = create_registry_with_fixtures().await;
314
315        let fixture = registry.find_fixture_by_topic("nonexistent-topic");
316        assert!(fixture.is_none());
317    }
318
319    #[tokio::test]
320    async fn test_find_fixture_by_topic_empty_registry() {
321        let registry = create_test_registry().await;
322
323        let fixture = registry.find_fixture_by_topic("any-topic");
324        assert!(fixture.is_none());
325    }
326
327    // ==================== KafkaSpecRegistry::produce Tests ====================
328
329    #[tokio::test]
330    async fn test_produce_message_without_key() {
331        let registry = create_test_registry().await;
332        let value = serde_json::json!({"message": "hello"});
333
334        let result = registry.produce("test-topic", None, &value).await;
335        assert!(result.is_ok());
336
337        let offset = result.unwrap();
338        assert_eq!(offset, 0);
339    }
340
341    #[tokio::test]
342    async fn test_produce_message_with_key() {
343        let registry = create_test_registry().await;
344        let value = serde_json::json!({"message": "hello"});
345
346        let result = registry.produce("test-topic", Some("my-key"), &value).await;
347        assert!(result.is_ok());
348
349        let offset = result.unwrap();
350        assert_eq!(offset, 0);
351    }
352
353    #[tokio::test]
354    async fn test_produce_multiple_messages() {
355        let registry = create_test_registry().await;
356
357        // Use the same key to ensure all messages go to the same partition
358        // Without a key, round-robin distributes to different partitions (each starting at offset 0)
359        let offset1 = registry
360            .produce("test-topic", Some("same-key"), &serde_json::json!({"id": 1}))
361            .await
362            .unwrap();
363
364        let offset2 = registry
365            .produce("test-topic", Some("same-key"), &serde_json::json!({"id": 2}))
366            .await
367            .unwrap();
368
369        let offset3 = registry
370            .produce("test-topic", Some("same-key"), &serde_json::json!({"id": 3}))
371            .await
372            .unwrap();
373
374        assert_eq!(offset1, 0);
375        assert_eq!(offset2, 1);
376        assert_eq!(offset3, 2);
377    }
378
379    #[tokio::test]
380    async fn test_produce_creates_topic_if_not_exists() {
381        let registry = create_test_registry().await;
382        let value = serde_json::json!({"test": "data"});
383
384        let result = registry.produce("new-topic", None, &value).await;
385        assert!(result.is_ok());
386
387        let topics = registry.topics.read().await;
388        assert!(topics.contains_key("new-topic"));
389    }
390
391    #[tokio::test]
392    async fn test_produce_to_multiple_topics() {
393        let registry = create_test_registry().await;
394
395        registry.produce("topic-1", None, &serde_json::json!({"id": 1})).await.unwrap();
396
397        registry.produce("topic-2", None, &serde_json::json!({"id": 2})).await.unwrap();
398
399        let topics = registry.topics.read().await;
400        assert_eq!(topics.len(), 2);
401        assert!(topics.contains_key("topic-1"));
402        assert!(topics.contains_key("topic-2"));
403    }
404
405    // ==================== KafkaSpecRegistry::fetch Tests ====================
406
407    #[tokio::test]
408    async fn test_fetch_from_empty_topic() {
409        let registry = create_test_registry().await;
410
411        let result = registry.fetch("nonexistent-topic", 0, 0).await;
412        assert!(result.is_err());
413    }
414
415    #[tokio::test]
416    async fn test_fetch_from_nonexistent_partition() {
417        let registry = create_test_registry().await;
418
419        // Produce a message to create the topic
420        registry
421            .produce("test-topic", None, &serde_json::json!({"test": "data"}))
422            .await
423            .unwrap();
424
425        // Try to fetch from a partition that doesn't exist
426        let result = registry.fetch("test-topic", 99, 0).await;
427        assert!(result.is_err());
428    }
429
430    #[tokio::test]
431    async fn test_fetch_messages_after_produce() {
432        let registry = create_test_registry().await;
433
434        // Produce messages
435        registry
436            .produce("test-topic", None, &serde_json::json!({"id": 1}))
437            .await
438            .unwrap();
439
440        registry
441            .produce("test-topic", None, &serde_json::json!({"id": 2}))
442            .await
443            .unwrap();
444
445        // Fetch messages
446        let messages = registry.fetch("test-topic", 0, 0).await.unwrap();
447        assert!(messages.len() >= 1);
448    }
449
450    #[tokio::test]
451    async fn test_fetch_from_specific_offset() {
452        let registry = create_test_registry().await;
453
454        // Produce multiple messages
455        for i in 0..5 {
456            registry
457                .produce("test-topic", None, &serde_json::json!({"id": i}))
458                .await
459                .unwrap();
460        }
461
462        // Fetch from offset 2
463        let messages = registry.fetch("test-topic", 0, 2).await.unwrap();
464        assert!(messages.len() <= 3); // Messages 2, 3, 4
465    }
466
467    // ==================== SpecRegistry Trait Implementation Tests ====================
468
469    #[tokio::test]
470    async fn test_protocol_returns_kafka() {
471        let registry = create_test_registry().await;
472        assert_eq!(registry.protocol(), Protocol::Kafka);
473    }
474
475    #[tokio::test]
476    async fn test_operations_empty_registry() {
477        let registry = create_test_registry().await;
478        let operations = registry.operations();
479        assert!(operations.is_empty());
480    }
481
482    #[tokio::test]
483    async fn test_operations_with_fixtures() {
484        let (registry, _temp_dir) = create_registry_with_fixtures().await;
485        let operations = registry.operations();
486
487        assert_eq!(operations.len(), 1);
488        assert_eq!(operations[0].name, "test-produce");
489        assert_eq!(operations[0].path, "test-topic");
490        assert_eq!(operations[0].operation_type, "PRODUCE");
491    }
492
493    #[tokio::test]
494    async fn test_find_operation_exists() {
495        let (registry, _temp_dir) = create_registry_with_fixtures().await;
496
497        let operation = registry.find_operation("PRODUCE", "test-topic");
498        assert!(operation.is_some());
499
500        let op = operation.unwrap();
501        assert_eq!(op.name, "test-produce");
502        assert_eq!(op.path, "test-topic");
503    }
504
505    #[tokio::test]
506    async fn test_find_operation_wrong_operation_type() {
507        let (registry, _temp_dir) = create_registry_with_fixtures().await;
508
509        let operation = registry.find_operation("FETCH", "test-topic");
510        assert!(operation.is_none());
511    }
512
513    #[tokio::test]
514    async fn test_find_operation_wrong_path() {
515        let (registry, _temp_dir) = create_registry_with_fixtures().await;
516
517        let operation = registry.find_operation("PRODUCE", "wrong-topic");
518        assert!(operation.is_none());
519    }
520
521    #[tokio::test]
522    async fn test_find_operation_empty_registry() {
523        let registry = create_test_registry().await;
524
525        let operation = registry.find_operation("PRODUCE", "any-topic");
526        assert!(operation.is_none());
527    }
528
529    // ==================== validate_request Tests ====================
530
531    #[tokio::test]
532    async fn test_validate_request_valid_topic() {
533        let (registry, _temp_dir) = create_registry_with_fixtures().await;
534
535        let request = ProtocolRequest {
536            operation: "PRODUCE".to_string(),
537            topic: Some("test-topic".to_string()),
538            partition: None,
539            metadata: HashMap::new(),
540            ..Default::default()
541        };
542
543        let result = registry.validate_request(&request).unwrap();
544        assert!(result.valid);
545        assert!(result.errors.is_empty());
546    }
547
548    #[tokio::test]
549    async fn test_validate_request_invalid_topic() {
550        let (registry, _temp_dir) = create_registry_with_fixtures().await;
551
552        let request = ProtocolRequest {
553            operation: "PRODUCE".to_string(),
554            topic: Some("wrong-topic".to_string()),
555            partition: None,
556            metadata: HashMap::new(),
557            ..Default::default()
558        };
559
560        let result = registry.validate_request(&request).unwrap();
561        assert!(!result.valid);
562        assert_eq!(result.errors.len(), 1);
563        assert_eq!(result.errors[0].message, "Topic not found in fixtures");
564    }
565
566    #[tokio::test]
567    async fn test_validate_request_missing_topic() {
568        let (registry, _temp_dir) = create_registry_with_fixtures().await;
569
570        let request = ProtocolRequest {
571            operation: "PRODUCE".to_string(),
572            topic: None,
573            partition: None,
574            metadata: HashMap::new(),
575            ..Default::default()
576        };
577
578        let result = registry.validate_request(&request).unwrap();
579        assert!(!result.valid);
580        assert_eq!(result.errors.len(), 1);
581    }
582
583    // ==================== generate_mock_response Tests ====================
584
585    #[tokio::test]
586    async fn test_generate_mock_response_produce() {
587        let (registry, _temp_dir) = create_registry_with_fixtures().await;
588
589        let request = ProtocolRequest {
590            operation: "PRODUCE".to_string(),
591            topic: Some("test-topic".to_string()),
592            partition: None,
593            metadata: HashMap::new(),
594            ..Default::default()
595        };
596
597        let response = registry.generate_mock_response(&request).unwrap();
598
599        assert!(matches!(
600            response.status,
601            mockforge_core::protocol_abstraction::ResponseStatus::KafkaStatus(0)
602        ));
603        assert_eq!(response.content_type, "application/json");
604        assert!(response.metadata.contains_key("topic"));
605        assert!(response.metadata.contains_key("offset"));
606    }
607
608    #[tokio::test]
609    async fn test_generate_mock_response_produce_missing_topic() {
610        let (registry, _temp_dir) = create_registry_with_fixtures().await;
611
612        let request = ProtocolRequest {
613            operation: "PRODUCE".to_string(),
614            topic: None,
615            partition: None,
616            metadata: HashMap::new(),
617            ..Default::default()
618        };
619
620        let result = registry.generate_mock_response(&request);
621        assert!(result.is_err());
622    }
623
624    #[tokio::test]
625    async fn test_generate_mock_response_produce_no_fixture() {
626        let (registry, _temp_dir) = create_registry_with_fixtures().await;
627
628        let request = ProtocolRequest {
629            operation: "PRODUCE".to_string(),
630            topic: Some("nonexistent-topic".to_string()),
631            partition: None,
632            metadata: HashMap::new(),
633            ..Default::default()
634        };
635
636        let result = registry.generate_mock_response(&request);
637        assert!(result.is_err());
638    }
639
640    #[tokio::test]
641    async fn test_generate_mock_response_fetch() {
642        let (registry, _temp_dir) = create_registry_with_fixtures().await;
643
644        let mut metadata = HashMap::new();
645        metadata.insert("offset".to_string(), "0".to_string());
646
647        let request = ProtocolRequest {
648            operation: "FETCH".to_string(),
649            topic: Some("test-topic".to_string()),
650            partition: Some(0),
651            metadata,
652            ..Default::default()
653        };
654
655        let response = registry.generate_mock_response(&request).unwrap();
656
657        assert!(matches!(
658            response.status,
659            mockforge_core::protocol_abstraction::ResponseStatus::KafkaStatus(0)
660        ));
661        assert_eq!(response.content_type, "application/json");
662        assert_eq!(response.metadata.get("topic").unwrap(), "test-topic");
663        assert_eq!(response.metadata.get("partition").unwrap(), "0");
664    }
665
666    #[tokio::test]
667    async fn test_generate_mock_response_fetch_missing_partition() {
668        let (registry, _temp_dir) = create_registry_with_fixtures().await;
669
670        let request = ProtocolRequest {
671            operation: "FETCH".to_string(),
672            topic: Some("test-topic".to_string()),
673            partition: None,
674            metadata: HashMap::new(),
675            ..Default::default()
676        };
677
678        let result = registry.generate_mock_response(&request);
679        assert!(result.is_err());
680    }
681
682    #[tokio::test]
683    async fn test_generate_mock_response_unsupported_operation() {
684        let (registry, _temp_dir) = create_registry_with_fixtures().await;
685
686        let request = ProtocolRequest {
687            operation: "UNSUPPORTED".to_string(),
688            topic: Some("test-topic".to_string()),
689            partition: None,
690            metadata: HashMap::new(),
691            ..Default::default()
692        };
693
694        let result = registry.generate_mock_response(&request);
695        assert!(result.is_err());
696    }
697
698    #[tokio::test]
699    async fn test_generate_mock_response_with_metadata() {
700        let (registry, _temp_dir) = create_registry_with_fixtures().await;
701
702        let mut metadata = HashMap::new();
703        metadata.insert("id".to_string(), "42".to_string());
704
705        let request = ProtocolRequest {
706            operation: "PRODUCE".to_string(),
707            topic: Some("test-topic".to_string()),
708            partition: None,
709            metadata,
710            ..Default::default()
711        };
712
713        let response = registry.generate_mock_response(&request).unwrap();
714        assert!(response.body.len() > 0);
715    }
716
717    // ==================== Template Engine Integration Tests ====================
718
719    #[tokio::test]
720    async fn test_template_expansion_in_mock_response() {
721        let (registry, _temp_dir) = create_registry_with_fixtures().await;
722
723        let mut metadata = HashMap::new();
724        metadata.insert("id".to_string(), "123".to_string());
725
726        let request = ProtocolRequest {
727            operation: "PRODUCE".to_string(),
728            topic: Some("test-topic".to_string()),
729            partition: None,
730            metadata,
731            ..Default::default()
732        };
733
734        let response = registry.generate_mock_response(&request).unwrap();
735        let body_str = String::from_utf8(response.body).unwrap();
736
737        // The fixture template has "message": "test-{{id}}"
738        // With id=123, it should expand to contain "test-123"
739        assert!(body_str.contains("test") || body_str.contains("message"));
740    }
741
742    // ==================== Concurrent Access Tests ====================
743
744    #[tokio::test]
745    async fn test_concurrent_produce() {
746        let registry = Arc::new(create_test_registry().await);
747
748        let mut handles = vec![];
749        for i in 0..10 {
750            let reg = Arc::clone(&registry);
751            let handle = tokio::spawn(async move {
752                reg.produce("concurrent-topic", None, &serde_json::json!({"id": i})).await
753            });
754            handles.push(handle);
755        }
756
757        for handle in handles {
758            assert!(handle.await.unwrap().is_ok());
759        }
760
761        let topics = registry.topics.read().await;
762        assert!(topics.contains_key("concurrent-topic"));
763    }
764
765    #[tokio::test]
766    async fn test_concurrent_produce_and_fetch() {
767        let registry = Arc::new(create_test_registry().await);
768
769        // Produce some initial messages
770        for i in 0..5 {
771            registry
772                .produce("test-topic", None, &serde_json::json!({"id": i}))
773                .await
774                .unwrap();
775        }
776
777        // Concurrent produces (separate vector for different return type)
778        let mut produce_handles = vec![];
779        for i in 5..10 {
780            let reg = Arc::clone(&registry);
781            let handle = tokio::spawn(async move {
782                reg.produce("test-topic", None, &serde_json::json!({"id": i})).await
783            });
784            produce_handles.push(handle);
785        }
786
787        // Concurrent fetches (separate vector for different return type)
788        let mut fetch_handles = vec![];
789        for _ in 0..5 {
790            let reg = Arc::clone(&registry);
791            let handle = tokio::spawn(async move { reg.fetch("test-topic", 0, 0).await });
792            fetch_handles.push(handle);
793        }
794
795        // Await produce handles
796        for handle in produce_handles {
797            let _ = handle.await;
798        }
799
800        // Await fetch handles
801        for handle in fetch_handles {
802            let _ = handle.await;
803        }
804    }
805}