//! A mock Kafka broker: accepts TCP connections, frames messages with a
//! 4-byte big-endian length prefix, and dispatches parsed requests to
//! per-API handlers.

use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;

use crate::consumer_groups::ConsumerGroupManager;
use crate::metrics::KafkaMetrics;
use crate::protocol::{KafkaProtocolHandler, KafkaRequest, KafkaRequestType, KafkaResponse};
use crate::spec_registry::KafkaSpecRegistry;
use crate::topics::Topic;
use mockforge_core::config::KafkaConfig;
use mockforge_core::Result;

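/// A mock Kafka broker backed by in-memory topics, consumer groups, and
/// metrics, with a spec registry built from the broker config.
///
/// A minimal usage sketch (compile-ignored because `KafkaConfig`
/// construction depends on the host crate; `Default` is assumed here):
///
/// ```ignore
/// let broker = KafkaMockBroker::new(KafkaConfig::default()).await?;
/// broker.start().await?; // accepts connections until the task is cancelled
/// ```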
#[derive(Clone)]
#[allow(dead_code)]
pub struct KafkaMockBroker {
    config: KafkaConfig,
    pub topics: Arc<RwLock<HashMap<String, Topic>>>,
    pub consumer_groups: Arc<RwLock<ConsumerGroupManager>>,
    spec_registry: Arc<KafkaSpecRegistry>,
    metrics: Arc<KafkaMetrics>,
}

impl KafkaMockBroker {
    /// Creates a broker with empty in-memory topic and consumer-group state
    /// and a spec registry built from `config`.
    pub async fn new(config: KafkaConfig) -> Result<Self> {
        let topics = Arc::new(RwLock::new(HashMap::new()));
        let consumer_groups = Arc::new(RwLock::new(ConsumerGroupManager::new()));
        let spec_registry = KafkaSpecRegistry::new(config.clone(), Arc::clone(&topics)).await?;
        let metrics = Arc::new(KafkaMetrics::new());

        Ok(Self {
            config,
            topics,
            consumer_groups,
            spec_registry: Arc::new(spec_registry),
            metrics,
        })
    }

    /// Binds to `host:port` from the config and accepts connections in a
    /// loop, spawning one task per connection. Runs until the enclosing task
    /// is cancelled or `bind`/`accept` fails.
    pub async fn start(&self) -> Result<()> {
        let addr = format!("{}:{}", self.config.host, self.config.port);
        let listener = TcpListener::bind(&addr).await?;

        tracing::info!("Starting Kafka mock broker on {}", addr);

        loop {
            let (socket, _) = listener.accept().await?;
            let broker = Arc::new(self.clone());

            tokio::spawn(async move {
                if let Err(e) = broker.handle_connection(socket).await {
                    tracing::error!("Error handling connection: {}", e);
                }
            });
        }
    }

    /// Serves one client connection: reads size-prefixed frames, dispatches
    /// each parsed request, and writes back a size-prefixed response.
    async fn handle_connection(&self, mut socket: TcpStream) -> Result<()> {
        let protocol_handler = KafkaProtocolHandler::new();
        self.metrics.record_connection();

        // Decrements the connection gauge when this function returns.
        let _guard = ConnectionGuard {
            metrics: Arc::clone(&self.metrics),
        };

        loop {
            // Each message is framed by a 4-byte big-endian size prefix.
            let mut size_buf = [0u8; 4];
            match tokio::time::timeout(
                std::time::Duration::from_secs(30),
                socket.read_exact(&mut size_buf),
            )
            .await
            {
                Ok(Ok(_)) => {
                    let message_size = i32::from_be_bytes(size_buf) as usize;

                    // Reject oversized frames (a negative i32 size also lands
                    // here after the cast to usize). The unread frame body
                    // leaves the stream misaligned, so close the connection
                    // rather than trying to resynchronize.
                    if message_size > 10 * 1024 * 1024 {
                        self.metrics.record_error();
                        tracing::warn!("Message size too large: {} bytes", message_size);
                        break;
                    }

                    let mut message_buf = vec![0u8; message_size];
                    match tokio::time::timeout(
                        std::time::Duration::from_secs(10),
                        socket.read_exact(&mut message_buf),
                    )
                    .await
                    {
                        Ok(Ok(_)) => {}
                        Ok(Err(e)) => {
                            self.metrics.record_error();
                            tracing::error!("Failed to read message body: {}", e);
                            break;
                        }
                        Err(e) => {
                            self.metrics.record_error();
                            tracing::error!("Timeout reading message: {}", e);
                            break;
                        }
                    }

                    // The full frame has been consumed, so parse failures can
                    // safely skip to the next frame.
                    let request = match protocol_handler.parse_request(&message_buf) {
                        Ok(req) => req,
                        Err(e) => {
                            self.metrics.record_error();
                            tracing::error!("Failed to parse request: {}", e);
                            continue;
                        }
                    };

                    self.metrics.record_request(get_api_key_from_request(&request));

                    let start_time = std::time::Instant::now();

                    // On handler errors the mock skips the response entirely;
                    // the client sees a timeout rather than an error code.
                    let response = match self.handle_request(request).await {
                        Ok(resp) => resp,
                        Err(e) => {
                            self.metrics.record_error();
                            tracing::error!("Failed to handle request: {}", e);
                            continue;
                        }
                    };

                    let latency = start_time.elapsed().as_micros() as u64;
                    self.metrics.record_request_latency(latency);
                    self.metrics.record_response();

                    // The correlation ID is fixed at 0 here; a real broker
                    // echoes the ID from the corresponding request.
                    let response_data = match protocol_handler.serialize_response(&response, 0) {
                        Ok(data) => data,
                        Err(e) => {
                            self.metrics.record_error();
                            tracing::error!("Failed to serialize response: {}", e);
                            continue;
                        }
                    };

                    // Write the size prefix, then the response payload.
                    let response_size = (response_data.len() as i32).to_be_bytes();
                    if let Err(e) = socket.write_all(&response_size).await {
                        self.metrics.record_error();
                        tracing::error!("Failed to write response size: {}", e);
                        break;
                    }

                    if let Err(e) = socket.write_all(&response_data).await {
                        self.metrics.record_error();
                        tracing::error!("Failed to write response: {}", e);
                        break;
                    }
                }
                Ok(Err(e)) => {
                    self.metrics.record_error();
                    tracing::error!("Failed to read message size: {}", e);
                    break;
                }
                Err(_) => {
                    // Idle timeout waiting for the next request; keep the
                    // connection open and poll again.
                    continue;
                }
            }
        }

        Ok(())
    }

    /// Dispatches a parsed request to the handler for its request type.
    async fn handle_request(&self, request: KafkaRequest) -> Result<KafkaResponse> {
        match request.request_type {
            KafkaRequestType::Metadata => self.handle_metadata().await,
            KafkaRequestType::Produce => self.handle_produce().await,
            KafkaRequestType::Fetch => self.handle_fetch().await,
            KafkaRequestType::ListGroups => self.handle_list_groups().await,
            KafkaRequestType::DescribeGroups => self.handle_describe_groups().await,
            KafkaRequestType::ApiVersions => self.handle_api_versions().await,
            KafkaRequestType::CreateTopics => self.handle_create_topics().await,
            KafkaRequestType::DeleteTopics => self.handle_delete_topics().await,
            KafkaRequestType::DescribeConfigs => self.handle_describe_configs().await,
        }
    }

    // The handlers below return stub responses; request payloads are not yet
    // interpreted.
    async fn handle_metadata(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::Metadata)
    }

    async fn handle_produce(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::Produce)
    }

    async fn handle_fetch(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::Fetch)
    }

    async fn handle_api_versions(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::ApiVersions)
    }

    async fn handle_list_groups(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::ListGroups)
    }

    async fn handle_describe_groups(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::DescribeGroups)
    }

    async fn handle_create_topics(&self) -> Result<KafkaResponse> {
        // The request payload is not parsed yet, so creation always targets a
        // fixed placeholder topic.
        let topic_name = "default-topic".to_string();
        let topic_config = crate::topics::TopicConfig::default();
        let topic = crate::topics::Topic::new(topic_name.clone(), topic_config);

        let mut topics = self.topics.write().await;
        topics.insert(topic_name, topic);

        Ok(KafkaResponse::CreateTopics)
    }

    async fn handle_delete_topics(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::DeleteTopics)
    }

    async fn handle_describe_configs(&self) -> Result<KafkaResponse> {
        Ok(KafkaResponse::DescribeConfigs)
    }

    /// Test helper: commits offsets for a consumer group.
    pub async fn test_commit_offsets(
        &self,
        group_id: &str,
        offsets: std::collections::HashMap<(String, i32), i64>,
    ) -> Result<()> {
        let mut consumer_groups = self.consumer_groups.write().await;
        consumer_groups
            .commit_offsets(group_id, offsets)
            .await
            .map_err(|e| mockforge_core::Error::from(e.to_string()))
    }

    /// Test helper: returns the committed offsets for a consumer group.
    pub async fn test_get_committed_offsets(
        &self,
        group_id: &str,
    ) -> std::collections::HashMap<(String, i32), i64> {
        let consumer_groups = self.consumer_groups.read().await;
        consumer_groups.get_committed_offsets(group_id)
    }

    /// Test helper: creates a topic directly, bypassing the wire protocol.
    pub async fn test_create_topic(&self, name: &str, config: crate::topics::TopicConfig) {
        use crate::topics::Topic;
        let topic = Topic::new(name.to_string(), config);
        let mut topics = self.topics.write().await;
        topics.insert(name.to_string(), topic);
    }

    /// Test helper: adds a member to a consumer group.
    pub async fn test_join_group(
        &self,
        group_id: &str,
        member_id: &str,
        client_id: &str,
    ) -> Result<()> {
        let mut consumer_groups = self.consumer_groups.write().await;
        consumer_groups
            .join_group(group_id, member_id, client_id)
            .await
            .map_err(|e| mockforge_core::Error::from(e.to_string()))?;
        Ok(())
    }

    /// Test helper: applies partition assignments to a consumer group.
    pub async fn test_sync_group(
        &self,
        group_id: &str,
        assignments: Vec<crate::consumer_groups::PartitionAssignment>,
    ) -> Result<()> {
        let topics = self.topics.read().await;
        let mut consumer_groups = self.consumer_groups.write().await;
        consumer_groups
            .sync_group(group_id, assignments, &topics)
            .await
            .map_err(|e| mockforge_core::Error::from(e.to_string()))?;
        Ok(())
    }

    /// Test helper: returns the partition assignments for a group member.
    pub async fn test_get_assignments(
        &self,
        group_id: &str,
        member_id: &str,
    ) -> Vec<crate::consumer_groups::PartitionAssignment> {
        let consumer_groups = self.consumer_groups.read().await;
        if let Some(group) = consumer_groups.groups().get(group_id) {
            if let Some(member) = group.members.get(member_id) {
                return member.assignment.clone();
            }
        }
        vec![]
    }

    /// Test helper: simulates consumer lag on a topic for a group.
    pub async fn test_simulate_lag(&self, group_id: &str, topic: &str, lag: i64) -> Result<()> {
        let topics = self.topics.read().await;
        let mut consumer_groups = self.consumer_groups.write().await;
        consumer_groups.simulate_lag(group_id, topic, lag, &topics).await;
        Ok(())
    }

    /// Test helper: resets a group's committed offsets on a topic to the
    /// named position.
    pub async fn test_reset_offsets(&self, group_id: &str, topic: &str, to: &str) -> Result<()> {
        let topics = self.topics.read().await;
        let mut consumer_groups = self.consumer_groups.write().await;
        consumer_groups.reset_offsets(group_id, topic, to, &topics).await;
        Ok(())
    }
}

/// A Kafka record: optional key, value bytes, and key/value headers.
#[derive(Debug, Clone)]
pub struct Record {
    pub key: Option<Vec<u8>>,
    pub value: Vec<u8>,
    pub headers: Vec<(String, Vec<u8>)>,
}

/// Per-partition result of a produce request.
#[derive(Debug)]
pub struct ProduceResponse {
    pub partition: i32,
    pub error_code: i16,
    pub offset: i64,
}

/// Per-partition result of a fetch request.
#[derive(Debug)]
pub struct FetchResponse {
    pub partition: i32,
    pub error_code: i16,
    pub high_watermark: i64,
    pub records: Vec<Record>,
}

/// RAII guard that records connection closure in the metrics when dropped,
/// even if the connection handler exits early with an error.
struct ConnectionGuard {
    metrics: Arc<KafkaMetrics>,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.metrics.record_connection_closed();
    }
}

/// Extracts the wire-protocol API key from a parsed request, for metrics.
fn get_api_key_from_request(request: &KafkaRequest) -> i16 {
    request.api_key
}