mockforge_kafka/
spec_registry.rs

1use async_trait::async_trait;
2use chrono;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use mockforge_core::protocol_abstraction::{
7    ProtocolRequest, ProtocolResponse, ResponseStatus, SpecOperation, SpecRegistry,
8    ValidationError, ValidationResult,
9};
10use mockforge_core::{Protocol, Result};
11
12/// Kafka-specific spec registry implementation
13#[derive(Debug)]
14pub struct KafkaSpecRegistry {
15    fixtures: Vec<Arc<crate::fixtures::KafkaFixture>>,
16    template_engine: mockforge_core::templating::TemplateEngine,
17    topics: std::sync::Arc<
18        tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
19    >,
20}
21
22impl KafkaSpecRegistry {
23    /// Create a new Kafka spec registry
24    pub async fn new(
25        config: mockforge_core::config::KafkaConfig,
26        topics: std::sync::Arc<
27            tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
28        >,
29    ) -> Result<Self> {
30        let fixtures = if let Some(fixtures_dir) = &config.fixtures_dir {
31            crate::fixtures::KafkaFixture::load_from_dir(fixtures_dir)?
32                .into_iter()
33                .map(Arc::new)
34                .collect()
35        } else {
36            vec![]
37        };
38
39        let template_engine = mockforge_core::templating::TemplateEngine::new();
40
41        Ok(Self {
42            fixtures,
43            template_engine,
44            topics,
45        })
46    }
47
48    /// Find fixture by topic
49    pub fn find_fixture_by_topic(&self, topic: &str) -> Option<Arc<crate::fixtures::KafkaFixture>> {
50        self.fixtures.iter().find(|f| f.topic == topic).cloned()
51    }
52
53    /// Produce a message to a topic
54    pub async fn produce(
55        &self,
56        topic: &str,
57        key: Option<&str>,
58        value: &serde_json::Value,
59    ) -> Result<i64> {
60        let mut topics = self.topics.write().await;
61
62        // Get or create the topic
63        let topic_entry = topics.entry(topic.to_string()).or_insert_with(|| {
64            crate::topics::Topic::new(topic.to_string(), crate::topics::TopicConfig::default())
65        });
66
67        // Assign partition based on key
68        let partition_id = topic_entry.assign_partition(key.map(|k| k.as_bytes()));
69
70        // Create the message
71        let message = crate::partitions::KafkaMessage {
72            offset: 0, // Will be set by partition.append
73            timestamp: chrono::Utc::now().timestamp_millis(),
74            key: key.map(|k| k.as_bytes().to_vec()),
75            value: serde_json::to_vec(value).map_err(mockforge_core::Error::Json)?,
76            headers: vec![],
77        };
78
79        // Append to partition
80        let offset = topic_entry
81            .get_partition_mut(partition_id)
82            .ok_or_else(|| {
83                mockforge_core::Error::generic(format!("Partition {} not found", partition_id))
84            })?
85            .append(message);
86
87        Ok(offset)
88    }
89
90    /// Fetch messages from a topic partition
91    pub async fn fetch(
92        &self,
93        topic: &str,
94        partition: i32,
95        offset: i64,
96    ) -> Result<Vec<crate::partitions::KafkaMessage>> {
97        let topics = self.topics.read().await;
98
99        if let Some(topic_entry) = topics.get(topic) {
100            if let Some(partition_entry) = topic_entry.get_partition(partition) {
101                // Fetch messages starting from offset
102                let messages = partition_entry.fetch(offset, 1000); // Max 1000 messages
103                Ok(messages.into_iter().cloned().collect())
104            } else {
105                Err(mockforge_core::Error::generic(format!(
106                    "Partition {} not found in topic {}",
107                    partition, topic
108                )))
109            }
110        } else {
111            Err(mockforge_core::Error::generic(format!("Topic {} not found", topic)))
112        }
113    }
114}
115
116#[async_trait]
117impl SpecRegistry for KafkaSpecRegistry {
118    fn protocol(&self) -> Protocol {
119        Protocol::Kafka
120    }
121
122    fn operations(&self) -> Vec<SpecOperation> {
123        // Return operations based on fixtures
124        self.fixtures
125            .iter()
126            .map(|fixture| SpecOperation {
127                name: fixture.identifier.clone(),
128                path: fixture.topic.clone(),
129                operation_type: "PRODUCE".to_string(),
130                input_schema: Some("KafkaMessage".to_string()),
131                output_schema: Some("ProduceResponse".to_string()),
132                metadata: std::collections::HashMap::new(),
133            })
134            .collect()
135    }
136
137    fn find_operation(&self, operation: &str, path: &str) -> Option<SpecOperation> {
138        self.fixtures
139            .iter()
140            .find(|fixture| fixture.topic == path && operation == "PRODUCE")
141            .map(|fixture| SpecOperation {
142                name: fixture.identifier.clone(),
143                path: fixture.topic.clone(),
144                operation_type: "PRODUCE".to_string(),
145                input_schema: Some("KafkaMessage".to_string()),
146                output_schema: Some("ProduceResponse".to_string()),
147                metadata: std::collections::HashMap::new(),
148            })
149    }
150
151    fn validate_request(&self, request: &ProtocolRequest) -> Result<ValidationResult> {
152        // Basic validation - check if topic exists in fixtures
153        let valid = if let Some(topic) = &request.topic {
154            self.fixtures.iter().any(|f| f.topic == *topic)
155        } else {
156            false
157        };
158
159        Ok(ValidationResult {
160            valid,
161            errors: if valid {
162                vec![]
163            } else {
164                vec![ValidationError {
165                    message: "Topic not found in fixtures".to_string(),
166                    path: Some("topic".to_string()),
167                    code: Some("TOPIC_NOT_FOUND".to_string()),
168                }]
169            },
170            warnings: vec![],
171        })
172    }
173
174    fn generate_mock_response(&self, request: &ProtocolRequest) -> Result<ProtocolResponse> {
175        let operation = &request.operation;
176        let topic = request
177            .topic
178            .as_ref()
179            .ok_or_else(|| mockforge_core::Error::generic("Missing topic"))?;
180
181        match operation.as_str() {
182            "PRODUCE" => {
183                let fixture = self.find_fixture_by_topic(topic).ok_or_else(|| {
184                    mockforge_core::Error::generic(format!("No fixture found for topic {}", topic))
185                })?;
186
187                // Generate message using template
188                let templating_context = mockforge_core::templating::TemplatingContext::with_env(
189                    request.metadata.clone(),
190                );
191                let value = self
192                    .template_engine
193                    .expand_tokens_with_context(&fixture.value_template, &templating_context);
194                let _key = fixture.key_pattern.as_ref().map(|key_pattern| {
195                    self.template_engine.expand_str_with_context(key_pattern, &templating_context)
196                });
197
198                // For now, return a mock offset since we don't have actual broker integration
199                let offset = 0i64;
200
201                Ok(ProtocolResponse {
202                    status: ResponseStatus::KafkaStatus(0), // No error
203                    metadata: HashMap::from([
204                        ("topic".to_string(), topic.clone()),
205                        ("offset".to_string(), offset.to_string()),
206                    ]),
207                    body: serde_json::to_string(&value)
208                        .map_err(mockforge_core::Error::Json)?
209                        .into_bytes(),
210                    content_type: "application/json".to_string(),
211                })
212            }
213            "FETCH" => {
214                let partition = request
215                    .partition
216                    .ok_or_else(|| mockforge_core::Error::generic("Missing partition"))?;
217                let _offset =
218                    request.metadata.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
219
220                // For now, return empty messages since we don't have actual broker integration
221                let messages: Vec<crate::partitions::KafkaMessage> = vec![];
222
223                Ok(ProtocolResponse {
224                    status: ResponseStatus::KafkaStatus(0),
225                    metadata: HashMap::from([
226                        ("topic".to_string(), topic.clone()),
227                        ("partition".to_string(), partition.to_string()),
228                        ("message_count".to_string(), messages.len().to_string()),
229                    ]),
230                    body: serde_json::to_vec(&messages).map_err(mockforge_core::Error::Json)?,
231                    content_type: "application/json".to_string(),
232                })
233            }
234            _ => {
235                Err(mockforge_core::Error::generic(format!("Unsupported operation: {}", operation)))
236            }
237        }
238    }
239}