mockforge_kafka/lib.rs
1//! # MockForge Kafka
2//!
3//! Kafka protocol support for MockForge.
4//!
5//! This crate provides Kafka-specific functionality for creating mock Kafka brokers,
6//! including topic management, partition handling, consumer groups, and fixture-driven message generation.
7//!
8//! ## Overview
9//!
10//! MockForge Kafka enables you to:
11//!
12//! - **Mock Kafka brokers**: Simulate Apache Kafka brokers for testing
13//! - **Topic and partition management**: Create and manage topics with configurable partitions
14//! - **Producer/consumer simulation**: Handle produce and fetch requests
15//! - **Consumer group coordination**: Simulate consumer group behavior and rebalancing
16//! - **Fixture-based messaging**: Generate messages using templates and patterns
17//! - **Auto-produce functionality**: Automatically generate messages at specified rates
18//!
19//! ## Quick Start
20//!
21//! ### Basic Kafka Broker
22//!
23//! ```rust,no_run
24//! use mockforge_kafka::KafkaMockBroker;
25//! use mockforge_core::config::KafkaConfig;
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! let config = KafkaConfig::default();
30//! let broker = KafkaMockBroker::new(config).await?;
31//!
32//! broker.start().await?;
33//! Ok(())
34//! }
35//! ```
36//!
37//! ## Key Features
38//!
39//! ### Broker Simulation
//! - **10+ Kafka APIs supported**, including Produce, Fetch, Metadata, ListGroups, DescribeGroups, ApiVersions, CreateTopics, DeleteTopics, and DescribeConfigs
41//! - **Protocol-compliant responses**: Full Kafka protocol implementation without external dependencies
42//! - **Connection handling**: TCP-based broker connections with proper error handling
43//! - **Topic and partition management**: Dynamic topic/partition creation and management
44//!
45//! ### Metrics and Monitoring
46//! - **Comprehensive metrics**: Request counts, error rates, connection tracking
47//! - **Prometheus integration**: Export metrics in Prometheus format
48//! - **Real-time monitoring**: Live metrics collection during broker operation
49//!
50//! ### Fixture System
51//! - **YAML-based fixtures**: Define message templates and auto-production rules
52//! - **Template engine integration**: Use MockForge's templating system for dynamic content
53//! - **Auto-produce functionality**: Automatically generate messages at specified rates
54//! - **Key and value templating**: Flexible message generation with context variables
55//!
56//! ### Consumer Group Management
57//! - **Group coordination**: Simulate consumer group joins and synchronization
58//! - **Partition assignment**: Automatic partition distribution among consumers
59//! - **Offset management**: Track and manage consumer offsets
60//! - **Rebalancing simulation**: Test consumer group rebalancing scenarios
61//!
62//! ### Testing Features
63//! - **Protocol validation**: Ensure requests conform to Kafka protocol specifications
64//! - **Error simulation**: Configurable error responses for testing error handling
65//! - **Performance testing**: Built-in benchmarking support
66//! - **Integration testing**: Compatible with standard Kafka client libraries
67//!
68//! ## Supported Kafka APIs
69//!
70//! - **Produce** (API Key 0): Message production with acknowledgments
71//! - **Fetch** (API Key 1): Message consumption with offset management
72//! - **Metadata** (API Key 3): Topic and broker metadata discovery
73//! - **ListGroups** (API Key 9): Consumer group listing
74//! - **DescribeGroups** (API Key 15): Consumer group details and member information
75//! - **ApiVersions** (API Key 18): Protocol version negotiation
76//! - **CreateTopics** (API Key 19): Dynamic topic creation
77//! - **DeleteTopics** (API Key 20): Topic deletion
78//! - **DescribeConfigs** (API Key 32): Configuration retrieval
79//!
80//! ## Example Usage
81//!
82//! ### Basic Broker Setup
83//!
84//! ```rust,no_run
85//! use mockforge_kafka::KafkaMockBroker;
86//! use mockforge_core::config::KafkaConfig;
87//!
88//! #[tokio::main]
89//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
90//! let config = KafkaConfig {
91//! port: 9092,
92//! ..Default::default()
93//! };
94//!
95//! let broker = KafkaMockBroker::new(config).await?;
96//! broker.start().await?;
97//!
98//! Ok(())
99//! }
100//! ```
101//!
102//! ### Metrics Export
103//!
104//! ```rust,no_run
105//! use mockforge_kafka::MetricsExporter;
106//! use mockforge_kafka::KafkaMetrics;
107//! use std::sync::Arc;
108//!
109//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let metrics = Arc::new(KafkaMetrics::default());
//!     let exporter = MetricsExporter::new(metrics.clone());
//!
//!     // Export metrics in Prometheus format
//!     let prometheus_text = exporter.export_prometheus();
//!     println!("{}", prometheus_text);
//!
//!     Ok(())
//! }
120//! ```
121//!
122//! ## Related Crates
123//!
124//! - [`mockforge-core`](https://docs.rs/mockforge-core): Core mocking functionality and configuration
//! - [`rdkafka`](https://docs.rs/rdkafka): Kafka client library for integration testing
126
127pub mod broker;
128pub mod consumer_groups;
129pub mod fixtures;
130pub mod metrics;
131pub mod partitions;
132pub mod protocol;
133pub mod spec_registry;
134pub mod topics;
135
136// Re-export main types
137pub use broker::KafkaMockBroker;
138pub use consumer_groups::{ConsumerGroup, ConsumerGroupManager};
139pub use fixtures::{AutoProduceConfig, KafkaFixture};
140pub use metrics::{KafkaMetrics, MetricsExporter, MetricsSnapshot};
141pub use partitions::{KafkaMessage, Partition};
142pub use spec_registry::KafkaSpecRegistry;
143pub use topics::{Topic, TopicConfig};
144
#[cfg(test)]
mod tests {
    // Smoke tests for the crate root. They cover three things:
    //   1. every type re-exported from this file is nameable via `super::*`;
    //   2. the main types can be constructed with their defaults;
    //   3. a handful of light-weight interactions between those types.

    use super::*;

    // ==================== Module Re-export Tests ====================
    //
    // Each test in this section only names a re-exported type inside a type
    // annotation. Nothing is constructed, so these are effectively
    // compile-time checks: they fail to build (rather than fail at runtime)
    // if a re-export is removed or renamed.

    /// `KafkaMockBroker` is re-exported from the crate root.
    #[test]
    fn test_kafka_mock_broker_available() {
        let _type_exists: Option<KafkaMockBroker> = None;
    }

    /// `ConsumerGroup` is re-exported from the crate root.
    #[test]
    fn test_consumer_group_available() {
        let _type_exists: Option<ConsumerGroup> = None;
    }

    /// `ConsumerGroupManager` is re-exported from the crate root.
    #[test]
    fn test_consumer_group_manager_available() {
        let _type_exists: Option<ConsumerGroupManager> = None;
    }

    /// `KafkaFixture` is re-exported from the crate root.
    #[test]
    fn test_kafka_fixture_available() {
        let _type_exists: Option<KafkaFixture> = None;
    }

    /// `AutoProduceConfig` is re-exported from the crate root.
    #[test]
    fn test_auto_produce_config_available() {
        let _type_exists: Option<AutoProduceConfig> = None;
    }

    /// `KafkaMetrics` is re-exported from the crate root.
    #[test]
    fn test_kafka_metrics_available() {
        let _type_exists: Option<KafkaMetrics> = None;
    }

    /// `MetricsExporter` is re-exported from the crate root.
    #[test]
    fn test_metrics_exporter_available() {
        let _type_exists: Option<MetricsExporter> = None;
    }

    /// `MetricsSnapshot` is re-exported from the crate root.
    #[test]
    fn test_metrics_snapshot_available() {
        let _type_exists: Option<MetricsSnapshot> = None;
    }

    /// `KafkaMessage` is re-exported from the crate root.
    #[test]
    fn test_kafka_message_available() {
        let _type_exists: Option<KafkaMessage> = None;
    }

    /// `Partition` is re-exported from the crate root.
    #[test]
    fn test_partition_available() {
        let _type_exists: Option<Partition> = None;
    }

    /// `KafkaSpecRegistry` is re-exported from the crate root.
    #[test]
    fn test_kafka_spec_registry_available() {
        let _type_exists: Option<KafkaSpecRegistry> = None;
    }

    /// `Topic` is re-exported from the crate root.
    #[test]
    fn test_topic_available() {
        let _type_exists: Option<Topic> = None;
    }

    /// `TopicConfig` is re-exported from the crate root.
    #[test]
    fn test_topic_config_available() {
        let _type_exists: Option<TopicConfig> = None;
    }

    // ==================== Basic Functionality Tests ====================

    /// A broker can be constructed from the default `KafkaConfig`.
    #[tokio::test]
    async fn test_broker_creation() {
        let config = mockforge_core::config::KafkaConfig::default();
        let broker = KafkaMockBroker::new(config).await;
        assert!(
            broker.is_ok(),
            "KafkaMockBroker::new should succeed with the default config"
        );
    }

    /// Freshly created metrics report zero produced messages.
    #[test]
    fn test_metrics_creation() {
        let metrics = KafkaMetrics::default();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_produced_total, 0);
    }

    /// A new partition keeps the id it was given and starts with a high
    /// watermark of zero.
    #[test]
    fn test_partition_creation() {
        let partition = Partition::new(0);
        assert_eq!(partition.id, 0);
        assert_eq!(partition.high_watermark, 0);
    }

    /// The default topic configuration has at least one partition.
    #[test]
    fn test_topic_config_default() {
        let config = TopicConfig::default();
        assert!(config.num_partitions > 0);
    }

    /// A new consumer group manager starts with no groups.
    #[test]
    fn test_consumer_group_manager_creation() {
        let manager = ConsumerGroupManager::new();
        assert_eq!(manager.groups().len(), 0);
    }

    // ==================== Integration Tests ====================

    /// Creates a broker and a topic, then checks the produced-message counter.
    ///
    /// Despite the name, no message is produced or fetched here: the test only
    /// verifies that creating a topic leaves `messages_produced_total` at zero.
    #[tokio::test]
    async fn test_end_to_end_message_flow() {
        let config = mockforge_core::config::KafkaConfig::default();
        let broker = KafkaMockBroker::new(config).await.unwrap();

        // Create a topic
        let topic_config = TopicConfig::default();
        broker.test_create_topic("test-topic", topic_config).await;

        // Topic creation alone must not count as a produced message.
        let metrics = broker.metrics();
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.messages_produced_total, 0);
    }

    /// A member can join a consumer group after a topic has been created.
    #[tokio::test]
    async fn test_consumer_group_workflow() {
        let config = mockforge_core::config::KafkaConfig::default();
        let broker = KafkaMockBroker::new(config).await.unwrap();

        // Create a topic
        let topic_config = TopicConfig::default();
        broker.test_create_topic("workflow-topic", topic_config).await;

        // Join a consumer group
        let result = broker.test_join_group("test-group", "member-1", "client-1").await;
        assert!(result.is_ok(), "joining a consumer group should succeed");
    }

    /// A fixture with a key pattern and a JSON value template yields a message
    /// that has a key and a non-empty value.
    #[tokio::test]
    async fn test_fixture_to_message_conversion() {
        use std::collections::HashMap;

        let fixture = KafkaFixture {
            identifier: "test-id".to_string(),
            name: "Test".to_string(),
            topic: "test-topic".to_string(),
            partition: Some(0),
            key_pattern: Some("key-test".to_string()),
            value_template: serde_json::json!({"data": "test"}),
            headers: HashMap::new(),
            auto_produce: None,
        };

        // An empty context: the fixture above uses no template variables.
        let context = HashMap::new();
        let message = fixture.generate_message(&context).unwrap();

        assert!(message.key.is_some());
        assert!(!message.value.is_empty());
    }

    /// A spec registry can be built from the default config and an empty
    /// topic map.
    #[tokio::test]
    async fn test_spec_registry_creation() {
        use std::sync::Arc;
        use tokio::sync::RwLock;

        let topics = Arc::new(RwLock::new(std::collections::HashMap::new()));
        let config = mockforge_core::config::KafkaConfig::default();

        let registry = KafkaSpecRegistry::new(config, topics).await;
        assert!(
            registry.is_ok(),
            "KafkaSpecRegistry::new should succeed with the default config"
        );
    }

    // ==================== Protocol Abstraction Tests ====================

    /// `KafkaSpecRegistry` implements the `SpecRegistry` trait: it reports the
    /// Kafka protocol and `operations()` can be called on it.
    #[tokio::test]
    async fn test_spec_registry_protocol_trait() {
        use mockforge_core::protocol_abstraction::SpecRegistry;
        use std::sync::Arc;
        use tokio::sync::RwLock;

        let topics = Arc::new(RwLock::new(std::collections::HashMap::new()));
        let config = mockforge_core::config::KafkaConfig::default();

        let registry = KafkaSpecRegistry::new(config, topics).await.unwrap();

        // Test protocol method
        assert_eq!(registry.protocol(), mockforge_core::Protocol::Kafka);

        // Test operations method. The previous assertion here
        // (`ops.is_empty() || ops.len() > 0`) was always true, so it verified
        // nothing. What this test can honestly check without knowing the
        // expected operation set is that the call completes without panicking.
        let _ops = registry.operations();
    }

    // ==================== Metrics Integration Tests ====================

    /// The exporter produces Prometheus output that contains either the
    /// substring `kafka` or a `#` character.
    #[test]
    fn test_metrics_exporter_creation() {
        use std::sync::Arc;

        let metrics = Arc::new(KafkaMetrics::default());
        let exporter = MetricsExporter::new(metrics);

        let prometheus_output = exporter.export_prometheus();
        assert!(prometheus_output.contains("kafka") || prometheus_output.contains("#"));
    }

    /// Recording one request and one response is reflected in the snapshot.
    ///
    /// Despite the name, nothing is serialized here; the test only checks the
    /// `requests_total` and `responses_total` counters.
    #[test]
    fn test_metrics_snapshot_serialization() {
        let metrics = KafkaMetrics::default();
        metrics.record_request(0); // Produce
        metrics.record_response();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.requests_total, 1);
        assert_eq!(snapshot.responses_total, 1);
    }
}