use mockforge_core::config::KafkaConfig;
use mockforge_kafka::consumer_groups::PartitionAssignment;
use mockforge_kafka::fixtures::KafkaFixture;
use mockforge_kafka::topics::{Topic, TopicConfig};
use mockforge_kafka::KafkaMockBroker;
use mockforge_kafka::KafkaSpecRegistry;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;
use tokio::sync::RwLock;
use tokio::time::timeout;
/// Constructing a mock broker from the default configuration must succeed.
#[tokio::test]
async fn test_broker_creation() {
    let creation_result = KafkaMockBroker::new(KafkaConfig::default()).await;

    assert!(creation_result.is_ok());
}
/// Constructing a spec registry from the default configuration and an empty,
/// shared topic map must succeed.
#[tokio::test]
async fn test_spec_registry_creation() {
    // The registry receives a shared, lock-protected map that starts out empty.
    let shared_topics = Arc::new(RwLock::new(HashMap::new()));

    let creation_result = KafkaSpecRegistry::new(KafkaConfig::default(), shared_topics).await;

    assert!(creation_result.is_ok());
}
/// Inserts a topic directly into the broker's topic map and reads it back,
/// checking its name and partition count.
#[tokio::test]
async fn test_topic_creation() {
    let broker = KafkaMockBroker::new(KafkaConfig::default()).await.unwrap();
    let topic_name = "test-topic";

    // Build the topic first; the write guard is a temporary that is released
    // at the end of the insert statement, before the read lock is taken.
    let topic = Topic::new(topic_name.to_string(), TopicConfig::default());
    broker
        .topics
        .write()
        .await
        .insert(topic_name.to_string(), topic);

    let registered = broker.topics.read().await;
    assert!(registered.contains_key(topic_name));

    // Indexing panics on a missing key, just like `get(..).unwrap()`.
    let stored = &registered[topic_name];
    assert_eq!(stored.name, topic_name);
    // The topic was built from `TopicConfig::default()`; this pins its
    // partition count at 3.
    assert_eq!(stored.config.num_partitions, 3);
}
/// Writes a two-entry YAML fixture file into a temporary directory and checks
/// that `KafkaFixture::load_from_dir` returns both entries with the expected
/// identifiers and topic.
#[tokio::test]
async fn test_fixture_loading() {
let temp_dir = TempDir::new().unwrap();
// The YAML below is runtime input for the loader, not formatting: its
// whitespace is significant, so do not re-indent or reflow it.
// The first entry sets partition, key_pattern, headers and auto_produce; the
// second omits partition, key_pattern and auto_produce.
let yaml_content = r#"
- identifier: "test-fixture-1"
name: "Test Fixture 1"
topic: "test-topic"
partition: 0
key_pattern: "key-{{id}}"
value_template: {"message": "Hello {{name}}", "id": "{{id}}"}
headers: {"content-type": "application/json"}
auto_produce:
enabled: false
rate_per_second: 1
- identifier: "test-fixture-2"
name: "Test Fixture 2"
topic: "test-topic"
value_template: {"message": "World"}
headers: {}
"#;
// Persist the YAML as `fixtures.yaml` inside the temporary directory.
let yaml_path = temp_dir.path().join("fixtures.yaml");
let mut file = File::create(&yaml_path).unwrap();
file.write_all(yaml_content.as_bytes()).unwrap();
// Load from the directory (not the file path) and expect both entries.
let fixtures = KafkaFixture::load_from_dir(&temp_dir.path().to_path_buf()).unwrap();
assert_eq!(fixtures.len(), 2);
// The indexed assertions rely on the loader returning entries in file order.
assert_eq!(fixtures[0].identifier, "test-fixture-1");
assert_eq!(fixtures[0].topic, "test-topic");
assert_eq!(fixtures[1].identifier, "test-fixture-2");
}
/// Renders a fixture with `{{id}}` / `{{name}}` placeholders against a
/// substitution context and checks the generated key, JSON value and headers.
#[tokio::test]
async fn test_message_generation() {
    let fixture = KafkaFixture {
        identifier: "test-fixture".to_string(),
        name: "Test Fixture".to_string(),
        topic: "test-topic".to_string(),
        partition: Some(0),
        key_pattern: Some("key-{{id}}".to_string()),
        value_template: serde_json::json!({"message": "Hello {{name}}", "id": "{{id}}"}),
        headers: [("content-type".to_string(), "application/json".to_string())].into(),
        auto_produce: None,
    };

    // Values substituted for the placeholders used in the fixture above.
    let substitutions = HashMap::from([
        ("id".to_string(), "123".to_string()),
        ("name".to_string(), "World".to_string()),
    ]);

    let generated = fixture.generate_message(&substitutions).unwrap();

    // Key: the pattern with `{{id}}` replaced.
    assert_eq!(generated.key, Some(b"key-123".to_vec()));

    // Value: must parse as JSON with both placeholders replaced.
    let body: serde_json::Value = serde_json::from_slice(&generated.value).unwrap();
    assert_eq!(body["message"], "Hello World");
    assert_eq!(body["id"], "123");

    // Headers: exactly the one configured header, with its value as bytes.
    assert_eq!(generated.headers.len(), 1);
    let expected_header = ("content-type".to_string(), b"application/json".to_vec());
    assert_eq!(generated.headers[0], expected_header);
}
#[tokio::test]
async fn test_consumer_group_operations() {
let config = KafkaConfig::default();
let broker = KafkaMockBroker::new(config).await.unwrap();
let topic_config = TopicConfig {
num_partitions: 2,
..Default::default()
};
broker.test_create_topic("test-topic", topic_config).await;
broker.test_join_group("test-group", "member-1", "client-1").await.unwrap();
broker.test_sync_group("test-group", vec![]).await.unwrap();
let assignments = broker.test_get_assignments("test-group", "member-1").await;
assert_eq!(assignments.len(), 1);
assert_eq!(assignments[0].topic, "test-topic");
assert_eq!(assignments[0].partitions.len(), 2);
broker.test_join_group("test-group", "member-2", "client-2").await.unwrap();
broker.test_sync_group("test-group", vec![]).await.unwrap();
let assignments1 = broker.test_get_assignments("test-group", "member-1").await;
let assignments2 = broker.test_get_assignments("test-group", "member-2").await;
assert_eq!(assignments1.len(), 1);
assert_eq!(assignments2.len(), 1);
assert_eq!(assignments1[0].partitions.len(), 1);
assert_eq!(assignments2[0].partitions.len(), 1);
let all_partitions: std::collections::HashSet<i32> = assignments1[0]
.partitions
.iter()
.chain(&assignments2[0].partitions)
.cloned()
.collect();
assert_eq!(all_partitions, std::collections::HashSet::from([0, 1]));
}
/// Joins three members to a group on a default-configured topic, checks that
/// the synced assignments cover partitions 0..=2, then syncs with an explicit
/// assignment and checks what member-1 reports afterwards.
#[tokio::test]
async fn test_partition_assignment() {
    let broker = KafkaMockBroker::new(KafkaConfig::default()).await.unwrap();
    broker.test_create_topic("test-topic", TopicConfig::default()).await;

    broker.test_join_group("test-group", "member-1", "client-1").await.unwrap();
    broker.test_join_group("test-group", "member-2", "client-2").await.unwrap();
    broker.test_join_group("test-group", "member-3", "client-3").await.unwrap();
    broker.test_sync_group("test-group", vec![]).await.unwrap();

    let member1 = broker.test_get_assignments("test-group", "member-1").await;
    let member2 = broker.test_get_assignments("test-group", "member-2").await;
    let member3 = broker.test_get_assignments("test-group", "member-3").await;

    // Every member reports exactly one assignment entry.
    assert_eq!(member1.len(), 1);
    assert_eq!(member2.len(), 1);
    assert_eq!(member3.len(), 1);

    // The union of all assigned partitions must be exactly {0, 1, 2}.
    let mut covered = std::collections::HashSet::<i32>::new();
    for member_assignments in [&member1, &member2, &member3] {
        for assignment in member_assignments.iter() {
            covered.extend(assignment.partitions.iter().copied());
        }
    }
    assert_eq!(covered, std::collections::HashSet::from([0, 1, 2]));

    // Re-sync with an explicit assignment of partitions 0 and 1.
    let explicit = vec![PartitionAssignment {
        topic: "test-topic".to_string(),
        partitions: vec![0, 1],
    }];
    broker.test_sync_group("test-group", explicit).await.unwrap();

    // member-1 must now report both partitions for the topic; they may live
    // in the same assignment entry or in separate ones.
    let updated = broker.test_get_assignments("test-group", "member-1").await;
    for wanted in [0, 1] {
        let holds_partition = updated
            .iter()
            .any(|a| a.topic == "test-topic" && a.partitions.contains(&wanted));
        assert!(holds_partition);
    }
}
/// Commits offsets for three partitions of a topic and checks that they are
/// returned unchanged for that group, and that an unknown group has none.
#[tokio::test]
async fn test_offset_management() {
    let config = KafkaConfig::default();
    let broker = KafkaMockBroker::new(config).await.unwrap();
    broker.test_create_topic("test-topic", TopicConfig::default()).await;
    broker.test_join_group("test-group", "member-1", "client-1").await.unwrap();

    // Single source of truth for (partition, offset) pairs: it drives both
    // the commit and the assertions, so the two cannot drift apart.
    let expected = [(0, 100), (1, 200), (2, 300)];

    let offsets: HashMap<_, _> = expected
        .iter()
        .map(|&(partition, offset)| (("test-topic".to_string(), partition), offset))
        .collect();
    // The map is not needed after the commit, so it is moved, not cloned.
    broker.test_commit_offsets("test-group", offsets).await.unwrap();

    let committed_offsets = broker.test_get_committed_offsets("test-group").await;
    assert_eq!(committed_offsets.len(), expected.len());
    for (partition, offset) in expected {
        assert_eq!(
            committed_offsets.get(&("test-topic".to_string(), partition)),
            Some(&offset)
        );
    }

    // A group that never committed anything yields an empty result.
    let empty_offsets = broker.test_get_committed_offsets("non-existent-group").await;
    assert!(empty_offsets.is_empty());
}
#[tokio::test]
async fn test_broker_startup_timeout() {
let config = KafkaConfig {
port: 9093, ..Default::default()
};
let broker = KafkaMockBroker::new(config).await.unwrap();
let start_result = timeout(Duration::from_secs(1), broker.start()).await;
assert!(start_result.is_err());
}
#[tokio::test]
async fn test_full_broker_integration() {
let config = KafkaConfig {
port: 9094, ..Default::default()
};
let broker = KafkaMockBroker::new(config.clone()).await.unwrap();
let broker_handle = tokio::spawn(async move {
broker.start().await.unwrap();
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut client_config = ClientConfig::new();
client_config.set("bootstrap.servers", format!("127.0.0.1:{}", config.port));
client_config.set("group.id", "test-group");
client_config.set("auto.offset.reset", "earliest");
client_config.set("enable.auto.commit", "false");
let admin_client: AdminClient<DefaultClientContext> = client_config.clone().create().unwrap();
let topics = vec![NewTopic {
name: "test-topic",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
}];
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(5)));
let result = admin_client.create_topics(&topics, &options).await;
match result {
Ok(_) => println!("Topic creation succeeded"),
Err(e) => println!("Topic creation failed as expected: {:?}", e),
}
let producer: FutureProducer = client_config.clone().create().unwrap();
let record = FutureRecord::to("test-topic").payload("test message").key("test-key");
let produce_result = producer.send(record, Duration::from_secs(5)).await;
match produce_result {
Ok(_) => println!("Message production succeeded"),
Err((e, _)) => println!("Message production failed: {:?}", e),
}
let consumer: StreamConsumer = client_config.create().unwrap();
consumer.subscribe(&["test-topic"]).unwrap();
let consume_result = timeout(Duration::from_secs(2), consumer.recv()).await;
match consume_result {
Ok(Ok(message)) => {
println!("Message consumed successfully");
let payload = message.payload_view::<str>().unwrap().unwrap();
assert_eq!(payload, "test message");
}
Ok(Err(e)) => println!("Consume error: {:?}", e),
Err(_) => println!("Consume timeout - no messages available"),
}
consumer.unsubscribe();
drop(producer);
drop(admin_client);
broker_handle.abort();
}
/// Starts the mock broker on port 9095 and fetches cluster metadata through
/// a real rdkafka admin client.
///
/// A failed fetch is only logged. When the fetch succeeds, the broker list
/// must not be empty.
#[tokio::test]
async fn test_protocol_operations() {
    let config = KafkaConfig {
        port: 9095,
        ..Default::default()
    };
    let broker = KafkaMockBroker::new(config.clone()).await.unwrap();
    let broker_handle = tokio::spawn(async move {
        broker.start().await.unwrap();
    });
    // Give the spawned broker task a moment to start before connecting.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut client_config = ClientConfig::new();
    client_config.set("bootstrap.servers", format!("127.0.0.1:{}", config.port));
    let admin_client: AdminClient<DefaultClientContext> = client_config.create().unwrap();

    let timeout_duration = Duration::from_secs(5);
    // `fetch_metadata` blocks the calling thread. `#[tokio::test]` defaults
    // to a current-thread runtime, so calling it inline would also block the
    // thread that polls the spawned broker task: the broker could never
    // answer and the fetch could only time out. Run the call on the blocking
    // pool so the broker keeps being polled while we await the result.
    //
    // Only the broker count crosses the thread boundary. The admin client is
    // moved into the closure and dropped there, which also keeps its
    // potentially blocking teardown off the runtime thread.
    let broker_count = tokio::task::spawn_blocking(move || {
        admin_client
            .inner()
            .fetch_metadata(None, timeout_duration)
            .map(|metadata| metadata.brokers().len())
    })
    .await
    .expect("metadata fetch task panicked");

    match broker_count {
        Ok(count) => {
            println!("Metadata fetch succeeded, brokers: {}", count);
            assert!(count > 0);
        }
        Err(e) => println!("Metadata fetch failed: {:?}", e),
    }

    broker_handle.abort();
}