#![allow(unused_imports)]
mod common;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow, bail};
use bytes::Bytes;
use kafkit_client::{
AdminConfig, AlterConfigOp, AutoOffsetReset, ConfigResource, ConsumerConfig, ConsumerError,
IsolationLevel, KafkaAdmin, KafkaClient, KafkaConsumer, KafkaMessage, KafkaProducer,
NewPartitions, NewTopic, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
SubscriptionPattern, TopicPartition, TopicPartitionTimestamp,
};
use crate::common::{
KafkaHarness, collect_values, expected_assignment, poll_describe_until, poll_until,
poll_until_assignment, poll_until_consumer_error, poll_until_with_admin, poll_until_with_api,
unique_group, unique_topic,
};
/// Round-trips one message through the topic handle returned by `KafkaClient::topic`.
///
/// A consumer (earliest offset reset, 250 ms poll timeout) and a gzip-compressed
/// producer are both built from the same topic handle. The test sends a single
/// payload, polls until a record carrying that payload is seen on partition 0 of the
/// topic, commits the fetched batch and then shuts both clients down.
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_round_trip_hides_topic_and_subscription_setup() -> Result<()> {
    // The one payload that is produced and then looked for on the consuming side.
    const PAYLOAD: &str = "hello from ergonomic api";
    // Duration handed to the polling helper as its third argument.
    const WAIT_LIMIT: Duration = Duration::from_secs(20);

    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("api-roundtrip");
    harness.create_topic(&topic).await?;

    let kafka_client = KafkaClient::new(harness.bootstrap_server());
    let handle = kafka_client.topic(topic.clone());

    // The consumer is connected before anything is produced.
    let reader = handle
        .consumer(unique_group("api-roundtrip"))
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_poll_timeout(Duration::from_millis(250))
        .connect()
        .await?;
    let writer = handle
        .producer()
        .with_client_id("integration-api-producer")
        .with_compression(ProducerCompression::Gzip)
        .connect()
        .await?;

    writer.send(String::from(PAYLOAD)).await?;

    let batch = poll_until_with_api(
        &reader,
        |seen| {
            seen.iter().any(|rec| {
                let payload_matches = rec.value.as_deref() == Some(PAYLOAD.as_bytes());
                rec.topic == topic && rec.partition == 0 && payload_matches
            })
        },
        WAIT_LIMIT,
    )
    .await?;

    // Re-check the returned batch explicitly rather than relying on the helper alone.
    let delivered = batch.iter().any(|rec| {
        let payload_matches = rec.value.as_deref() == Some(PAYLOAD.as_bytes());
        rec.topic == topic && rec.partition == 0 && payload_matches
    });
    assert!(delivered);

    reader.commit(&batch).await?;
    reader.shutdown().await?;
    writer.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_message_builder_preserves_headers_and_timestamps() -> Result<()> {
let kafka = KafkaHarness::start().await?;
let topic = unique_topic("api-message");
let produced_timestamp = 1_700_000_123_456i64;
kafka.create_topic(&topic).await?;
let client = KafkaClient::new(kafka.bootstrap_server());
let topic_api = client.topic(topic.clone());
let consumer = topic_api
.consumer(unique_group("api-message"))
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.connect()
.await?;
let producer = topic_api.producer().connect().await?;
producer
.send_message(
KafkaMessage::new("payload".to_owned())
.with_key("record-key".to_owned())
.with_timestamp(produced_timestamp)
.with_headers([
RecordHeader::new("trace-id", Bytes::from_static(b"trace-123")),
RecordHeader::null("nullable"),
]),
)
.await?;
let records = poll_until_with_api(
&consumer,
|records| {
records.iter().any(|record| {
record.topic == topic && record.value.as_deref() == Some(&b"payload"[..])
})
},
Duration::from_secs(20),
)
.await?;
let record = records
.iter()
.find(|record| record.topic == topic && record.value.as_deref() == Some(&b"payload"[..]))
.ok_or_else(|| anyhow!("expected produced record in fetched batch"))?;
assert_eq!(record.key.as_deref(), Some(&b"record-key"[..]));
assert_eq!(record.timestamp, produced_timestamp);
assert_eq!(record.headers.len(), 2);
assert_eq!(record.headers[0].key, "trace-id");
assert_eq!(record.headers[0].value.as_deref(), Some(&b"trace-123"[..]));
assert_eq!(record.headers[1].key, "nullable");
assert!(record.headers[1].value.is_none());
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
/// Checks that a committed batch is not handed to a new consumer in the same group.
///
/// Flow: produce `first`, consume and commit it, shut the consumer down, produce
/// `second`, then connect a new consumer with the same group id. The batch fetched by
/// the new consumer must contain `second` and must not contain `first`.
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_commit_survives_consumer_restart() -> Result<()> {
    const FIRST: &str = "first";
    const SECOND: &str = "second";
    const WAIT_LIMIT: Duration = Duration::from_secs(20);

    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("api-commit-restart");
    let group = unique_group("api-commit-restart");
    harness.create_topic(&topic).await?;

    let kafka_client = KafkaClient::new(harness.bootstrap_server());
    let handle = kafka_client.topic(topic.clone());
    let writer = handle
        .producer()
        .with_compression(ProducerCompression::None)
        .connect()
        .await?;
    writer.send(String::from(FIRST)).await?;

    // First consumer: read `first`, commit it, then shut down.
    let initial_reader = handle
        .consumer(group.clone())
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let committed_batch = poll_until_with_api(
        &initial_reader,
        |seen| {
            seen.iter()
                .any(|rec| rec.value.as_deref() == Some(FIRST.as_bytes()))
        },
        WAIT_LIMIT,
    )
    .await?;
    initial_reader.commit(&committed_batch).await?;
    initial_reader.shutdown().await?;

    writer.send(String::from(SECOND)).await?;

    // Second consumer: same group id, same reset policy.
    let restarted_reader = handle
        .consumer(group)
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let resumed_batch = poll_until_with_api(
        &restarted_reader,
        |seen| {
            seen.iter()
                .any(|rec| rec.value.as_deref() == Some(SECOND.as_bytes()))
        },
        WAIT_LIMIT,
    )
    .await?;

    let replayed_first = resumed_batch
        .iter()
        .any(|rec| rec.value.as_deref() == Some(FIRST.as_bytes()));
    assert!(!replayed_first);
    let saw_second = resumed_batch
        .iter()
        .any(|rec| rec.value.as_deref() == Some(SECOND.as_bytes()));
    assert!(saw_second);

    restarted_reader.shutdown().await?;
    writer.shutdown().await?;
    Ok(())
}
/// Exercises a consumer configured with an explicit instance id across a restart.
///
/// The first consumer is built with `with_instance_id`; its group metadata must report
/// the configured group id, the same instance id and a generation id above zero. After
/// committing `first` and shutting down, a second consumer with the same group and
/// instance id must fetch `second` without seeing `first` again.
#[tokio::test(flavor = "multi_thread")]
async fn ergonomic_api_supports_static_membership_via_instance_id() -> Result<()> {
    const FIRST: &str = "first";
    const SECOND: &str = "second";
    const WAIT_LIMIT: Duration = Duration::from_secs(20);

    let harness = KafkaHarness::start().await?;
    let topic = unique_topic("api-static-member");
    let group = unique_group("api-static-member");
    let instance_id = unique_group("api-static-instance");
    harness.create_topic(&topic).await?;

    let kafka_client = KafkaClient::new(harness.bootstrap_server());
    let handle = kafka_client.topic(topic.clone());
    let writer = handle.producer().connect().await?;
    writer.send(String::from(FIRST)).await?;

    // First consumer, carrying the instance id.
    let initial_reader = handle
        .consumer(group.clone())
        .with_instance_id(instance_id.clone())
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let committed_batch = poll_until_with_api(
        &initial_reader,
        |seen| {
            seen.iter()
                .any(|rec| rec.value.as_deref() == Some(FIRST.as_bytes()))
        },
        WAIT_LIMIT,
    )
    .await?;

    // Metadata is read only after a successful poll.
    let metadata = initial_reader.group_metadata().await?;
    assert_eq!(metadata.group_id, group);
    assert_eq!(
        metadata.group_instance_id.as_deref(),
        Some(instance_id.as_str())
    );
    assert!(metadata.generation_id > 0);

    initial_reader.commit(&committed_batch).await?;
    initial_reader.shutdown().await?;

    writer.send(String::from(SECOND)).await?;

    // Second consumer reuses both the group id and the instance id.
    let restarted_reader = handle
        .consumer(group)
        .with_instance_id(instance_id)
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;
    let resumed_batch = poll_until_with_api(
        &restarted_reader,
        |seen| {
            seen.iter()
                .any(|rec| rec.value.as_deref() == Some(SECOND.as_bytes()))
        },
        WAIT_LIMIT,
    )
    .await?;

    let replayed_first = resumed_batch
        .iter()
        .any(|rec| rec.value.as_deref() == Some(FIRST.as_bytes()));
    assert!(!replayed_first);
    let saw_second = resumed_batch
        .iter()
        .any(|rec| rec.value.as_deref() == Some(SECOND.as_bytes()));
    assert!(saw_second);

    restarted_reader.shutdown().await?;
    writer.shutdown().await?;
    Ok(())
}