kafkit-client 0.1.6

Kafka 4.0+ pure Rust client.
Documentation
#![allow(unused_imports)]

mod common;

use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
    AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig, ConsumerError,
    IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer, KafkaMessage, KafkaProducer,
    NewPartitions, NewTopic, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
    SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};

use crate::common::{
    KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
    poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
    unique_group, unique_topic,
};

#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_lists_describes_and_deletes_topics() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("admin-topic");
    let admin = KafkaAdmin::connect(
        AdminConfig::new(kafka.bootstrap_server()).with_client_id("integration-admin"),
    )
    .await?;

    admin
        .create_topics([NewTopic::new(topic.clone(), 2, 1)])
        .await?;

    let listings = poll_until_with_admin(
        &admin,
        |topics| topics.iter().any(|listed| listed.name == topic),
        Duration::from_secs(20),
    )
    .await?;
    assert!(listings.iter().any(|listed| listed.name == topic));

    let descriptions = poll_describe_until(
        &admin,
        std::slice::from_ref(&topic),
        |topics| topics.iter().any(|described| described.name == topic),
        Duration::from_secs(20),
    )
    .await?;
    let description = descriptions
        .iter()
        .find(|described| described.name == topic)
        .ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
    assert_eq!(description.partitions.len(), 2);
    assert!(
        description
            .partitions
            .iter()
            .all(|partition| partition.leader_id >= 0)
    );

    admin.delete_topics([topic.clone()]).await?;
    let deadline = Instant::now() + Duration::from_secs(20);
    loop {
        let topics = admin.list_topics().await?;
        if !topics.iter().any(|listed| listed.name == topic) {
            break;
        }
        if Instant::now() >= deadline {
            bail!("timed out waiting for topic '{topic}' deletion");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_create_topics_treats_existing_topics_as_success() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("admin-topic-exists");
    let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;

    admin
        .create_topics([NewTopic::new(topic.clone(), 1, 1)])
        .await?;
    admin
        .create_topics([NewTopic::new(topic.clone(), 1, 1)])
        .await?;

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_describes_cluster() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;

    let cluster = admin.describe_cluster().await?;
    assert!(!cluster.cluster_id.is_empty());
    assert!(!cluster.brokers.is_empty());
    assert!(
        cluster
            .brokers
            .iter()
            .any(|broker| broker.broker_id == cluster.controller_id)
    );

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_creates_partitions_for_existing_topics() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("admin-create-partitions");
    let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;

    admin
        .create_topics([NewTopic::new(topic.clone(), 2, 1)])
        .await?;
    admin
        .create_partitions([(topic.clone(), NewPartitions::increase_to(4))])
        .await?;

    let descriptions = poll_describe_until(
        &admin,
        std::slice::from_ref(&topic),
        |topics| {
            topics
                .iter()
                .find(|described| described.name == topic)
                .is_some_and(|described| described.partitions.len() == 4)
        },
        Duration::from_secs(20),
    )
    .await?;
    let description = descriptions
        .iter()
        .find(|described| described.name == topic)
        .ok_or_else(|| anyhow!("expected topic description for '{topic}'"))?;
    assert_eq!(description.partitions.len(), 4);

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn admin_client_incrementally_alters_and_describes_group_configs() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let admin = KafkaAdmin::connect(AdminConfig::new(kafka.bootstrap_server())).await?;
    let group = unique_group("group-config");

    admin
        .incremental_alter_configs([(
            ConfigResource::group(group.clone()),
            vec![AlterConfigOp::set("consumer.heartbeat.interval.ms", "6000")],
        )])
        .await?;

    let deadline = Instant::now() + Duration::from_secs(20);
    loop {
        let described = admin
            .describe_configs([ConfigResource::group(group.clone())])
            .await?;
        let entry = described[0]
            .entry("consumer.heartbeat.interval.ms")
            .and_then(|entry| entry.value.as_deref());
        if entry == Some("6000") {
            break;
        }
        if Instant::now() >= deadline {
            bail!("timed out waiting for altered group config to become visible");
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    Ok(())
}