#![allow(unused_imports)]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation,
AclPermissionType, AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig,
ConsumerError, Error, IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer, KafkaMessage,
KafkaProducer, NewPartitions, NewTopic, PatternType, ProduceRecord, ProducerCompression,
ProducerConfig, RecordHeader, ResourcePattern, ResourcePatternFilter, ResourceType,
SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};
use crate::common::{
KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
unique_group, unique_topic,
};
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_lists_describes_and_deletes_topics() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-topic");
let admin = KafkaAdmin::connect(
AdminConfig::new(kafka.bootstrap_server()).with_client_id("integration-admin"),
)
.await?;
admin
.create_topics([NewTopic::new(topic.clone(), 2, 1)])
.await?;
let listings = poll_until_with_admin(
&admin,
|topics| topics.iter().any(|listed| listed.name == topic),
Duration::from_secs(20),
)
.await?;
assert!(listings.iter().any(|listed| listed.name == topic));
let descriptions = poll_describe_until(
&admin,
std::slice::from_ref(&topic),
|topics| topics.iter().any(|described| described.name == topic),
Duration::from_secs(20),
)
.await?;
let description = descriptions
.iter()
.find(|described| described.name == topic)
.ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
assert_eq!(description.partitions.len(), 2);
assert!(
description
.partitions
.iter()
.all(|partition| partition.leader_id >= 0)
);
admin.delete_topics([topic.clone()]).await?;
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let topics = admin.list_topics().await?;
if !topics.iter().any(|listed| listed.name == topic) {
break;
}
if Instant::now() >= deadline {
bail!("timed out waiting for topic '{topic}' deletion");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_create_topics_treats_existing_topics_as_success() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-topic-exists");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([NewTopic::new(topic.clone(), 1, 1)])
.await?;
admin
.create_topics([NewTopic::new(topic.clone(), 1, 1)])
.await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_describes_cluster() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let cluster = admin.describe_cluster().await?;
assert!(!cluster.cluster_id.is_empty());
assert!(!cluster.brokers.is_empty());
assert!(
cluster
.brokers
.iter()
.any(|broker| broker.broker_id == cluster.controller_id)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_describes_broker_log_dirs() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let cluster = admin.describe_cluster().await?;
let log_dirs = admin
.describe_log_dirs(cluster.brokers.clone(), None)
.await?;
assert_eq!(log_dirs.len(), cluster.brokers.len());
for broker in &cluster.brokers {
let described = log_dirs
.iter()
.find(|log_dir| log_dir.broker_id == broker.broker_id)
.ok_or_else(|| anyhow!("missing log dirs for broker {}", broker.broker_id))?;
assert!(!described.log_dirs.is_empty());
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_lists_finalized_feature_levels() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let features = admin.finalized_feature_levels().await?;
assert!(
features.iter().any(|feature| !feature.name.is_empty()),
"expected at least one finalized broker feature level"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_partitions_for_existing_topics() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-create-partitions");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([NewTopic::new(topic.clone(), 2, 1)])
.await?;
admin
.create_partitions([(topic.clone(), NewPartitions::increase_to(4))])
.await?;
let descriptions = poll_describe_until(
&admin,
std::slice::from_ref(&topic),
|topics| {
topics
.iter()
.find(|described| described.name == topic)
.is_some_and(|described| described.partitions.len() == 4)
},
Duration::from_secs(20),
)
.await?;
let description = descriptions
.iter()
.find(|described| described.name == topic)
.ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
assert_eq!(description.partitions.len(), 4);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_reports_broker_rejections_as_typed_errors() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-missing-topic");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let error = admin
.create_partitions([(topic.clone(), NewPartitions::increase_to(2))])
.await
.expect_err("missing topic should be rejected by the broker");
match error {
Error::Broker(kafkit_client::BrokerError::Response {
operation,
resource,
name,
retriable,
..
}) => {
assert_eq!(operation, "create_partitions");
assert!(resource.contains(&topic));
assert_eq!(name, "UnknownTopicOrPartition");
assert!(retriable);
}
error => bail!("expected typed broker error, got {error:?}"),
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_reports_disabled_acl_authorizer_as_typed_error() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let error = admin
.describe_acls(AclBindingFilter::any())
.await
.expect_err("default broker without an authorizer should reject ACL reads");
match error {
Error::Broker(kafkit_client::BrokerError::Response {
operation,
name,
retriable,
..
}) => {
assert_eq!(operation, "describe_acls");
assert_eq!(name, "SecurityDisabled");
assert!(!retriable);
}
error => bail!("expected typed ACL broker error, got {error:?}"),
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_describes_and_deletes_acls_with_authorizer() -> Result<()> {
let kafka = KafkaHarness::start_with_authorizer().await?;
let topic = unique_topic("admin-acl-authorizer");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([NewTopic::new(topic.clone(), 1, 1)])
.await?;
let binding = AclBinding::new(
ResourcePattern::new(ResourceType::Topic, topic.clone(), PatternType::Literal),
AccessControlEntry::new(
"User:integration",
"*",
AclOperation::Read,
AclPermissionType::Allow,
),
);
let filter = AclBindingFilter::new(
ResourcePatternFilter::new(
ResourceType::Topic,
Some(topic.clone()),
PatternType::Literal,
),
AccessControlEntryFilter::new(
Some("User:integration".to_owned()),
Some("*".to_owned()),
AclOperation::Read,
AclPermissionType::Allow,
),
);
admin.create_acls([binding.clone()]).await?;
let described = poll_acls_until(
&admin,
filter.clone(),
|acls| acls.iter().any(|acl| acl == &binding),
Duration::from_secs(20),
"created ACL to become visible through an exact describe filter",
)
.await?;
assert!(
described.iter().any(|acl| acl == &binding),
"created ACL should be visible through an exact describe filter"
);
let deleted = admin.delete_acls([binding.to_filter()]).await?;
assert!(
deleted
.iter()
.flat_map(|result| result.matching_acls.iter())
.any(|acl| acl == &binding),
"delete result should report the removed ACL"
);
let remaining = poll_acls_until(
&admin,
filter,
|acls| acls.is_empty(),
Duration::from_secs(20),
"deleted ACL to stop matching the exact describe filter",
)
.await?;
assert!(
remaining.is_empty(),
"deleted ACL should no longer match the exact describe filter"
);
Ok(())
}
async fn poll_acls_until<F>(
admin: &KafkaAdmin,
filter: AclBindingFilter,
predicate: F,
timeout: Duration,
description: &str,
) -> Result<Vec<AclBinding>>
where
F: Fn(&[AclBinding]) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
let acls = admin.describe_acls(filter.clone()).await?;
if predicate(&acls) {
return Ok(acls);
}
if Instant::now() >= deadline {
bail!("timed out waiting for {description}");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_incrementally_alters_and_describes_group_configs() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
let group = unique_group("group-config");
admin
.incremental_alter_configs([(
ConfigResource::group(group.clone()),
vec![AlterConfigOp::set("consumer.heartbeat.interval.ms", "6000")],
)])
.await?;
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let described = admin
.describe_configs([ConfigResource::group(group.clone())])
.await?;
let entry = described[0]
.entry("consumer.heartbeat.interval.ms")
.and_then(|entry| entry.value.as_deref());
if entry == Some("6000") {
break;
}
if Instant::now() >= deadline {
bail!("timed out waiting for altered group config to become visible");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_lists_describes_and_deletes_consumer_groups() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-group");
let group = unique_group("admin-group");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([NewTopic::new(topic.clone(), 2, 1)])
.await?;
poll_describe_until(
&admin,
std::slice::from_ref(&topic),
|topics| {
topics
.iter()
.find(|described| described.name == topic)
.is_some_and(|described| described.partitions.len() == 2)
},
Duration::from_secs(20),
)
.await?;
let consumer = KafkaConsumer::connect(
ConsumerConfig::new(kafka.bootstrap_server(), group.clone())
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.with_server_assignor("range"),
)
.await?;
consumer.subscribe(vec![topic.clone()]).await?;
poll_until_assignment(
&consumer,
expected_assignment(&topic, 2),
Duration::from_secs(20),
)
.await?;
let listings = poll_admin_groups_until(
&admin,
|groups| groups.iter().any(|listed| listed.group_id == group),
Duration::from_secs(20),
)
.await?;
let listed = listings
.iter()
.find(|listed| listed.group_id == group)
.ok_or_else(|| anyhow!("expected listed group '{group}'"))?;
assert_eq!(listed.group_type.as_deref(), Some("consumer"));
let description = poll_admin_group_description_until(
&admin,
&group,
|description| !description.members.is_empty(),
Duration::from_secs(20),
)
.await?;
assert_eq!(description.group_id, group);
assert_eq!(description.protocol_type, "consumer");
assert_eq!(description.protocol_data, "range");
assert_eq!(description.members.len(), 1);
let member = &description.members[0];
assert!(!member.member_id.is_empty());
assert!(!member.client_id.is_empty());
assert!(!member.client_host.is_empty());
assert_eq!(member.member_metadata_bytes, 1);
assert_eq!(member.member_assignment_bytes, 2);
consumer.shutdown().await?;
poll_admin_group_description_until(
&admin,
&group,
|description| description.members.is_empty(),
Duration::from_secs(20),
)
.await?;
admin.delete_groups([group.clone()]).await?;
poll_admin_groups_until(
&admin,
|groups| !groups.iter().any(|listed| listed.group_id == group),
Duration::from_secs(20),
)
.await?;
Ok(())
}
async fn poll_admin_groups_until<F>(
admin: &KafkaAdmin,
predicate: F,
timeout: Duration,
) -> Result<Vec<kafkit_client::ConsumerGroupListing>>
where
F: Fn(&[kafkit_client::ConsumerGroupListing]) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
let groups = admin.list_groups().await?;
if predicate(&groups) {
return Ok(groups);
}
if Instant::now() >= deadline {
bail!("timed out waiting for admin group listing");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn poll_admin_group_description_until<F>(
admin: &KafkaAdmin,
group: &str,
predicate: F,
timeout: Duration,
) -> Result<kafkit_client::ConsumerGroupDescription>
where
F: Fn(&kafkit_client::ConsumerGroupDescription) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
let description = admin
.describe_groups([group.to_owned()])
.await?
.into_iter()
.next()
.ok_or_else(|| anyhow!("expected group description for '{group}'"))?;
if predicate(&description) {
return Ok(description);
}
if Instant::now() >= deadline {
bail!("timed out waiting for admin group description");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}