mockforge_kafka/
lib.rs

1//! # MockForge Kafka
2//!
3//! Kafka protocol support for MockForge.
4//!
5//! This crate provides Kafka-specific functionality for creating mock Kafka brokers,
6//! including topic management, partition handling, consumer groups, and fixture-driven message generation.
7//!
8//! ## Overview
9//!
10//! MockForge Kafka enables you to:
11//!
12//! - **Mock Kafka brokers**: Simulate Apache Kafka brokers for testing
13//! - **Topic and partition management**: Create and manage topics with configurable partitions
14//! - **Producer/consumer simulation**: Handle produce and fetch requests
15//! - **Consumer group coordination**: Simulate consumer group behavior and rebalancing
16//! - **Fixture-based messaging**: Generate messages using templates and patterns
17//! - **Auto-produce functionality**: Automatically generate messages at specified rates
18//!
19//! ## Quick Start
20//!
21//! ### Basic Kafka Broker
22//!
23//! ```rust,no_run
24//! use mockforge_kafka::KafkaMockBroker;
25//! use mockforge_core::config::KafkaConfig;
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//!     let config = KafkaConfig::default();
30//!     let broker = KafkaMockBroker::new(config).await?;
31//!
32//!     broker.start().await?;
33//!     Ok(())
34//! }
35//! ```
36//!
37//! ## Key Features
38//!
39//! ### Broker Simulation
40//! - **10+ Kafka APIs supported**: Produce, Fetch, Metadata, ListGroups, DescribeGroups, ApiVersions, CreateTopics, DeleteTopics, DescribeConfigs
41//! - **Protocol-compliant responses**: Full Kafka protocol implementation without external dependencies
42//! - **Connection handling**: TCP-based broker connections with proper error handling
43//! - **Topic and partition management**: Dynamic topic/partition creation and management
44//!
45//! ### Metrics and Monitoring
46//! - **Comprehensive metrics**: Request counts, error rates, connection tracking
47//! - **Prometheus integration**: Export metrics in Prometheus format
48//! - **Real-time monitoring**: Live metrics collection during broker operation
49//!
50//! ### Fixture System
51//! - **YAML-based fixtures**: Define message templates and auto-production rules
52//! - **Template engine integration**: Use MockForge's templating system for dynamic content
53//! - **Auto-produce functionality**: Automatically generate messages at specified rates
54//! - **Key and value templating**: Flexible message generation with context variables
55//!
56//! ### Consumer Group Management
57//! - **Group coordination**: Simulate consumer group joins and synchronization
58//! - **Partition assignment**: Automatic partition distribution among consumers
59//! - **Offset management**: Track and manage consumer offsets
60//! - **Rebalancing simulation**: Test consumer group rebalancing scenarios
61//!
62//! ### Testing Features
63//! - **Protocol validation**: Ensure requests conform to Kafka protocol specifications
64//! - **Error simulation**: Configurable error responses for testing error handling
65//! - **Performance testing**: Built-in benchmarking support
66//! - **Integration testing**: Compatible with standard Kafka client libraries
67//!
68//! ## Supported Kafka APIs
69//!
70//! - **Produce** (API Key 0): Message production with acknowledgments
71//! - **Fetch** (API Key 1): Message consumption with offset management
72//! - **Metadata** (API Key 3): Topic and broker metadata discovery
73//! - **ListGroups** (API Key 9): Consumer group listing
74//! - **DescribeGroups** (API Key 15): Consumer group details and member information
75//! - **ApiVersions** (API Key 18): Protocol version negotiation
76//! - **CreateTopics** (API Key 19): Dynamic topic creation
77//! - **DeleteTopics** (API Key 20): Topic deletion
78//! - **DescribeConfigs** (API Key 32): Configuration retrieval
79//!
80//! ## Example Usage
81//!
82//! ### Basic Broker Setup
83//!
84//! ```rust,no_run
85//! use mockforge_kafka::KafkaMockBroker;
86//! use mockforge_core::config::KafkaConfig;
87//!
88//! #[tokio::main]
89//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
90//!     let config = KafkaConfig {
91//!         port: 9092,
92//!         ..Default::default()
93//!     };
94//!
95//!     let broker = KafkaMockBroker::new(config).await?;
96//!     broker.start().await?;
97//!
98//!     Ok(())
99//! }
100//! ```
101//!
102//! ### Metrics Export
103//!
104//! ```rust,no_run
105//! use mockforge_kafka::MetricsExporter;
106//! use mockforge_kafka::KafkaMetrics;
107//! use std::sync::Arc;
108//!
109//! #[tokio::main]
110//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
111//!     let metrics = Arc::new(KafkaMetrics::default());
112//!     let exporter = MetricsExporter::new(metrics.clone());
113//!
114//!     // Export metrics in Prometheus format
115//!     let snapshot = exporter.export_prometheus();
116//!     println!("{}", snapshot);
117//!
118//!     Ok(())
119//! }
120//! ```
121//!
122//! ## Related Crates
123//!
124//! - [`mockforge-core`](https://docs.rs/mockforge-core): Core mocking functionality and configuration
125//! - [`rdkafka`](https://docs.rs/rdkafka): Kafka client library for testing integration
126
127pub mod broker;
128pub mod consumer_groups;
129pub mod fixtures;
130pub mod metrics;
131pub mod partitions;
132pub mod protocol;
133pub mod spec_registry;
134pub mod topics;
135
136// Re-export main types
137pub use broker::KafkaMockBroker;
138pub use consumer_groups::{ConsumerGroup, ConsumerGroupManager};
139pub use fixtures::{AutoProduceConfig, KafkaFixture};
140pub use metrics::{KafkaMetrics, MetricsExporter, MetricsSnapshot};
141pub use partitions::{KafkaMessage, Partition};
142pub use spec_registry::KafkaSpecRegistry;
143pub use topics::{Topic, TopicConfig};
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    // ==================== Module Re-export Tests ====================
150
151    #[test]
152    fn test_kafka_mock_broker_available() {
153        // Ensure KafkaMockBroker is re-exported and accessible
154        let _type_exists: Option<KafkaMockBroker> = None;
155    }
156
157    #[test]
158    fn test_consumer_group_available() {
159        // Ensure ConsumerGroup is re-exported and accessible
160        let _type_exists: Option<ConsumerGroup> = None;
161    }
162
163    #[test]
164    fn test_consumer_group_manager_available() {
165        // Ensure ConsumerGroupManager is re-exported and accessible
166        let _type_exists: Option<ConsumerGroupManager> = None;
167    }
168
169    #[test]
170    fn test_kafka_fixture_available() {
171        // Ensure KafkaFixture is re-exported and accessible
172        let _type_exists: Option<KafkaFixture> = None;
173    }
174
175    #[test]
176    fn test_auto_produce_config_available() {
177        // Ensure AutoProduceConfig is re-exported and accessible
178        let _type_exists: Option<AutoProduceConfig> = None;
179    }
180
181    #[test]
182    fn test_kafka_metrics_available() {
183        // Ensure KafkaMetrics is re-exported and accessible
184        let _type_exists: Option<KafkaMetrics> = None;
185    }
186
187    #[test]
188    fn test_metrics_exporter_available() {
189        // Ensure MetricsExporter is re-exported and accessible
190        let _type_exists: Option<MetricsExporter> = None;
191    }
192
193    #[test]
194    fn test_metrics_snapshot_available() {
195        // Ensure MetricsSnapshot is re-exported and accessible
196        let _type_exists: Option<MetricsSnapshot> = None;
197    }
198
199    #[test]
200    fn test_kafka_message_available() {
201        // Ensure KafkaMessage is re-exported and accessible
202        let _type_exists: Option<KafkaMessage> = None;
203    }
204
205    #[test]
206    fn test_partition_available() {
207        // Ensure Partition is re-exported and accessible
208        let _type_exists: Option<Partition> = None;
209    }
210
211    #[test]
212    fn test_kafka_spec_registry_available() {
213        // Ensure KafkaSpecRegistry is re-exported and accessible
214        let _type_exists: Option<KafkaSpecRegistry> = None;
215    }
216
217    #[test]
218    fn test_topic_available() {
219        // Ensure Topic is re-exported and accessible
220        let _type_exists: Option<Topic> = None;
221    }
222
223    #[test]
224    fn test_topic_config_available() {
225        // Ensure TopicConfig is re-exported and accessible
226        let _type_exists: Option<TopicConfig> = None;
227    }
228
229    // ==================== Basic Functionality Tests ====================
230
231    #[tokio::test]
232    async fn test_broker_creation() {
233        let config = mockforge_core::config::KafkaConfig::default();
234        let broker = KafkaMockBroker::new(config).await;
235        assert!(broker.is_ok());
236    }
237
238    #[test]
239    fn test_metrics_creation() {
240        let metrics = KafkaMetrics::default();
241        let snapshot = metrics.snapshot();
242        assert_eq!(snapshot.messages_produced_total, 0);
243    }
244
245    #[test]
246    fn test_partition_creation() {
247        let partition = Partition::new(0);
248        assert_eq!(partition.id, 0);
249        assert_eq!(partition.high_watermark, 0);
250    }
251
252    #[test]
253    fn test_topic_config_default() {
254        let config = TopicConfig::default();
255        assert!(config.num_partitions > 0);
256    }
257
258    #[test]
259    fn test_consumer_group_manager_creation() {
260        let manager = ConsumerGroupManager::new();
261        assert_eq!(manager.groups().len(), 0);
262    }
263
264    // ==================== Integration Tests ====================
265
266    #[tokio::test]
267    async fn test_end_to_end_message_flow() {
268        let config = mockforge_core::config::KafkaConfig::default();
269        let broker = KafkaMockBroker::new(config).await.unwrap();
270
271        // Create a topic
272        let topic_config = TopicConfig::default();
273        broker.test_create_topic("test-topic", topic_config).await;
274
275        // Check that metrics are initialized
276        let metrics = broker.metrics();
277        let snapshot = metrics.snapshot();
278        assert_eq!(snapshot.messages_produced_total, 0);
279    }
280
281    #[tokio::test]
282    async fn test_consumer_group_workflow() {
283        let config = mockforge_core::config::KafkaConfig::default();
284        let broker = KafkaMockBroker::new(config).await.unwrap();
285
286        // Create a topic
287        let topic_config = TopicConfig::default();
288        broker.test_create_topic("workflow-topic", topic_config).await;
289
290        // Join a consumer group
291        let result = broker.test_join_group("test-group", "member-1", "client-1").await;
292        assert!(result.is_ok());
293    }
294
295    #[tokio::test]
296    async fn test_fixture_to_message_conversion() {
297        use std::collections::HashMap;
298
299        let fixture = KafkaFixture {
300            identifier: "test-id".to_string(),
301            name: "Test".to_string(),
302            topic: "test-topic".to_string(),
303            partition: Some(0),
304            key_pattern: Some("key-test".to_string()),
305            value_template: serde_json::json!({"data": "test"}),
306            headers: HashMap::new(),
307            auto_produce: None,
308        };
309
310        let context = HashMap::new();
311        let message = fixture.generate_message(&context).unwrap();
312
313        assert!(message.key.is_some());
314        assert!(!message.value.is_empty());
315    }
316
317    #[tokio::test]
318    async fn test_spec_registry_creation() {
319        use std::sync::Arc;
320        use tokio::sync::RwLock;
321
322        let topics = Arc::new(RwLock::new(std::collections::HashMap::new()));
323        let config = mockforge_core::config::KafkaConfig::default();
324
325        let registry = KafkaSpecRegistry::new(config, topics).await;
326        assert!(registry.is_ok());
327    }
328
329    // ==================== Protocol Abstraction Tests ====================
330
331    #[tokio::test]
332    async fn test_spec_registry_protocol_trait() {
333        use mockforge_core::protocol_abstraction::SpecRegistry;
334        use std::sync::Arc;
335        use tokio::sync::RwLock;
336
337        let topics = Arc::new(RwLock::new(std::collections::HashMap::new()));
338        let config = mockforge_core::config::KafkaConfig::default();
339
340        let registry = KafkaSpecRegistry::new(config, topics).await.unwrap();
341
342        // Test protocol method
343        assert_eq!(registry.protocol(), mockforge_core::Protocol::Kafka);
344
345        // Test operations method
346        let ops = registry.operations();
347        assert!(ops.is_empty() || ops.len() > 0);
348    }
349
350    // ==================== Metrics Integration Tests ====================
351
352    #[test]
353    fn test_metrics_exporter_creation() {
354        use std::sync::Arc;
355
356        let metrics = Arc::new(KafkaMetrics::default());
357        let exporter = MetricsExporter::new(metrics);
358
359        let prometheus_output = exporter.export_prometheus();
360        assert!(prometheus_output.contains("kafka") || prometheus_output.contains("#"));
361    }
362
363    #[test]
364    fn test_metrics_snapshot_serialization() {
365        let metrics = KafkaMetrics::default();
366        metrics.record_request(0); // Produce
367        metrics.record_response();
368
369        let snapshot = metrics.snapshot();
370        assert_eq!(snapshot.requests_total, 1);
371        assert_eq!(snapshot.responses_total, 1);
372    }
373}