# MockForge Kafka

Kafka protocol support for MockForge with full broker simulation, topic management, and consumer group coordination.

This crate provides comprehensive Kafka mocking capabilities, allowing you to simulate Apache Kafka brokers for testing event-driven applications. Perfect for testing Kafka producers, consumers, and stream processing applications without requiring a full Kafka cluster.

## Features

- **Full Kafka Protocol**: Support for 10+ Kafka APIs (Produce, Fetch, Metadata, etc.)
- **Broker Simulation**: Complete Kafka broker implementation without external dependencies
- **Topic Management**: Dynamic topic creation, deletion, and configuration
- **Partition Handling**: Multi-partition topics with proper offset management
- **Consumer Groups**: Simulate consumer group coordination and rebalancing
- **Message Fixtures**: YAML-based message templates and auto-production
- **Metrics & Monitoring**: Comprehensive metrics with Prometheus integration
- **Protocol Compliance**: Full Kafka protocol v2.8+ compatibility

## Quick Start

### Basic Kafka Broker

```rust,no_run
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create broker configuration
    let config = KafkaConfig {
        host: "127.0.0.1".to_string(),
        port: 9092,
        ..Default::default()
    };

    // Initialize and start broker
    let broker = KafkaMockBroker::new(config).await?;
    broker.start().await?;

    Ok(())
}
```

### Testing with Kafka Clients

```rust,no_run
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to MockForge Kafka broker
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()?;

    // Produce a message
    let delivery_status = producer
        .send(
            FutureRecord::to("test-topic")
                .payload("Hello from MockForge!")
                .key("test-key"),
            Duration::from_secs(0),
        )
        .await;

    match delivery_status {
        Ok((partition, offset)) => {
            println!("Message delivered to partition {} at offset {}", partition, offset);
        }
        Err((e, _)) => println!("Failed to deliver message: {}", e),
    }

    Ok(())
}
```

## Core Components

### KafkaMockBroker

The main broker implementation that handles all Kafka protocol operations:

```rust,no_run
use mockforge_kafka::KafkaMockBroker;
use mockforge_core::config::KafkaConfig;

let config = KafkaConfig {
    host: "0.0.0.0".to_string(),
    port: 9092,
    auto_create_topics: true,
    default_partitions: 3,
    ..Default::default()
};

let broker = KafkaMockBroker::new(config).await?;
broker.start().await?;
```

### Topic Management

Create and manage Kafka topics dynamically:

```rust,no_run
use mockforge_kafka::topics::{Topic, TopicConfig};

// Create a topic with specific configuration
let topic_config = TopicConfig {
    name: "user-events".to_string(),
    partitions: 3,
    replication_factor: 1,
    retention_ms: Some(604800000), // 7 days
};

let topic = Topic::new(topic_config);

// Topics are automatically created when first accessed
// or can be pre-created through the broker API
```
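
One way to pre-create topics from a test, rather than relying on auto-creation, is to drive the broker's CreateTopics support with rdkafka's admin client. This is a sketch assuming the broker is listening on the default `localhost:9092`:

```rust,no_run
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Point the admin client at the MockForge broker.
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    // Request a 3-partition topic with replication factor 1.
    let topic = NewTopic::new("user-events", 3, TopicReplication::Fixed(1));
    let results = admin.create_topics(&[topic], &AdminOptions::new()).await?;

    for result in results {
        match result {
            Ok(name) => println!("created topic {}", name),
            Err((name, err)) => println!("failed to create {}: {:?}", name, err),
        }
    }
    Ok(())
}
```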

### Message Production

Handle produce requests with full protocol compliance:

```rust,no_run
use mockforge_kafka::partitions::KafkaMessage;

// Create messages for production
let messages = vec![
    KafkaMessage {
        key: Some(b"user-123".to_vec()),
        value: b"{\"action\": \"login\", \"user_id\": 123}".to_vec(),
        timestamp: Some(chrono::Utc::now().timestamp_millis()),
        headers: None,
    },
    KafkaMessage {
        key: Some(b"user-456".to_vec()),
        value: b"{\"action\": \"logout\", \"user_id\": 456}".to_vec(),
        timestamp: Some(chrono::Utc::now().timestamp_millis()),
        headers: None,
    },
];

// Messages are automatically routed to appropriate partitions
// based on key hashing (if key provided) or round-robin
```
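
When you need to predict which partition a keyed message will land on, the routing rule above is easy to reproduce. The sketch below is illustrative only and uses `DefaultHasher`; real Kafka clients hash keys with murmur2, so the exact partition numbers will differ from what rdkafka computes:

```rust,no_run
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative partitioner (hypothetical helper, not part of the crate's API):
/// hash the key when present, otherwise fall back to round-robin.
fn pick_partition(key: Option<&[u8]>, num_partitions: u64, round_robin: &mut u64) -> u64 {
    match key {
        Some(k) => {
            let mut hasher = DefaultHasher::new();
            k.hash(&mut hasher);
            hasher.finish() % num_partitions
        }
        None => {
            let p = *round_robin % num_partitions;
            *round_robin += 1;
            p
        }
    }
}

fn main() {
    let mut counter = 0;
    // The same key always maps to the same partition.
    assert_eq!(
        pick_partition(Some("user-123".as_bytes()), 3, &mut counter),
        pick_partition(Some("user-123".as_bytes()), 3, &mut counter)
    );
    // Unkeyed messages rotate across partitions.
    assert_ne!(
        pick_partition(None, 3, &mut counter),
        pick_partition(None, 3, &mut counter)
    );
}
```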

### Consumer Groups

Simulate consumer group behavior and coordination:

```rust,no_run
use mockforge_kafka::consumer_groups::{ConsumerGroup, ConsumerGroupManager};

// Create consumer group manager
let group_manager = ConsumerGroupManager::new();

// Consumer groups are automatically managed when consumers join
// Partition assignment follows Kafka's standard algorithms
let group = ConsumerGroup::new(
    "my-consumer-group".to_string(),
    vec!["consumer-1".to_string(), "consumer-2".to_string()],
);

// Group handles partition rebalancing when members join/leave
```
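
If your tests assert on which partitions each member receives, it helps to know roughly how a range-style assignment distributes them. The following is a sketch under the assumption that the group uses range assignment; it is not a guarantee of what `ConsumerGroupManager` does internally:

```rust,no_run
use std::collections::BTreeMap;

/// Illustrative range assignment (hypothetical helper, not the crate's API):
/// spread partitions 0..num_partitions as evenly as possible across members
/// sorted by id, giving earlier members the remainder.
fn assign_partitions(members: &[&str], num_partitions: u32) -> BTreeMap<String, Vec<u32>> {
    let mut sorted: Vec<&str> = members.to_vec();
    sorted.sort();

    let per_member = num_partitions / sorted.len() as u32;
    let remainder = (num_partitions as usize) % sorted.len();

    let mut assignment = BTreeMap::new();
    let mut next = 0u32;
    for (i, member) in sorted.iter().enumerate() {
        let count = per_member + u32::from(i < remainder);
        assignment.insert(member.to_string(), (next..next + count).collect());
        next += count;
    }
    assignment
}

fn main() {
    // 3 partitions across 2 consumers: one member gets 2 partitions, the other gets 1.
    let assignment = assign_partitions(&["consumer-2", "consumer-1"], 3);
    assert_eq!(assignment["consumer-1"], vec![0, 1]);
    assert_eq!(assignment["consumer-2"], vec![2]);
}
```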

## Fixture System

Define message templates and auto-production rules using YAML:

```yaml
# kafka-fixture.yaml
topics:
  - name: "user-events"
    partitions: 3
    config:
      retention.ms: "604800000"  # 7 days

  - name: "order-events"
    partitions: 2

fixtures:
  - topic: "user-events"
    key_template: "{{uuid}}"
    value_template: |
      {
        "user_id": "{{uuid}}",
        "action": "{{random_element 'login' 'logout' 'signup' 'update_profile'}}",
        "timestamp": "{{now}}",
        "metadata": {
          "source": "web",
          "version": "1.0"
        }
      }
    headers:
      content-type: "application/json"

auto_produce:
  - topic: "user-events"
    rate_per_second: 5
    duration_seconds: 300  # 5 minutes
    key_template: "{{uuid}}"
    value_template: |
      {
        "event_type": "heartbeat",
        "service": "user-service",
        "timestamp": "{{now}}"
      }

  - topic: "order-events"
    rate_per_second: 2
    duration_seconds: 600  # 10 minutes
    key_template: "order-{{sequence}}"
    value_template: |
      {
        "order_id": "{{sequence}}",
        "user_id": "{{uuid}}",
        "amount": {{float_range 10.0 1000.0}},
        "items": {{int_range 1 10}},
        "status": "created",
        "created_at": "{{now}}"
      }
```
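
At produce time the `{{...}}` tokens are expanded by MockForge's templating layer. As a rough mental model for the simpler tokens, here is a hypothetical `expand` helper built on the `uuid` and `chrono` crates (the real engine lives in MockForge's data/templating support and covers more helpers, such as `random_element` and the range helpers):

```rust,no_run
use chrono::Utc;
use std::sync::atomic::{AtomicU64, Ordering};
use uuid::Uuid;

static SEQUENCE: AtomicU64 = AtomicU64::new(0);

/// Hypothetical expansion of a few template tokens. Note that within one call
/// every `{{uuid}}` occurrence gets the same value here, purely for brevity.
fn expand(template: &str) -> String {
    template
        .replace("{{uuid}}", &Uuid::new_v4().to_string())
        .replace("{{now}}", &Utc::now().to_rfc3339())
        .replace("{{sequence}}", &SEQUENCE.fetch_add(1, Ordering::SeqCst).to_string())
}

fn main() {
    let value = expand(r#"{"order_id": "{{sequence}}", "user_id": "{{uuid}}", "created_at": "{{now}}"}"#);
    println!("{value}");
}
```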

### Loading Fixtures

```rust,no_run
use mockforge_kafka::{KafkaMockBroker, KafkaSpecRegistry};

// Create broker with fixture support
let spec_registry = KafkaSpecRegistry::new();
let broker = KafkaMockBroker::with_registry(config, spec_registry).await?;

// Load fixtures from file
broker.load_fixtures_from_file("kafka-fixture.yaml").await?;

// Or create fixtures programmatically
use mockforge_kafka::fixtures::{KafkaFixture, AutoProduceConfig};

let fixture = KafkaFixture {
    topics: vec![/* ... */],
    fixtures: vec![/* ... */],
    auto_produce: vec![/* ... */],
};

broker.add_fixture(fixture).await?;
```

## Supported Kafka APIs

MockForge Kafka implements the following Kafka protocol APIs; a minimal dispatch sketch follows the list:

- **Produce (API 0)**: Message production with acknowledgments
- **Fetch (API 1)**: Message consumption with offset management
- **Metadata (API 3)**: Topic and broker metadata discovery
- **ListGroups (API 16)**: Consumer group listing
- **DescribeGroups (API 15)**: Consumer group details and member information
- **ApiVersions (API 18)**: Protocol version negotiation
- **CreateTopics (API 19)**: Dynamic topic creation
- **DeleteTopics (API 20)**: Topic deletion
- **DescribeConfigs (API 32)**: Configuration retrieval
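
Internally, each Kafka request carries a 16-bit API key in its header, and the broker routes the request based on that key. Below is a stripped-down sketch of such a dispatch, using illustrative names rather than the crate's actual types (the numeric values are the standard Kafka protocol API keys):

```rust,no_run
/// Illustrative subset of Kafka API keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApiKey {
    Produce = 0,
    Fetch = 1,
    Metadata = 3,
    ApiVersions = 18,
    CreateTopics = 19,
}

impl ApiKey {
    fn from_i16(key: i16) -> Option<Self> {
        match key {
            0 => Some(ApiKey::Produce),
            1 => Some(ApiKey::Fetch),
            3 => Some(ApiKey::Metadata),
            18 => Some(ApiKey::ApiVersions),
            19 => Some(ApiKey::CreateTopics),
            _ => None,
        }
    }
}

/// Hypothetical routing step: decode the request header's api_key and pick a handler.
fn dispatch(api_key: i16) -> &'static str {
    match ApiKey::from_i16(api_key) {
        Some(ApiKey::Produce) => "handle_produce",
        Some(ApiKey::Fetch) => "handle_fetch",
        Some(ApiKey::Metadata) => "handle_metadata",
        Some(ApiKey::ApiVersions) => "handle_api_versions",
        Some(ApiKey::CreateTopics) => "handle_create_topics",
        None => "error: unsupported api key",
    }
}

fn main() {
    assert_eq!(dispatch(0), "handle_produce");
    assert_eq!(dispatch(99), "error: unsupported api key");
}
```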

## Metrics & Monitoring

### Prometheus Metrics

Comprehensive metrics exported in Prometheus format:

```rust,no_run
use mockforge_kafka::metrics::MetricsExporter;

// Create metrics exporter
let exporter = MetricsExporter::new();

// Export current metrics
let metrics = exporter.export_prometheus().await?;
println!("{}", metrics);

// Sample metrics:
// kafka_requests_total{api="produce"} 150
// kafka_messages_produced_total{topic="user-events"} 1000
// kafka_consumer_groups_total 5
// kafka_connections_active 12
```
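
If you want to reproduce the same metric shapes in your own harness (for example to compare against the broker's export), they map directly onto the `prometheus` crate's counter and gauge types. This is a sketch assuming you add `prometheus` as a dependency; it is not a claim about how `MetricsExporter` is implemented internally:

```rust,no_run
use prometheus::{Encoder, IntCounterVec, IntGauge, Opts, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    // Counter labeled by API, mirroring kafka_requests_total{api="..."}.
    let requests = IntCounterVec::new(
        Opts::new("kafka_requests_total", "Total Kafka requests by API"),
        &["api"],
    )?;
    registry.register(Box::new(requests.clone()))?;

    // Gauge mirroring kafka_connections_active.
    let connections = IntGauge::new("kafka_connections_active", "Active client connections")?;
    registry.register(Box::new(connections.clone()))?;

    requests.with_label_values(&["produce"]).inc_by(150);
    connections.set(12);

    // Render in the Prometheus text exposition format.
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buffer)?;
    println!("{}", String::from_utf8(buffer)?);
    Ok(())
}
```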

### Metrics Categories

- **Request Metrics**: Total requests, errors, latency by API
- **Message Metrics**: Messages produced/consumed by topic
- **Connection Metrics**: Active connections, connection rate
- **Consumer Group Metrics**: Group count, partition assignments
- **Topic Metrics**: Topic count, partition count, message count

## Configuration

### KafkaConfig

```rust,no_run
use mockforge_core::config::KafkaConfig;

let config = KafkaConfig {
    host: "0.0.0.0".to_string(),
    port: 9092,
    auto_create_topics: true,
    default_partitions: 3,
    default_replication_factor: 1,
    log_retention_hours: 168, // 7 days
    max_message_size: 1048576, // 1MB
    num_threads: 4,
    ..Default::default()
};
```

### Environment Variables

```bash
# Server configuration
export KAFKA_HOST=0.0.0.0
export KAFKA_PORT=9092

# Topic defaults
export KAFKA_AUTO_CREATE_TOPICS=true
export KAFKA_DEFAULT_PARTITIONS=3

# Performance
export KAFKA_MAX_MESSAGE_SIZE=1048576
export KAFKA_NUM_THREADS=4
```
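
These variables can be folded into a `KafkaConfig` before constructing the broker. A minimal sketch using `std::env`, assuming the variable names above and the `KafkaConfig` fields shown earlier (MockForge's own config loader may already handle this for you):

```rust,no_run
use mockforge_core::config::KafkaConfig;
use std::env;

/// Read the documented environment variables, falling back to defaults
/// for anything unset or unparsable. (Sketch only.)
fn kafka_config_from_env() -> KafkaConfig {
    let mut config = KafkaConfig::default();

    if let Ok(host) = env::var("KAFKA_HOST") {
        config.host = host;
    }
    if let Ok(Ok(port)) = env::var("KAFKA_PORT").map(|v| v.parse()) {
        config.port = port;
    }
    if let Ok(flag) = env::var("KAFKA_AUTO_CREATE_TOPICS") {
        config.auto_create_topics = flag.eq_ignore_ascii_case("true");
    }
    if let Ok(Ok(partitions)) = env::var("KAFKA_DEFAULT_PARTITIONS").map(|v| v.parse()) {
        config.default_partitions = partitions;
    }
    config
}
```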

## Testing Examples

### Producer Testing

```rust,no_run
use mockforge_core::config::KafkaConfig;
use mockforge_kafka::KafkaMockBroker;
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::test]
async fn test_kafka_producer() {
    // Start MockForge Kafka broker in background
    let broker = KafkaMockBroker::new(KafkaConfig::default()).await.unwrap();
    tokio::spawn(async move { broker.start().await.unwrap() });

    // Give broker time to start
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Test producer
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .unwrap();

    // Send test message
    let result = producer
        .send(
            FutureRecord::to("test-topic")
                .payload("test message")
                .key("test-key"),
            Duration::from_secs(5),
        )
        .await;

    assert!(result.is_ok());
    let (partition, offset) = result.unwrap();
    assert!(partition >= 0);
    assert!(offset >= 0);
}
```

### Consumer Testing

```rust,no_run
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use futures::StreamExt;

#[tokio::test]
async fn test_kafka_consumer() {
    // Start broker and produce test messages
    // ... setup code ...

    // Create consumer
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "test-group")
        .set("auto.offset.reset", "earliest")
        .create()
        .unwrap();

    consumer.subscribe(&["test-topic"]).unwrap();

    // Consume messages
    let mut message_stream = consumer.stream();
    let message = message_stream.next().await.unwrap().unwrap();

    let payload = message.payload().unwrap();
    assert_eq!(std::str::from_utf8(payload).unwrap(), "test message");
}
```

### Consumer Group Testing

```rust,no_run
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};

#[tokio::test]
async fn test_consumer_groups() {
    // Start broker
    // ... setup code ...

    // Create multiple consumers in same group
    let mut consumers = vec![];

    for i in 0..3 {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092")
            .set("group.id", "test-group")
            .set("client.id", &format!("consumer-{}", i))
            .create()
            .unwrap();

        consumer.subscribe(&["test-topic"]).unwrap();
        consumers.push(consumer);
    }

    // Verify partition assignment
    // Consumers should automatically balance partitions
    for consumer in consumers {
        let assignment = consumer.assignment().unwrap();
        assert!(!assignment.is_empty());
    }
}
```

## Performance

MockForge Kafka is optimized for testing scenarios:

- **In-Memory Storage**: Fast message operations without disk persistence
- **Concurrent Connections**: Handle multiple simultaneous Kafka clients
- **Low Latency**: Minimal overhead for message operations
- **Scalable**: Support for high-throughput testing scenarios
- **Resource Efficient**: Configurable memory limits and cleanup

## Integration with MockForge

MockForge Kafka integrates seamlessly with the MockForge ecosystem:

- **MockForge Core**: Shared configuration and logging
- **MockForge CLI**: Command-line Kafka broker management
- **MockForge Data**: Enhanced message generation with templates
- **MockForge Observability**: Metrics and tracing integration

## Troubleshooting

### Common Issues

**Connection refused:**
- Ensure broker is started and listening on correct port
- Check firewall settings and port availability
- Verify client configuration (bootstrap servers)

**Messages not consumed:**
- Check consumer group configuration
- Verify topic exists (auto-create may be disabled)
- Check offset reset policy (earliest/latest)

**High latency:**
- Adjust broker thread count for better concurrency
- Check system resources (CPU, memory)
- Review message size and batch settings

**Protocol errors:**
- Ensure client and broker use compatible Kafka versions
- Check message format and serialization
- Verify topic and partition configurations

## Examples

See the [examples directory](https://github.com/SaaSy-Solutions/mockforge/tree/main/examples) for complete working examples including:

- Basic Kafka broker setup
- Producer/consumer testing patterns
- Consumer group coordination
- Fixture-driven message generation
- Load testing scenarios

## Related Crates

- [`mockforge-core`](https://docs.rs/mockforge-core): Core mocking functionality
- [`rdkafka`](https://docs.rs/rdkafka): Kafka client library for testing

## License

Licensed under MIT OR Apache-2.0