mockforge_kafka/
broker.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio::net::{TcpListener, TcpStream};
5use tokio::sync::RwLock;
6
7use crate::consumer_groups::ConsumerGroupManager;
8use crate::metrics::KafkaMetrics;
9use crate::protocol::{KafkaProtocolHandler, KafkaRequest, KafkaRequestType, KafkaResponse};
10use crate::spec_registry::KafkaSpecRegistry;
11use crate::topics::Topic;
12use mockforge_core::config::KafkaConfig;
13use mockforge_core::Result;
14
15/// Mock Kafka broker implementation
16///
17/// The `KafkaMockBroker` simulates a complete Apache Kafka broker, handling
18/// TCP connections and responding to Kafka protocol requests. It supports
19/// multiple concurrent connections and provides comprehensive metrics collection.
20///
21/// # Architecture
22///
23/// The broker maintains several key components:
24/// - **Topics**: Managed topic and partition storage
25/// - **Consumer Groups**: Consumer group coordination and partition assignment
26/// - **Spec Registry**: Fixture-based request/response handling
27/// - **Metrics**: Real-time performance and usage statistics
28///
29/// # Supported Operations
30///
31/// - Produce: Message production with acknowledgments
32/// - Fetch: Message consumption with offset tracking
33/// - Metadata: Topic and broker discovery
34/// - ListGroups/DescribeGroups: Consumer group management
35/// - ApiVersions: Protocol version negotiation
36/// - CreateTopics/DeleteTopics: Dynamic topic management
37/// - DescribeConfigs: Configuration retrieval
38///
39/// # Example
40///
41/// ```rust,no_run
42/// use mockforge_kafka::KafkaMockBroker;
43/// use mockforge_core::config::KafkaConfig;
44///
45/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
46/// let config = KafkaConfig {
47///     port: 9092,
48///     ..Default::default()
49/// };
50///
51/// let broker = KafkaMockBroker::new(config).await?;
52/// broker.start().await?;
53/// # Ok(())
54/// # }
55/// ```
56#[derive(Clone)]
57#[allow(dead_code)]
58pub struct KafkaMockBroker {
59    /// Broker configuration
60    config: KafkaConfig,
61    /// Topic storage with thread-safe access
62    pub topics: Arc<RwLock<HashMap<String, Topic>>>,
63    /// Consumer group manager
64    pub consumer_groups: Arc<RwLock<ConsumerGroupManager>>,
65    /// Specification registry for fixture-based responses
66    spec_registry: Arc<KafkaSpecRegistry>,
67    /// Metrics collection and reporting
68    metrics: Arc<KafkaMetrics>,
69}
70
71impl KafkaMockBroker {
72    /// Create a new Kafka mock broker
73    ///
74    /// Initializes the broker with the provided configuration, setting up
75    /// internal data structures for topics, consumer groups, and metrics.
76    ///
77    /// # Arguments
78    ///
79    /// * `config` - Kafka broker configuration including port, timeouts, and fixture paths
80    ///
81    /// # Returns
82    ///
83    /// Returns a `Result` containing the initialized broker or an error if initialization fails.
84    ///
85    /// # Example
86    ///
87    /// ```rust,no_run
88    /// use mockforge_kafka::KafkaMockBroker;
89    /// use mockforge_core::config::KafkaConfig;
90    ///
91    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
92    /// let config = KafkaConfig::default();
93    /// let broker = KafkaMockBroker::new(config).await?;
94    /// # Ok(())
95    /// # }
96    /// ```
97    pub async fn new(config: KafkaConfig) -> Result<Self> {
98        let topics = Arc::new(RwLock::new(HashMap::new()));
99        let consumer_groups = Arc::new(RwLock::new(ConsumerGroupManager::new()));
100        let spec_registry = KafkaSpecRegistry::new(config.clone(), Arc::clone(&topics)).await?;
101        let metrics = Arc::new(KafkaMetrics::new());
102
103        Ok(Self {
104            config,
105            topics,
106            consumer_groups,
107            spec_registry: Arc::new(spec_registry),
108            metrics,
109        })
110    }
111
112    /// Start the Kafka broker server
113    ///
114    /// Binds to the configured host and port, then begins accepting TCP connections.
115    /// Each connection is handled in a separate async task, allowing concurrent client connections.
116    ///
117    /// The broker will run indefinitely until the task is cancelled or an error occurs.
118    ///
119    /// # Returns
120    ///
121    /// Returns a `Result` that indicates whether the broker started successfully.
122    /// The method only returns on error, as it runs an infinite accept loop.
123    ///
124    /// # Example
125    ///
126    /// ```rust,no_run
127    /// use mockforge_kafka::KafkaMockBroker;
128    /// use mockforge_core::config::KafkaConfig;
129    ///
130    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
131    /// let config = KafkaConfig::default();
132    /// let broker = KafkaMockBroker::new(config).await?;
133    ///
134    /// // Start the broker (this will run indefinitely)
135    /// broker.start().await?;
136    /// # Ok(())
137    /// # }
138    /// ```
139    pub async fn start(&self) -> Result<()> {
140        let addr = format!("{}:{}", self.config.host, self.config.port);
141        let listener = TcpListener::bind(&addr).await?;
142
143        tracing::info!("Starting Kafka mock broker on {}", addr);
144
145        loop {
146            let (socket, _) = listener.accept().await?;
147            let broker = Arc::new(self.clone());
148
149            tokio::spawn(async move {
150                if let Err(e) = broker.handle_connection(socket).await {
151                    tracing::error!("Error handling connection: {}", e);
152                }
153            });
154        }
155    }
156
157    /// Handle a client connection
158    async fn handle_connection(&self, mut socket: TcpStream) -> Result<()> {
159        let protocol_handler = KafkaProtocolHandler::new();
160        self.metrics.record_connection();
161
162        // Ensure we decrement active connections when done
163        let _guard = ConnectionGuard {
164            metrics: Arc::clone(&self.metrics),
165        };
166
167        loop {
168            // Read message size (4 bytes) with timeout
169            let mut size_buf = [0u8; 4];
170            match tokio::time::timeout(
171                std::time::Duration::from_secs(30),
172                socket.read_exact(&mut size_buf),
173            )
174            .await
175            {
176                Ok(Ok(_)) => {
177                    let message_size = i32::from_be_bytes(size_buf) as usize;
178
179                    // Validate message size (prevent DoS)
180                    if message_size > 10 * 1024 * 1024 {
181                        // 10MB limit
182                        self.metrics.record_error();
183                        tracing::warn!("Message size too large: {} bytes", message_size);
184                        continue;
185                    }
186
187                    // Read message
188                    let mut message_buf = vec![0u8; message_size];
189                    if let Err(e) = tokio::time::timeout(
190                        std::time::Duration::from_secs(10),
191                        socket.read_exact(&mut message_buf),
192                    )
193                    .await
194                    {
195                        self.metrics.record_error();
196                        tracing::error!("Timeout reading message: {}", e);
197                        break;
198                    }
199
200                    // Parse request
201                    let request = match protocol_handler.parse_request(&message_buf) {
202                        Ok(req) => req,
203                        Err(e) => {
204                            self.metrics.record_error();
205                            tracing::error!("Failed to parse request: {}", e);
206                            continue;
207                        }
208                    };
209
210                    // Record request metrics
211                    self.metrics.record_request(get_api_key_from_request(&request));
212
213                    let start_time = std::time::Instant::now();
214
215                    // Handle request
216                    let response = match self.handle_request(request).await {
217                        Ok(resp) => resp,
218                        Err(e) => {
219                            self.metrics.record_error();
220                            tracing::error!("Failed to handle request: {}", e);
221                            // Return error response
222                            continue;
223                        }
224                    };
225
226                    let latency = start_time.elapsed().as_micros() as u64;
227                    self.metrics.record_request_latency(latency);
228                    self.metrics.record_response();
229
230                    // Serialize response
231                    let response_data = match protocol_handler.serialize_response(&response, 0) {
232                        Ok(data) => data,
233                        Err(e) => {
234                            self.metrics.record_error();
235                            tracing::error!("Failed to serialize response: {}", e);
236                            continue;
237                        }
238                    };
239
240                    // Write response size
241                    let response_size = (response_data.len() as i32).to_be_bytes();
242                    if let Err(e) = socket.write_all(&response_size).await {
243                        self.metrics.record_error();
244                        tracing::error!("Failed to write response size: {}", e);
245                        break;
246                    }
247
248                    // Write response
249                    if let Err(e) = socket.write_all(&response_data).await {
250                        self.metrics.record_error();
251                        tracing::error!("Failed to write response: {}", e);
252                        break;
253                    }
254                }
255                Ok(Err(e)) => {
256                    self.metrics.record_error();
257                    tracing::error!("Failed to read message size: {}", e);
258                    break;
259                }
260                Err(_) => {
261                    // Timeout - client may be idle, just continue
262                    continue;
263                }
264            }
265        }
266
267        Ok(())
268    }
269
270    /// Handle a parsed Kafka request
271    async fn handle_request(&self, request: KafkaRequest) -> Result<KafkaResponse> {
272        match request.request_type {
273            KafkaRequestType::Metadata => self.handle_metadata().await,
274            KafkaRequestType::Produce => self.handle_produce().await,
275            KafkaRequestType::Fetch => self.handle_fetch().await,
276            KafkaRequestType::ListGroups => self.handle_list_groups().await,
277            KafkaRequestType::DescribeGroups => self.handle_describe_groups().await,
278            KafkaRequestType::ApiVersions => self.handle_api_versions().await,
279            KafkaRequestType::CreateTopics => self.handle_create_topics().await,
280            KafkaRequestType::DeleteTopics => self.handle_delete_topics().await,
281            KafkaRequestType::DescribeConfigs => self.handle_describe_configs().await,
282        }
283    }
284
285    async fn handle_metadata(&self) -> Result<KafkaResponse> {
286        // Simplified metadata response
287        Ok(KafkaResponse::Metadata)
288    }
289
290    async fn handle_produce(&self) -> Result<KafkaResponse> {
291        // Produce logic not yet implemented
292        Ok(KafkaResponse::Produce)
293    }
294
295    async fn handle_fetch(&self) -> Result<KafkaResponse> {
296        // Fetch logic not yet implemented
297        Ok(KafkaResponse::Fetch)
298    }
299
300    async fn handle_api_versions(&self) -> Result<KafkaResponse> {
301        Ok(KafkaResponse::ApiVersions)
302    }
303
304    async fn handle_list_groups(&self) -> Result<KafkaResponse> {
305        Ok(KafkaResponse::ListGroups)
306    }
307
308    async fn handle_describe_groups(&self) -> Result<KafkaResponse> {
309        Ok(KafkaResponse::DescribeGroups)
310    }
311
312    async fn handle_create_topics(&self) -> Result<KafkaResponse> {
313        // For now, create a default topic as a placeholder
314        // Protocol parsing for actual topic creation parameters is not yet implemented
315        let topic_name = "default-topic".to_string();
316        let topic_config = crate::topics::TopicConfig::default();
317        let topic = crate::topics::Topic::new(topic_name.clone(), topic_config);
318
319        // Store the topic
320        let mut topics = self.topics.write().await;
321        topics.insert(topic_name, topic);
322
323        Ok(KafkaResponse::CreateTopics)
324    }
325
326    async fn handle_delete_topics(&self) -> Result<KafkaResponse> {
327        Ok(KafkaResponse::DeleteTopics)
328    }
329
330    async fn handle_describe_configs(&self) -> Result<KafkaResponse> {
331        Ok(KafkaResponse::DescribeConfigs)
332    }
333
334    /// Test helper: Commit offsets for a consumer group (only available in tests)
335    pub async fn test_commit_offsets(
336        &self,
337        group_id: &str,
338        offsets: std::collections::HashMap<(String, i32), i64>,
339    ) -> Result<()> {
340        let mut consumer_groups = self.consumer_groups.write().await;
341        consumer_groups
342            .commit_offsets(group_id, offsets)
343            .await
344            .map_err(|e| mockforge_core::Error::from(e.to_string()))
345    }
346
347    /// Test helper: Get committed offsets for a consumer group (only available in tests)
348    pub async fn test_get_committed_offsets(
349        &self,
350        group_id: &str,
351    ) -> std::collections::HashMap<(String, i32), i64> {
352        let consumer_groups = self.consumer_groups.read().await;
353        consumer_groups.get_committed_offsets(group_id)
354    }
355
356    /// Test helper: Create a topic (only available in tests)
357    pub async fn test_create_topic(&self, name: &str, config: crate::topics::TopicConfig) {
358        use crate::topics::Topic;
359        let topic = Topic::new(name.to_string(), config);
360        let mut topics = self.topics.write().await;
361        topics.insert(name.to_string(), topic);
362    }
363
364    /// Test helper: Join a consumer group (only available in tests)
365    pub async fn test_join_group(
366        &self,
367        group_id: &str,
368        member_id: &str,
369        client_id: &str,
370    ) -> Result<()> {
371        let mut consumer_groups = self.consumer_groups.write().await;
372        consumer_groups
373            .join_group(group_id, member_id, client_id)
374            .await
375            .map_err(|e| mockforge_core::Error::from(e.to_string()))?;
376        Ok(())
377    }
378
379    /// Test helper: Sync group assignment (only available in tests)
380    pub async fn test_sync_group(
381        &self,
382        group_id: &str,
383        assignments: Vec<crate::consumer_groups::PartitionAssignment>,
384    ) -> Result<()> {
385        let topics = self.topics.read().await;
386        let mut consumer_groups = self.consumer_groups.write().await;
387        consumer_groups
388            .sync_group(group_id, assignments, &topics)
389            .await
390            .map_err(|e| mockforge_core::Error::from(e.to_string()))?;
391        Ok(())
392    }
393
394    /// Test helper: Get group member assignments (only available in tests)
395    pub async fn test_get_assignments(
396        &self,
397        group_id: &str,
398        member_id: &str,
399    ) -> Vec<crate::consumer_groups::PartitionAssignment> {
400        let consumer_groups = self.consumer_groups.read().await;
401        if let Some(group) = consumer_groups.groups().get(group_id) {
402            if let Some(member) = group.members.get(member_id) {
403                return member.assignment.clone();
404            }
405        }
406        vec![]
407    }
408
409    /// Test helper: Simulate consumer lag (only available in tests)
410    pub async fn test_simulate_lag(&self, group_id: &str, topic: &str, lag: i64) -> Result<()> {
411        let topics = self.topics.read().await;
412        let mut consumer_groups = self.consumer_groups.write().await;
413        consumer_groups.simulate_lag(group_id, topic, lag, &topics).await;
414        Ok(())
415    }
416
417    /// Test helper: Reset consumer offsets (only available in tests)
418    pub async fn test_reset_offsets(&self, group_id: &str, topic: &str, to: &str) -> Result<()> {
419        let topics = self.topics.read().await;
420        let mut consumer_groups = self.consumer_groups.write().await;
421        consumer_groups.reset_offsets(group_id, topic, to, &topics).await;
422        Ok(())
423    }
424}
425
/// Record represents a Kafka message record
///
/// Derives `PartialEq`/`Eq` so records can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Record {
    /// Optional record key bytes
    pub key: Option<Vec<u8>>,
    /// Record value bytes
    pub value: Vec<u8>,
    /// Record headers as (name, value bytes) pairs, in order
    pub headers: Vec<(String, Vec<u8>)>,
}
433
/// Response for produce requests
///
/// All fields are plain integers, so the type also derives `Clone`, `PartialEq`, and `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProduceResponse {
    /// Partition the records were produced to
    pub partition: i32,
    /// Error code for this partition (presumably a Kafka protocol error code, 0 = none — confirm)
    pub error_code: i16,
    /// Offset reported for the produced records
    pub offset: i64,
}
441
442/// Response for fetch requests
443#[derive(Debug)]
444pub struct FetchResponse {
445    pub partition: i32,
446    pub error_code: i16,
447    pub high_watermark: i64,
448    pub records: Vec<Record>,
449}
450
451/// Guard to ensure connection metrics are properly cleaned up
452struct ConnectionGuard {
453    metrics: Arc<KafkaMetrics>,
454}
455
456impl Drop for ConnectionGuard {
457    fn drop(&mut self) {
458        self.metrics.record_connection_closed();
459    }
460}
461
462/// Extract API key from request for metrics
463fn get_api_key_from_request(request: &KafkaRequest) -> i16 {
464    request.api_key
465}