use std::time::Duration;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka_sys::types::RDKafkaConfRes;
use crate::utils::*;
mod utils;
/// Builds the `StreamConsumer` used by the tests in this file.
///
/// `group_id` becomes the consumer's `group.id`; the broker address comes from
/// `get_bootstrap_server()`. Every other setting is fixed.
///
/// # Panics
///
/// Panics with "Failed to create StreamConsumer" if the client cannot be created.
fn create_consumer(group_id: &str) -> StreamConsumer {
    let bootstrap_servers = get_bootstrap_server();

    // Key/value pairs applied to the client configuration, in order.
    let settings = [
        ("group.id", group_id),
        ("enable.partition.eof", "true"),
        ("client.id", "rdkafka_integration_test_client"),
        ("bootstrap.servers", bootstrap_servers.as_str()),
        ("session.timeout.ms", "6000"),
        ("debug", "all"),
        ("auto.offset.reset", "earliest"),
    ];

    let mut config = ClientConfig::new();
    for &(key, value) in settings.iter() {
        config.set(key, value);
    }

    config.create().expect("Failed to create StreamConsumer")
}
/// Checks the cluster and topic metadata returned by `fetch_metadata`.
///
/// The test produces one message to each of partitions 0, 1 and 2 of a fresh
/// topic, then verifies the broker list, the topic's partition layout, and
/// that a single-topic metadata request returns exactly one topic.
#[tokio::test]
async fn test_metadata() {
    let _r = env_logger::try_init();

    let topic_name = rand_test_topic("test_metadata");
    populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
    let consumer = create_consumer(&rand_test_group());

    // Passing `None` as the topic requests metadata without a topic filter.
    let metadata = consumer
        .fetch_metadata(None, Duration::from_secs(5))
        .unwrap();

    // Both -1 and 0 are accepted here.
    // NOTE(review): presumably -1 means the originating broker has not been
    // identified yet — confirm against the librdkafka documentation.
    let orig_broker_id = metadata.orig_broker_id();
    assert!(
        orig_broker_id == -1 || orig_broker_id == 0,
        "metadata.orig_broker_id = {}, not -1 or 0 as expected",
        orig_broker_id
    );
    assert!(!metadata.orig_broker_name().is_empty());

    // The test expects exactly one broker, with id 0, on port 9092.
    let broker_metadata = metadata.brokers();
    assert_eq!(broker_metadata.len(), 1);
    assert_eq!(broker_metadata[0].id(), 0);
    assert!(!broker_metadata[0].host().is_empty());
    assert_eq!(broker_metadata[0].port(), 9092);

    let topic_metadata = metadata
        .topics()
        .iter()
        .find(|m| m.name() == topic_name)
        .unwrap();

    // Collect the partition ids, checking along the way that no partition
    // reports an error. The ids are sorted before comparison, so the order in
    // which partitions are listed does not matter for this check.
    let mut ids = topic_metadata
        .partitions()
        .iter()
        .map(|p| {
            assert_eq!(p.error(), None);
            p.id()
        })
        .collect::<Vec<_>>();
    ids.sort_unstable();
    assert_eq!(ids, vec![0, 1, 2]);

    assert_eq!(topic_metadata.error(), None);
    assert_eq!(topic_metadata.partitions().len(), 3);
    // With exactly three partitions (asserted above), this covers the same
    // entries as checking indices 0, 1 and 2 individually.
    for partition in topic_metadata.partitions() {
        assert_eq!(partition.leader(), 0);
    }
    // Replicas and in-sync replicas are only checked for the first entry.
    assert_eq!(topic_metadata.partitions()[0].replicas(), &[0]);
    assert_eq!(topic_metadata.partitions()[0].isr(), &[0]);

    // Requesting metadata for one named topic must return just that topic.
    let metadata_one_topic = consumer
        .fetch_metadata(Some(&topic_name), Duration::from_secs(5))
        .unwrap();
    assert_eq!(metadata_one_topic.topics().len(), 1);
}
/// Checks that `subscription()` reports the topic passed to `subscribe()`.
#[tokio::test]
async fn test_subscription() {
    let _r = env_logger::try_init();

    let topic_name = rand_test_topic("test_subscription");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

    let consumer = create_consumer(&rand_test_group());
    let topics = [topic_name.as_str()];
    consumer.subscribe(&topics).unwrap();

    // Await one `recv()` before inspecting the subscription; its outcome is
    // deliberately ignored.
    let _first_recv = consumer.recv().await;

    let mut expected = TopicPartitionList::new();
    expected.add_topic_unassigned(&topic_name);

    let actual = consumer.subscription().unwrap();
    assert_eq!(expected, actual);
}
/// Checks that a subscribed consumer appears as the only member of its group.
///
/// The full group list is printed first; the assertions then run against a
/// second listing that is filtered to this test's group name.
#[tokio::test]
async fn test_group_membership() {
    let _r = env_logger::try_init();

    let topic_name = rand_test_topic("test_group_membership");
    let group_name = rand_test_group();

    // One message goes to each of partitions 0, 1 and 2.
    for partition in 0..3 {
        populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(partition), None).await;
    }

    let consumer = create_consumer(&group_name);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();

    // Await one `recv()` before listing groups; its outcome is deliberately
    // ignored.
    let _first_recv = consumer.recv().await;

    // Print every group and its members from the unfiltered listing.
    let all_groups = consumer
        .fetch_group_list(None, Duration::from_secs(5))
        .unwrap();
    for info in all_groups.groups() {
        println!(
            "{} {} {} {}",
            info.name(),
            info.state(),
            info.protocol(),
            info.protocol_type()
        );
        for participant in info.members() {
            println!(
                " {} {} {}",
                participant.id(),
                participant.client_id(),
                participant.client_host()
            );
        }
    }

    // The listing filtered by group name must contain exactly that group.
    let filtered = consumer
        .fetch_group_list(Some(&group_name), Duration::from_secs(5))
        .unwrap();
    let matching_groups = filtered.groups();
    assert_eq!(matching_groups.len(), 1);

    let own_group = matching_groups
        .iter()
        .find(|g| g.name() == group_name)
        .unwrap();
    let members = own_group.members();
    assert_eq!(members.len(), 1);

    // The single member must carry the client id set by `create_consumer`.
    let sole_member = &members[0];
    assert_eq!(sole_member.client_id(), "rdkafka_integration_test_client");
}
/// Checks what the native client configuration returns from `get`: the value
/// with nothing set, an explicitly set value, and the error for an unknown key.
#[tokio::test]
async fn test_client_config() {
    // With nothing set, "session.timeout.ms" is expected to read "45000".
    let untouched = ClientConfig::new().create_native_config().unwrap();
    assert_eq!(untouched.get("session.timeout.ms").unwrap(), "45000");

    // A value set on the `ClientConfig` must be visible in the native config.
    let mut client_config = ClientConfig::new();
    client_config.set("session.timeout.ms", "42");
    let overridden = client_config.create_native_config().unwrap();
    assert_eq!(overridden.get("session.timeout.ms").unwrap(), "42");

    // An unknown key must yield a `ClientConfig` error that carries the key
    // name and an empty value.
    let unknown_key_error = KafkaError::ClientConfig(
        RDKafkaConfRes::RD_KAFKA_CONF_UNKNOWN,
        String::from("Unknown configuration name"),
        String::from("noexist"),
        String::new(),
    );
    assert_eq!(overridden.get("noexist"), Err(unknown_key_error));
}