mockforge_kafka/
spec_registry.rs1use async_trait::async_trait;
2use chrono;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use mockforge_core::protocol_abstraction::{
7 ProtocolRequest, ProtocolResponse, ResponseStatus, SpecOperation, SpecRegistry,
8 ValidationError, ValidationResult,
9};
10use mockforge_core::{Protocol, Result};
11
12#[derive(Debug)]
14pub struct KafkaSpecRegistry {
15 fixtures: Vec<Arc<crate::fixtures::KafkaFixture>>,
16 template_engine: mockforge_core::templating::TemplateEngine,
17 topics: std::sync::Arc<
18 tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
19 >,
20}
21
22impl KafkaSpecRegistry {
23 pub async fn new(
25 config: mockforge_core::config::KafkaConfig,
26 topics: std::sync::Arc<
27 tokio::sync::RwLock<std::collections::HashMap<String, crate::topics::Topic>>,
28 >,
29 ) -> Result<Self> {
30 let fixtures = if let Some(fixtures_dir) = &config.fixtures_dir {
31 crate::fixtures::KafkaFixture::load_from_dir(fixtures_dir)?
32 .into_iter()
33 .map(Arc::new)
34 .collect()
35 } else {
36 vec![]
37 };
38
39 let template_engine = mockforge_core::templating::TemplateEngine::new();
40
41 Ok(Self {
42 fixtures,
43 template_engine,
44 topics,
45 })
46 }
47
48 pub fn find_fixture_by_topic(&self, topic: &str) -> Option<Arc<crate::fixtures::KafkaFixture>> {
50 self.fixtures.iter().find(|f| f.topic == topic).cloned()
51 }
52
53 pub async fn produce(
55 &self,
56 topic: &str,
57 key: Option<&str>,
58 value: &serde_json::Value,
59 ) -> Result<i64> {
60 let mut topics = self.topics.write().await;
61
62 let topic_entry = topics.entry(topic.to_string()).or_insert_with(|| {
64 crate::topics::Topic::new(topic.to_string(), crate::topics::TopicConfig::default())
65 });
66
67 let partition_id = topic_entry.assign_partition(key.map(|k| k.as_bytes()));
69
70 let message = crate::partitions::KafkaMessage {
72 offset: 0, timestamp: chrono::Utc::now().timestamp_millis(),
74 key: key.map(|k| k.as_bytes().to_vec()),
75 value: serde_json::to_vec(value).map_err(mockforge_core::Error::Json)?,
76 headers: vec![],
77 };
78
79 let offset = topic_entry
81 .get_partition_mut(partition_id)
82 .ok_or_else(|| {
83 mockforge_core::Error::generic(format!("Partition {} not found", partition_id))
84 })?
85 .append(message);
86
87 Ok(offset)
88 }
89
90 pub async fn fetch(
92 &self,
93 topic: &str,
94 partition: i32,
95 offset: i64,
96 ) -> Result<Vec<crate::partitions::KafkaMessage>> {
97 let topics = self.topics.read().await;
98
99 if let Some(topic_entry) = topics.get(topic) {
100 if let Some(partition_entry) = topic_entry.get_partition(partition) {
101 let messages = partition_entry.fetch(offset, 1000); Ok(messages.into_iter().cloned().collect())
104 } else {
105 Err(mockforge_core::Error::generic(format!(
106 "Partition {} not found in topic {}",
107 partition, topic
108 )))
109 }
110 } else {
111 Err(mockforge_core::Error::generic(format!("Topic {} not found", topic)))
112 }
113 }
114}
115
116#[async_trait]
117impl SpecRegistry for KafkaSpecRegistry {
118 fn protocol(&self) -> Protocol {
119 Protocol::Kafka
120 }
121
122 fn operations(&self) -> Vec<SpecOperation> {
123 self.fixtures
125 .iter()
126 .map(|fixture| SpecOperation {
127 name: fixture.identifier.clone(),
128 path: fixture.topic.clone(),
129 operation_type: "PRODUCE".to_string(),
130 input_schema: Some("KafkaMessage".to_string()),
131 output_schema: Some("ProduceResponse".to_string()),
132 metadata: std::collections::HashMap::new(),
133 })
134 .collect()
135 }
136
137 fn find_operation(&self, operation: &str, path: &str) -> Option<SpecOperation> {
138 self.fixtures
139 .iter()
140 .find(|fixture| fixture.topic == path && operation == "PRODUCE")
141 .map(|fixture| SpecOperation {
142 name: fixture.identifier.clone(),
143 path: fixture.topic.clone(),
144 operation_type: "PRODUCE".to_string(),
145 input_schema: Some("KafkaMessage".to_string()),
146 output_schema: Some("ProduceResponse".to_string()),
147 metadata: std::collections::HashMap::new(),
148 })
149 }
150
151 fn validate_request(&self, request: &ProtocolRequest) -> Result<ValidationResult> {
152 let valid = if let Some(topic) = &request.topic {
154 self.fixtures.iter().any(|f| f.topic == *topic)
155 } else {
156 false
157 };
158
159 Ok(ValidationResult {
160 valid,
161 errors: if valid {
162 vec![]
163 } else {
164 vec![ValidationError {
165 message: "Topic not found in fixtures".to_string(),
166 path: Some("topic".to_string()),
167 code: Some("TOPIC_NOT_FOUND".to_string()),
168 }]
169 },
170 warnings: vec![],
171 })
172 }
173
174 fn generate_mock_response(&self, request: &ProtocolRequest) -> Result<ProtocolResponse> {
175 let operation = &request.operation;
176 let topic = request
177 .topic
178 .as_ref()
179 .ok_or_else(|| mockforge_core::Error::generic("Missing topic"))?;
180
181 match operation.as_str() {
182 "PRODUCE" => {
183 let fixture = self.find_fixture_by_topic(topic).ok_or_else(|| {
184 mockforge_core::Error::generic(format!("No fixture found for topic {}", topic))
185 })?;
186
187 let templating_context = mockforge_core::templating::TemplatingContext::with_env(
189 request.metadata.clone(),
190 );
191 let value = self
192 .template_engine
193 .expand_tokens_with_context(&fixture.value_template, &templating_context);
194 let _key = fixture.key_pattern.as_ref().map(|key_pattern| {
195 self.template_engine.expand_str_with_context(key_pattern, &templating_context)
196 });
197
198 let offset = 0i64;
200
201 Ok(ProtocolResponse {
202 status: ResponseStatus::KafkaStatus(0), metadata: HashMap::from([
204 ("topic".to_string(), topic.clone()),
205 ("offset".to_string(), offset.to_string()),
206 ]),
207 body: serde_json::to_string(&value)
208 .map_err(mockforge_core::Error::Json)?
209 .into_bytes(),
210 content_type: "application/json".to_string(),
211 })
212 }
213 "FETCH" => {
214 let partition = request
215 .partition
216 .ok_or_else(|| mockforge_core::Error::generic("Missing partition"))?;
217 let _offset =
218 request.metadata.get("offset").and_then(|s| s.parse().ok()).unwrap_or(0);
219
220 let messages: Vec<crate::partitions::KafkaMessage> = vec![];
222
223 Ok(ProtocolResponse {
224 status: ResponseStatus::KafkaStatus(0),
225 metadata: HashMap::from([
226 ("topic".to_string(), topic.clone()),
227 ("partition".to_string(), partition.to_string()),
228 ("message_count".to_string(), messages.len().to_string()),
229 ]),
230 body: serde_json::to_vec(&messages).map_err(mockforge_core::Error::Json)?,
231 content_type: "application/json".to_string(),
232 })
233 }
234 _ => {
235 Err(mockforge_core::Error::generic(format!("Unsupported operation: {}", operation)))
236 }
237 }
238 }
239}