kafkit-client 0.1.6

Kafka 4.0+ pure Rust client.
Documentation
#![allow(unused_imports)]

mod common;

use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
    AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig, ConsumerError,
    IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer, KafkaMessage, KafkaProducer,
    NewPartitions, NewTopic, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
    SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};

use crate::common::{
    KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
    poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
    unique_group, unique_topic,
};

#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_round_trip_hides_topic_and_subscription_setup() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("api-roundtrip");
    kafka.create_topic(&topic).await?;

    let client = KafkaClient::new(kafka.bootstrap_server());
    let topic_api = client.topic(topic.clone());

    let consumer = topic_api
        .consumer(unique_group("api-roundtrip"))
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_poll_timeout(Duration::from_millis(250))
        .connect()
        .await?;

    let producer = topic_api
        .producer()
        .with_client_id("integration-api-producer")
        .with_compression(ProducerCompression::Gzip)
        .connect()
        .await?;

    producer.send("hello from ergonomic api".to_owned()).await?;

    let records = poll_until_with_api(
        &consumer,
        |records| {
            records.iter().any(|record| {
                record.topic == topic
                    && record.partition == 0
                    && record.value.as_deref() == Some(&b"hello from ergonomic api"[..])
            })
        },
        Duration::from_secs(20),
    )
    .await?;

    assert!(records.iter().any(|record| {
        record.topic == topic
            && record.partition == 0
            && record.value.as_deref() == Some(&b"hello from ergonomic api"[..])
    }));

    consumer.commit(&records).await?;
    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_message_builder_preserves_headers_and_timestamps() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("api-message");
    let produced_timestamp = 1_700_000_123_456i64;
    kafka.create_topic(&topic).await?;

    let client = KafkaClient::new(kafka.bootstrap_server());
    let topic_api = client.topic(topic.clone());

    let consumer = topic_api
        .consumer(unique_group("api-message"))
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let producer = topic_api.producer().connect().await?;

    producer
        .send_message(
            KafkaMessage::new("payload".to_owned())
                .with_key("record-key".to_owned())
                .with_timestamp(produced_timestamp)
                .with_headers([
                    RecordHeader::new("trace-id", Bytes::from_static(b"trace-123")),
                    RecordHeader::null("nullable"),
                ]),
        )
        .await?;

    let records = poll_until_with_api(
        &consumer,
        |records| {
            records.iter().any(|record| {
                record.topic == topic && record.value.as_deref() == Some(&b"payload"[..])
            })
        },
        Duration::from_secs(20),
    )
    .await?;

    let record = records
        .iter()
        .find(|record| record.topic == topic && record.value.as_deref() == Some(&b"payload"[..]))
        .ok_or_else(|| anyhow!("expected produced record in fetched batch"))?;
    assert_eq!(record.key.as_deref(), Some(&b"record-key"[..]));
    assert_eq!(record.timestamp, produced_timestamp);
    assert_eq!(record.headers.len(), 2);
    assert_eq!(record.headers[0].key, "trace-id");
    assert_eq!(record.headers[0].value.as_deref(), Some(&b"trace-123"[..]));
    assert_eq!(record.headers[1].key, "nullable");
    assert!(record.headers[1].value.is_none());

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_commit_survives_consumer_restart() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("api-commit-restart");
    let group = unique_group("api-commit-restart");
    kafka.create_topic(&topic).await?;

    let client = KafkaClient::new(kafka.bootstrap_server());
    let topic_api = client.topic(topic.clone());

    let producer = topic_api
        .producer()
        .with_compression(ProducerCompression::None)
        .connect()
        .await?;
    producer.send("first".to_owned()).await?;

    let consumer = topic_api
        .consumer(group.clone())
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let first_batch = poll_until_with_api(
        &consumer,
        |records| {
            records
                .iter()
                .any(|record| record.value.as_deref() == Some(&b"first"[..]))
        },
        Duration::from_secs(20),
    )
    .await?;
    consumer.commit(&first_batch).await?;
    consumer.shutdown().await?;

    producer.send("second".to_owned()).await?;

    let consumer = topic_api
        .consumer(group)
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let second_batch = poll_until_with_api(
        &consumer,
        |records| {
            records
                .iter()
                .any(|record| record.value.as_deref() == Some(&b"second"[..]))
        },
        Duration::from_secs(20),
    )
    .await?;

    assert!(
        second_batch
            .iter()
            .all(|record| record.value.as_deref() != Some(&b"first"[..]))
    );
    assert!(
        second_batch
            .iter()
            .any(|record| record.value.as_deref() == Some(&b"second"[..]))
    );

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_supports_static_membership_via_instance_id() -> Result<()> {
    let kafka = KafkaHarness::start().await?;
    let topic = unique_topic("api-static-member");
    let group = unique_group("api-static-member");
    let instance_id = unique_group("api-static-instance");
    kafka.create_topic(&topic).await?;

    let client = KafkaClient::new(kafka.bootstrap_server());
    let topic_api = client.topic(topic.clone());

    let producer = topic_api.producer().connect().await?;
    producer.send("first".to_owned()).await?;

    let consumer = topic_api
        .consumer(group.clone())
        .with_instance_id(instance_id.clone())
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let first_batch = poll_until_with_api(
        &consumer,
        |records| {
            records
                .iter()
                .any(|record| record.value.as_deref() == Some(&b"first"[..]))
        },
        Duration::from_secs(20),
    )
    .await?;
    let group_metadata = consumer.group_metadata().await?;
    assert_eq!(group_metadata.group_id, group);
    assert_eq!(
        group_metadata.group_instance_id.as_deref(),
        Some(instance_id.as_str())
    );
    assert!(group_metadata.generation_id > 0);
    consumer.commit(&first_batch).await?;
    consumer.shutdown().await?;

    producer.send("second".to_owned()).await?;

    let consumer = topic_api
        .consumer(group)
        .with_instance_id(instance_id)
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let second_batch = poll_until_with_api(
        &consumer,
        |records| {
            records
                .iter()
                .any(|record| record.value.as_deref() == Some(&b"second"[..]))
        },
        Duration::from_secs(20),
    )
    .await?;

    assert!(
        second_batch
            .iter()
            .all(|record| record.value.as_deref() != Some(&b"first"[..]))
    );
    assert!(
        second_batch
            .iter()
            .any(|record| record.value.as_deref() == Some(&b"second"[..]))
    );

    consumer.shutdown().await?;
    producer.shutdown().await?;
    Ok(())
}