#![allow(unused_imports)]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig, ConsumerError,
IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer, KafkaMessage, KafkaProducer,
NewPartitions, NewTopic, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};
use crate::common::{
KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
unique_group, unique_topic,
};
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_lists_describes_and_deletes_topics() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-topic");
let admin = KafkaAdmin::connect(
AdminConfig::new(kafka.bootstrap_server()).with_client_id("integration-admin"),
)
.await?;
admin
.create_topics([NewTopic::new(topic.clone(), 2, 1)])
.await?;
let listings = poll_until_with_admin(
&admin,
|topics| topics.iter().any(|listed| listed.name == topic),
Duration::from_secs(20),
)
.await?;
assert!(listings.iter().any(|listed| listed.name == topic));
let descriptions = poll_describe_until(
&admin,
std::slice::from_ref(&topic),
|topics| topics.iter().any(|described| described.name == topic),
Duration::from_secs(20),
)
.await?;
let description = descriptions
.iter()
.find(|described| described.name == topic)
.ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
assert_eq!(description.partitions.len(), 2);
assert!(
description
.partitions
.iter()
.all(|partition| partition.leader_id >= 0)
);
admin.delete_topics([topic.clone()]).await?;
let deadline = Instant::now() + Duration::from_secs(20);
loop {
let topics = admin.list_topics().await?;
if !topics.iter().any(|listed| listed.name == topic) {
break;
}
if Instant::now() >= deadline {
bail!("timed out waiting for topic '{topic}' deletion");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
/// Sends the same create-topic request twice and requires both calls to
/// return `Ok`, i.e. creating a topic that already exists is not an error.
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_create_topics_treats_existing_topics_as_success() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("admin-topic-exists");

    let admin_config = AdminConfig::new(kafka.bootstrap_server());
    let admin = KafkaAdmin::connect(admin_config).await?;

    // First pass creates the topic; second pass repeats the identical request
    // against the now-existing topic. `?` fails the test if either errors.
    for _attempt in 0..2 {
        let request = NewTopic::new(topic.clone(), 1, 1);
        admin.create_topics([request]).await?;
    }

    Ok(())
}
/// Describes the cluster through the admin client and sanity-checks the
/// response: a non-empty cluster id, at least one broker, and a controller id
/// that matches one of the returned brokers.
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_describes_cluster() -> Result<()> {
    let kafka = KafkaHarness::start().await?;

    let admin_config = AdminConfig::new(kafka.bootstrap_server());
    let admin = KafkaAdmin::connect(admin_config).await?;

    let info = admin.describe_cluster().await?;

    assert!(!info.cluster_id.is_empty());
    assert!(!info.brokers.is_empty());

    // The reported controller has to be one of the brokers in the same response.
    let controller_is_known_broker = info
        .brokers
        .iter()
        .any(|node| node.broker_id == info.controller_id);
    assert!(controller_is_known_broker);

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_partitions_for_existing_topics() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("admin-create-partitions");
let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
admin
.create_topics([NewTopic::new(topic.clone(), 2, 1)])
.await?;
admin
.create_partitions([(topic.clone(), NewPartitions::increase_to(4))])
.await?;
let descriptions = poll_describe_until(
&admin,
std::slice::from_ref(&topic),
|topics| {
topics
.iter()
.find(|described| described.name == topic)
.is_some_and(|described| described.partitions.len() == 4)
},
Duration::from_secs(20),
)
.await?;
let description = descriptions
.iter()
.find(|described| described.name == topic)
.ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
assert_eq!(description.partitions.len(), 4);
Ok(())
}
/// Sets `consumer.heartbeat.interval.ms` on a fresh group resource through
/// `incremental_alter_configs`, then polls `describe_configs` until the new
/// value is reported.
///
/// # Errors
///
/// Fails if any admin call returns an error, or if the altered value has not
/// become visible within 20 seconds.
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_incrementally_alters_and_describes_group_configs() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
    let group = unique_group("group-config");

    // Single source of truth for the key/value written and later read back.
    let config_key = "consumer.heartbeat.interval.ms";
    let config_value = "6000";

    admin
        .incremental_alter_configs([(
            ConfigResource::group(group.clone()),
            vec![AlterConfigOp::set(config_key, config_value)],
        )])
        .await?;

    let deadline = Instant::now() + Duration::from_secs(20);
    loop {
        let described = admin
            .describe_configs([ConfigResource::group(group.clone())])
            .await?;

        // Use `first()` rather than `described[0]`: an empty response is
        // treated as "not visible yet" and retried, so it surfaces as the
        // timeout error below instead of an index-out-of-bounds panic.
        let observed = described
            .first()
            .and_then(|resource| resource.entry(config_key))
            .and_then(|entry| entry.value.as_deref());
        if observed == Some(config_value) {
            break;
        }

        if Instant::now() >= deadline {
            bail!("timed out waiting for altered group config to become visible");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    Ok(())
}