#![allow(dead_code)]
use std::collections::{BTreeSet, HashMap};
use std::env;
use std::net::TcpListener;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use bytes::{BufMut, Bytes, BytesMut};
use kafka_protocol::error::ParseResponseErrorCode;
use kafka_protocol::messages::create_topics_request::CreatableTopic;
use kafka_protocol::messages::fetch_request::{FetchPartition, FetchTopic};
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::{
ApiVersionsRequest, CreateTopicsRequest, CreateTopicsResponse, FetchRequest, FetchResponse,
MetadataRequest, MetadataResponse, RequestHeader, ResponseHeader,
};
use kafka_protocol::protocol::{
Decodable, HeaderVersion, Request, StrBytes, VersionRange, encode_request_header_into_buffer,
};
use kafka_protocol::records::RecordBatchDecoder;
use kafkit_client::{AdminConfig, KafkaAdmin, KafkaConsumer, SaslMechanism, TopicPartition};
use testcontainers::{
ContainerAsync, GenericImage, ImageExt,
core::{CmdWaitFor, ExecCommand, IntoContainerPort, WaitFor},
runners::AsyncRunner,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
use uuid::Uuid;
/// Docker image used for the Kafka test container.
const KAFKA_IMAGE: &str = "apache/kafka";
/// Tag of [`KAFKA_IMAGE`] that the harness starts.
const KAFKA_TAG: &str = "4.2.0";
/// Container-side port of the host-facing client listener; mapped to a freshly
/// reserved host port by every container constructor.
const EXTERNAL_PORT: u16 = 9092;
/// Container-side port of the extra PLAINTEXT listener the SCRAM harness maps to
/// the host so it can create its SCRAM user with the admin client.
const ADMIN_EXTERNAL_PORT: u16 = 9093;
/// Container-side PLAINTEXT listener used for inter-broker traffic and by the
/// Kafka CLI tools executed inside the container.
const INTERNAL_BROKER_PORT: u16 = 19092;
/// KRaft controller listener port inside the container.
const CONTROLLER_PORT: u16 = 29093;
/// Fixed KRaft cluster id passed to the container as `CLUSTER_ID`.
const CLUSTER_ID: &str = "4L6g3nShT-eMCtK--X86sw";
/// Client id sent in the header of every raw-protocol request.
const TEST_CLIENT_ID: &str = "kafkit-client-integration-tests";
/// Connect timeout, CreateTopics server-side timeout and metadata wait bound
/// used by the raw-protocol helpers.
const TEST_TIMEOUT: Duration = Duration::from_secs(5);
/// ApiVersions request version tried first during negotiation.
const API_VERSIONS_PROBE_VERSION: i16 = 3;
/// ApiVersions request version retried with when the probe fails.
const API_VERSIONS_FALLBACK_VERSION: i16 = 0;
/// Highest Metadata API version the helpers will negotiate.
const METADATA_VERSION_CAP: i16 = 12;
/// Highest CreateTopics API version the helpers will negotiate.
const CREATE_TOPICS_VERSION_CAP: i16 = 7;
/// Highest Fetch API version the helpers will negotiate.
const FETCH_VERSION_CAP: i16 = 16;
/// Fetch `isolation_level` value for read_uncommitted.
const READ_UNCOMMITTED: i8 = 0;
/// Fetch `isolation_level` value for read_committed.
const READ_COMMITTED: i8 = 1;
/// SASL user provisioned in the SASL containers; also the default user for
/// external SASL/SCRAM brokers.
const SASL_USERNAME: &str = "integration";
/// Password for [`SASL_USERNAME`]; also the default for external brokers.
const SASL_PASSWORD: &str = "integration-secret";
/// Handle to the Kafka broker used by an integration test.
///
/// The broker is either a container started by one of the `start*`
/// constructors, or an external broker whose address comes from an
/// environment variable.
pub struct KafkaHarness {
    /// Where the broker lives; decides how topics are created and consumed.
    backend: KafkaHarnessBackend,
    /// `host:port` that clients under test should bootstrap from.
    bootstrap_server: String,
    /// SASL user; only set by the SASL constructors.
    sasl_username: Option<String>,
    /// SASL password; only set by the SASL constructors.
    sasl_password: Option<String>,
}
/// Backing broker for a [`KafkaHarness`].
enum KafkaHarnessBackend {
    /// A container started by the harness. CLI tools are executed inside it.
    /// NOTE(review): holding it presumably also keeps the container running
    /// for the harness lifetime — confirm against testcontainers drop semantics.
    Container(Box<ContainerAsync<GenericImage>>),
    /// A broker supplied via environment variable; reached over the raw
    /// Kafka protocol.
    External,
}
impl KafkaHarness {
    /// Starts (or attaches to) a PLAINTEXT Kafka broker for integration tests.
    ///
    /// If `TEST_KAFKA_BROKER` is set to a non-blank value, that address is used
    /// directly and no container is started. Otherwise a single-node KRaft
    /// `apache/kafka` container is launched. Its `PLAINTEXT_HOST` listener is
    /// mapped to a freshly reserved host port, and `share.version=1` is enabled
    /// once it is up.
    ///
    /// # Errors
    /// Fails when a host port cannot be reserved, the container fails to start,
    /// or the share-group feature upgrade fails.
    pub async fn start() -> Result<Self> {
        if let Some(bootstrap_server) = env::var("TEST_KAFKA_BROKER")
            .ok()
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
        {
            return Ok(Self {
                backend: KafkaHarnessBackend::External,
                bootstrap_server,
                sasl_username: None,
                sasl_password: None,
            });
        }
        let host_port = reserve_host_port()?;
        let container = GenericImage::new(KAFKA_IMAGE, KAFKA_TAG)
            .with_wait_for(WaitFor::message_on_either_std("Kafka Server started"))
            .with_wait_for(WaitFor::message_on_either_std("Awaiting socket connections on"))
            .with_mapped_port(host_port, EXTERNAL_PORT.tcp())
            .with_env_var("KAFKA_NODE_ID", "1")
            .with_env_var(
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
            )
            .with_env_var(
                "KAFKA_ADVERTISED_LISTENERS",
                format!(
                    "PLAINTEXT_HOST://127.0.0.1:{host_port},PLAINTEXT://127.0.0.1:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_PROCESS_ROLES", "broker,controller")
            .with_env_var(
                "KAFKA_CONTROLLER_QUORUM_VOTERS",
                format!("1@127.0.0.1:{CONTROLLER_PORT}"),
            )
            .with_env_var(
                "KAFKA_LISTENERS",
                format!(
                    "CONTROLLER://:{CONTROLLER_PORT},PLAINTEXT_HOST://:{EXTERNAL_PORT},PLAINTEXT://:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
            .with_env_var("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
            .with_env_var("CLUSTER_ID", CLUSTER_ID)
            .with_env_var("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
            .with_env_var(
                "KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS",
                "classic,consumer,share",
            )
            .with_env_var("KAFKA_GROUP_CONSUMER_ASSIGNORS", "uniform,range")
            .with_env_var("KAFKA_GROUP_SHARE_ENABLE", "true")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR", "1")
            .with_env_var("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
            .start()
            .await
            .context("failed to start Kafka 4.2 test container")?;
        enable_share_groups_in_container(&container).await?;
        Ok(Self {
            backend: KafkaHarnessBackend::Container(Box::new(container)),
            bootstrap_server: format!("127.0.0.1:{host_port}"),
            sasl_username: None,
            sasl_password: None,
        })
    }

    /// Starts (or attaches to) a broker whose client listener requires SASL/PLAIN.
    ///
    /// If `TEST_KAFKA_SASL_BROKER` is non-blank, that address is used with
    /// credentials from `TEST_KAFKA_SASL_USERNAME` / `TEST_KAFKA_SASL_PASSWORD`.
    /// When unset, those fall back to [`SASL_USERNAME`] / [`SASL_PASSWORD`].
    /// Otherwise a container is launched whose host-facing `SASL` listener
    /// accepts PLAIN for the single user `SASL_USERNAME`.
    ///
    /// # Errors
    /// Fails when a host port cannot be reserved or the container fails to start.
    pub async fn start_sasl_plain() -> Result<Self> {
        if let Some(bootstrap_server) = env::var("TEST_KAFKA_SASL_BROKER")
            .ok()
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
        {
            let username =
                env::var("TEST_KAFKA_SASL_USERNAME").unwrap_or_else(|_| SASL_USERNAME.to_owned());
            let password =
                env::var("TEST_KAFKA_SASL_PASSWORD").unwrap_or_else(|_| SASL_PASSWORD.to_owned());
            return Ok(Self {
                backend: KafkaHarnessBackend::External,
                bootstrap_server,
                sasl_username: Some(username),
                sasl_password: Some(password),
            });
        }
        let host_port = reserve_host_port()?;
        let container = GenericImage::new(KAFKA_IMAGE, KAFKA_TAG)
            .with_wait_for(WaitFor::message_on_either_std("Kafka Server started"))
            .with_wait_for(WaitFor::message_on_either_std("Awaiting socket connections on"))
            .with_mapped_port(host_port, EXTERNAL_PORT.tcp())
            .with_env_var("KAFKA_NODE_ID", "1")
            .with_env_var(
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SASL:SASL_PLAINTEXT",
            )
            .with_env_var(
                "KAFKA_ADVERTISED_LISTENERS",
                format!(
                    "SASL://127.0.0.1:{host_port},PLAINTEXT://127.0.0.1:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_PROCESS_ROLES", "broker,controller")
            .with_env_var(
                "KAFKA_CONTROLLER_QUORUM_VOTERS",
                format!("1@127.0.0.1:{CONTROLLER_PORT}"),
            )
            .with_env_var(
                "KAFKA_LISTENERS",
                format!(
                    "CONTROLLER://:{CONTROLLER_PORT},SASL://:{EXTERNAL_PORT},PLAINTEXT://:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
            .with_env_var("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
            .with_env_var("CLUSTER_ID", CLUSTER_ID)
            .with_env_var("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
            .with_env_var("KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer")
            .with_env_var("KAFKA_GROUP_CONSUMER_ASSIGNORS", "uniform,range")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR", "1")
            .with_env_var("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
            .with_env_var("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
            .with_env_var("KAFKA_LISTENER_NAME_SASL_SASL_ENABLED_MECHANISMS", "PLAIN")
            .with_env_var(
                "KAFKA_LISTENER_NAME_SASL_PLAIN_SASL_JAAS_CONFIG",
                format!(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required user_{SASL_USERNAME}=\"{SASL_PASSWORD}\";"
                ),
            )
            .start()
            .await
            .context("failed to start SASL Kafka 4.2 test container")?;
        Ok(Self {
            backend: KafkaHarnessBackend::Container(Box::new(container)),
            bootstrap_server: format!("127.0.0.1:{host_port}"),
            sasl_username: Some(SASL_USERNAME.to_owned()),
            sasl_password: Some(SASL_PASSWORD.to_owned()),
        })
    }

    /// Starts (or attaches to) a broker whose client listener requires
    /// SASL/SCRAM-SHA-256.
    ///
    /// If `TEST_KAFKA_SCRAM_BROKER` is non-blank, that address is used with
    /// credentials from `TEST_KAFKA_SCRAM_USERNAME` / `TEST_KAFKA_SCRAM_PASSWORD`.
    /// When unset, those fall back to the built-in test credentials.
    ///
    /// Otherwise a container is launched with two host-facing listeners. One is
    /// the `SASL` (SCRAM-SHA-256) listener. The other is a PLAINTEXT listener on
    /// a second reserved host port, used only to upsert the SCRAM credential for
    /// [`SASL_USERNAME`] through [`KafkaAdmin`].
    ///
    /// # Errors
    /// Fails when host ports cannot be reserved, the container fails to start,
    /// or the SCRAM user cannot be created.
    pub async fn start_sasl_scram_sha_256() -> Result<Self> {
        if let Some(bootstrap_server) = env::var("TEST_KAFKA_SCRAM_BROKER")
            .ok()
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
        {
            let username =
                env::var("TEST_KAFKA_SCRAM_USERNAME").unwrap_or_else(|_| SASL_USERNAME.to_owned());
            let password =
                env::var("TEST_KAFKA_SCRAM_PASSWORD").unwrap_or_else(|_| SASL_PASSWORD.to_owned());
            return Ok(Self {
                backend: KafkaHarnessBackend::External,
                bootstrap_server,
                sasl_username: Some(username),
                sasl_password: Some(password),
            });
        }
        let host_port = reserve_host_port()?;
        let admin_host_port = reserve_host_port()?;
        let container = GenericImage::new(KAFKA_IMAGE, KAFKA_TAG)
            .with_wait_for(WaitFor::message_on_either_std("Kafka Server started"))
            .with_wait_for(WaitFor::message_on_either_std("Awaiting socket connections on"))
            .with_mapped_port(host_port, EXTERNAL_PORT.tcp())
            .with_mapped_port(admin_host_port, ADMIN_EXTERNAL_PORT.tcp())
            .with_env_var("KAFKA_NODE_ID", "1")
            .with_env_var(
                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SASL:SASL_PLAINTEXT",
            )
            .with_env_var(
                "KAFKA_ADVERTISED_LISTENERS",
                format!(
                    "SASL://127.0.0.1:{host_port},PLAINTEXT_HOST://127.0.0.1:{admin_host_port},PLAINTEXT://127.0.0.1:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_PROCESS_ROLES", "broker,controller")
            .with_env_var(
                "KAFKA_CONTROLLER_QUORUM_VOTERS",
                format!("1@127.0.0.1:{CONTROLLER_PORT}"),
            )
            .with_env_var(
                "KAFKA_LISTENERS",
                format!(
                    "CONTROLLER://:{CONTROLLER_PORT},SASL://:{EXTERNAL_PORT},PLAINTEXT_HOST://:{ADMIN_EXTERNAL_PORT},PLAINTEXT://:{INTERNAL_BROKER_PORT}"
                ),
            )
            .with_env_var("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
            .with_env_var("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
            .with_env_var("CLUSTER_ID", CLUSTER_ID)
            .with_env_var("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
            .with_env_var("KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer")
            .with_env_var("KAFKA_GROUP_CONSUMER_ASSIGNORS", "uniform,range")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            .with_env_var("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR", "1")
            .with_env_var("KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR", "1")
            .with_env_var("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
            .with_env_var("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
            .with_env_var(
                "KAFKA_LISTENER_NAME_SASL_SASL_ENABLED_MECHANISMS",
                "SCRAM-SHA-256",
            )
            .with_env_var(
                "KAFKA_LISTENER_NAME_SASL_SCRAM-SHA-256_SASL_JAAS_CONFIG",
                "org.apache.kafka.common.security.scram.ScramLoginModule required;",
            )
            .start()
            .await
            .context("failed to start SCRAM Kafka 4.2 test container")?;
        let admin = KafkaAdmin::connect(AdminConfig::new(format!("127.0.0.1:{admin_host_port}")))
            .await
            .context("failed to connect admin client to create SCRAM user")?;
        admin
            .upsert_scram_credential(SASL_USERNAME, SaslMechanism::ScramSha256, SASL_PASSWORD)
            .await
            .context("failed to create SCRAM user with admin client")?;
        Ok(Self {
            backend: KafkaHarnessBackend::Container(Box::new(container)),
            bootstrap_server: format!("127.0.0.1:{host_port}"),
            sasl_username: Some(SASL_USERNAME.to_owned()),
            sasl_password: Some(SASL_PASSWORD.to_owned()),
        })
    }

    /// Address (`host:port`) that clients under test should bootstrap from.
    pub fn bootstrap_server(&self) -> &str {
        &self.bootstrap_server
    }

    /// SASL user name.
    ///
    /// # Errors
    /// Fails when the harness was not started by a SASL constructor.
    pub fn sasl_username(&self) -> Result<&str> {
        self.sasl_username
            .as_deref()
            .context("Kafka harness was not started with SASL")
    }

    /// SASL password.
    ///
    /// # Errors
    /// Fails when the harness was not started by a SASL constructor.
    pub fn sasl_password(&self) -> Result<&str> {
        self.sasl_password
            .as_deref()
            .context("Kafka harness was not started with SASL")
    }

    /// Creates `topic` with one partition and replication factor 1; an existing
    /// topic is not an error.
    ///
    /// Container backend: runs `kafka-topics.sh --if-not-exists` inside the
    /// container against the internal PLAINTEXT listener. External backend:
    /// uses the CreateTopics API and waits for the topic to show up in metadata.
    ///
    /// # Errors
    /// Fails when the command or request fails. For the container backend the
    /// tool's stderr is included in the error.
    pub async fn create_topic(&self, topic: &str) -> Result<()> {
        match &self.backend {
            KafkaHarnessBackend::Container(container) => {
                let internal_bootstrap = format!("127.0.0.1:{INTERNAL_BROKER_PORT}");
                // Wait for *any* exit: `CmdWaitFor::exit_code(0)` makes `exec`
                // itself fail on a non-zero exit, so the stderr report below
                // would never run.
                let mut exec = container
                    .exec(
                        ExecCommand::new([
                            "/opt/kafka/bin/kafka-topics.sh",
                            "--bootstrap-server",
                            internal_bootstrap.as_str(),
                            "--create",
                            "--if-not-exists",
                            "--topic",
                            topic,
                            "--partitions",
                            "1",
                            "--replication-factor",
                            "1",
                        ])
                        .with_cmd_ready_condition(CmdWaitFor::exit()),
                    )
                    .await
                    .with_context(|| {
                        format!("failed to create topic '{topic}' in Kafka container")
                    })?;
                if exec.exit_code().await? != Some(0) {
                    let stderr = String::from_utf8(exec.stderr_to_vec().await.unwrap_or_default())
                        .unwrap_or_else(|_| "<non-utf8 stderr>".to_owned());
                    bail!("topic creation failed for '{topic}': {stderr}");
                }
                Ok(())
            }
            KafkaHarnessBackend::External => {
                create_topic_via_protocol(&self.bootstrap_server, topic).await
            }
        }
    }

    /// Reads up to `max_messages` record values from partition 0 of `topic`,
    /// starting at the beginning, for at most `timeout_ms` milliseconds.
    ///
    /// `isolation_level` is `"read_uncommitted"` or `"read_committed"`.
    ///
    /// Container backend: runs `kafka-console-consumer.sh` inside the container
    /// and returns its non-empty, trimmed stdout lines (invalid UTF-8 is
    /// replaced rather than discarding the whole output). External backend:
    /// fetches over the raw protocol.
    ///
    /// # Errors
    /// Fails when the consumer command exits non-zero (stderr is included), its
    /// output cannot be read, or the protocol fetch fails.
    pub async fn consume_values(
        &self,
        topic: &str,
        isolation_level: &str,
        max_messages: usize,
        timeout_ms: u64,
    ) -> Result<Vec<String>> {
        match &self.backend {
            KafkaHarnessBackend::Container(container) => {
                let command = vec![
                    "/opt/kafka/bin/kafka-console-consumer.sh".to_owned(),
                    "--bootstrap-server".to_owned(),
                    format!("127.0.0.1:{INTERNAL_BROKER_PORT}"),
                    "--topic".to_owned(),
                    topic.to_owned(),
                    "--partition".to_owned(),
                    "0".to_owned(),
                    "--from-beginning".to_owned(),
                    "--timeout-ms".to_owned(),
                    timeout_ms.to_string(),
                    "--max-messages".to_owned(),
                    max_messages.to_string(),
                    "--command-property".to_owned(),
                    format!("isolation.level={isolation_level}"),
                ];
                // See `create_topic`: wait for any exit so a failure reports stderr.
                let mut exec = container
                    .exec(
                        ExecCommand::new(command).with_cmd_ready_condition(CmdWaitFor::exit()),
                    )
                    .await
                    .with_context(|| {
                        format!("failed to consume topic '{topic}' from Kafka container")
                    })?;
                if exec.exit_code().await? != Some(0) {
                    let stderr = String::from_utf8(exec.stderr_to_vec().await.unwrap_or_default())
                        .unwrap_or_else(|_| "<non-utf8 stderr>".to_owned());
                    bail!("topic consume failed for '{topic}': {stderr}");
                }
                let stdout = exec.stdout_to_vec().await.with_context(|| {
                    format!("failed to read console consumer output for topic '{topic}'")
                })?;
                let text = String::from_utf8_lossy(&stdout);
                let values: Vec<String> = text
                    .lines()
                    .map(str::trim)
                    .filter(|line| !line.is_empty())
                    .filter(|line| !line.starts_with("Option --consumer-property is deprecated"))
                    .map(ToOwned::to_owned)
                    .collect();
                Ok(values)
            }
            KafkaHarnessBackend::External => {
                consume_values_via_protocol(
                    &self.bootstrap_server,
                    topic,
                    isolation_level,
                    max_messages,
                    timeout_ms,
                )
                .await
            }
        }
    }
}
/// Returns `"{prefix}-{uuid}"` with a fresh random UUID in simple (hyphen-free)
/// form, so parallel tests never share a topic.
pub fn unique_topic(prefix: &str) -> String {
    let suffix = Uuid::new_v4().simple();
    format!("{prefix}-{suffix}")
}
/// Returns `"{prefix}-{uuid}"` with a fresh random UUID in simple (hyphen-free)
/// form, so parallel tests never share a consumer group.
pub fn unique_group(prefix: &str) -> String {
    let mut name = String::from(prefix);
    name.push('-');
    name.push_str(&Uuid::new_v4().simple().to_string());
    name
}
async fn enable_share_groups_in_container(container: &ContainerAsync<GenericImage>) -> Result<()> {
let mut exec = container
.exec(
ExecCommand::new([
"/opt/kafka/bin/kafka-features.sh",
"--bootstrap-server",
"127.0.0.1:19092",
"upgrade",
"--feature",
"share.version=1",
])
.with_cmd_ready_condition(CmdWaitFor::exit()),
)
.await
.context("failed to run kafka-features.sh for share.version")?;
if exec.exit_code().await? != Some(0) {
let stderr = String::from_utf8(exec.stderr_to_vec().await.unwrap_or_default())
.unwrap_or_else(|_| "<non-utf8 stderr>".to_owned());
bail!("failed to enable share.version=1: {stderr}");
}
Ok(())
}
fn reserve_host_port() -> Result<u16> {
let listener = TcpListener::bind("127.0.0.1:0")
.context("failed to reserve an ephemeral host port for Kafka test container")?;
let port = listener
.local_addr()
.map_err(|error| anyhow!("failed to read reserved Kafka test port: {error}"))?
.port();
drop(listener);
Ok(port)
}
async fn create_topic_via_protocol(bootstrap_server: &str, topic: &str) -> Result<()> {
let mut connection =
TestBrokerConnection::connect(bootstrap_server, TEST_CLIENT_ID, TEST_TIMEOUT).await?;
let version = connection.version_with_cap::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP)?;
let request = CreateTopicsRequest::default()
.with_topics(vec![
CreatableTopic::default()
.with_name(StrBytes::from_string(topic.to_owned()).into())
.with_num_partitions(1)
.with_replication_factor(1),
])
.with_timeout_ms(duration_to_i32_ms(TEST_TIMEOUT)?)
.with_validate_only(false);
let response: CreateTopicsResponse = connection
.send_request(TEST_CLIENT_ID, version, &request)
.await?;
for result in response.topics {
if let Some(error) = result.error_code.err() {
let message = format!("create topic failed for '{topic}': {error}");
if !message.contains("TopicAlreadyExists") {
bail!(message);
}
}
}
let deadline = Instant::now() + TEST_TIMEOUT;
loop {
if fetch_topic_metadata(bootstrap_server, topic).await.is_ok() {
return Ok(());
}
if Instant::now() >= deadline {
bail!("timed out waiting for topic '{topic}' metadata after create");
}
sleep(Duration::from_millis(100)).await;
}
}
async fn consume_values_via_protocol(
bootstrap_server: &str,
topic: &str,
isolation_level: &str,
max_messages: usize,
timeout_ms: u64,
) -> Result<Vec<String>> {
let isolation_level = match isolation_level {
"read_uncommitted" => READ_UNCOMMITTED,
"read_committed" => READ_COMMITTED,
other => bail!("unsupported test isolation level '{other}'"),
};
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
let mut offset = 0_i64;
let mut values = Vec::new();
while Instant::now() < deadline && values.len() < max_messages {
let metadata = fetch_topic_metadata(bootstrap_server, topic).await?;
let mut connection =
TestBrokerConnection::connect(&metadata.leader_address, TEST_CLIENT_ID, TEST_TIMEOUT)
.await?;
let version = connection.version_with_cap::<FetchRequest>(FETCH_VERSION_CAP)?;
let request = build_fetch_request(
topic,
metadata.topic_id,
metadata.leader_epoch,
offset,
isolation_level,
version,
)?;
let response: FetchResponse = connection
.send_request(TEST_CLIENT_ID, version, &request)
.await?;
if let Some(error) = response.error_code.err() {
bail!("fetch failed for topic '{topic}': {error}");
}
let mut advanced = false;
for topic_response in response.responses {
let topic_name = if version >= 13 && !topic_response.topic_id.is_nil() {
topic.to_owned()
} else {
topic_response.topic.0.to_string()
};
if topic_name != topic {
continue;
}
for partition in topic_response.partitions {
if let Some(error) = partition.error_code.err() {
bail!(
"fetch failed for {topic}:{partition_index}: {error}",
partition_index = partition.partition_index
);
}
let Some(records) = partition.records else {
continue;
};
if records.is_empty() {
continue;
}
let mut bytes = records.clone();
let batches = RecordBatchDecoder::decode_all(&mut bytes)?;
for batch in batches {
for record in batch.records {
if record.control {
continue;
}
if let Some(value) = record.value {
values.push(String::from_utf8_lossy(&value).into_owned());
offset = record.offset + 1;
advanced = true;
if values.len() >= max_messages {
return Ok(values);
}
}
}
}
}
}
if !advanced {
sleep(Duration::from_millis(100)).await;
}
}
Ok(values)
}
async fn fetch_topic_metadata(bootstrap_server: &str, topic: &str) -> Result<TopicFetchMetadata> {
let mut connection =
TestBrokerConnection::connect(bootstrap_server, TEST_CLIENT_ID, TEST_TIMEOUT).await?;
let version = connection.version_with_cap::<MetadataRequest>(METADATA_VERSION_CAP)?;
let request = MetadataRequest::default()
.with_topics(Some(vec![
MetadataRequestTopic::default()
.with_name(Some(StrBytes::from_string(topic.to_owned()).into())),
]))
.with_allow_auto_topic_creation(false)
.with_include_cluster_authorized_operations(false)
.with_include_topic_authorized_operations(false);
let response: MetadataResponse = connection
.send_request(TEST_CLIENT_ID, version, &request)
.await?;
let brokers = response
.brokers
.into_iter()
.map(|broker| (broker.node_id.0, format!("{}:{}", broker.host, broker.port)))
.collect::<HashMap<_, _>>();
for topic_response in response.topics {
let topic_name = topic_response
.name
.as_ref()
.map(|name| name.0.to_string())
.unwrap_or_default();
if topic_name != topic {
continue;
}
if let Some(error) = topic_response.error_code.err() {
bail!("metadata failed for topic '{topic}': {error}");
}
let partition = topic_response
.partitions
.into_iter()
.find(|partition| partition.partition_index == 0)
.with_context(|| format!("metadata for topic '{topic}' did not include partition 0"))?;
if let Some(error) = partition.error_code.err() {
bail!("metadata failed for topic '{topic}' partition 0: {error}");
}
let leader_id = partition.leader_id.0;
let leader_address = brokers
.get(&leader_id)
.cloned()
.with_context(|| format!("metadata for topic '{topic}' missing leader {leader_id}"))?;
return Ok(TopicFetchMetadata {
topic_id: topic_response.topic_id,
leader_epoch: partition.leader_epoch,
leader_address,
});
}
bail!("topic '{topic}' was not present in metadata response")
}
/// Builds a single-partition (partition 0) Fetch request for `topic` starting
/// at `offset`.
///
/// For `version >= 13` the topic is addressed by `topic_id`; older versions
/// use the topic name. The request is sessionless (session id 0, epoch -1),
/// waits up to 250 ms for at least one byte, and is sent as a consumer
/// (replica id -1).
fn build_fetch_request(
    topic: &str,
    topic_id: Uuid,
    leader_epoch: i32,
    offset: i64,
    isolation_level: i8,
    version: i16,
) -> Result<FetchRequest> {
    let partitions = vec![
        FetchPartition::default()
            .with_partition(0)
            .with_current_leader_epoch(leader_epoch)
            .with_fetch_offset(offset)
            .with_last_fetched_epoch(-1)
            .with_log_start_offset(-1)
            .with_partition_max_bytes(1024 * 1024),
    ];
    let addressed = if version < 13 {
        FetchTopic::default().with_topic(StrBytes::from_string(topic.to_owned()).into())
    } else {
        FetchTopic::default().with_topic_id(topic_id)
    };
    let fetch_topic = addressed.with_partitions(partitions);
    let request = FetchRequest::default()
        .with_replica_id((-1).into())
        .with_session_id(0)
        .with_session_epoch(-1)
        .with_max_wait_ms(250)
        .with_min_bytes(1)
        .with_max_bytes(50 * 1024 * 1024)
        .with_isolation_level(isolation_level)
        .with_topics(vec![fetch_topic])
        .with_forgotten_topics_data(Vec::new())
        .with_rack_id(StrBytes::from_string(String::new()));
    Ok(request)
}
/// What a Fetch for partition 0 of a topic needs, as read from Metadata.
struct TopicFetchMetadata {
    /// Topic id; used to address the topic in Fetch v13+.
    topic_id: Uuid,
    /// Partition 0 leader epoch, sent as `current_leader_epoch`.
    leader_epoch: i32,
    /// `host:port` of partition 0's leader broker.
    leader_address: String,
}
/// Minimal raw Kafka-protocol connection used by the external-broker helpers.
struct TestBrokerConnection {
    /// Socket to the broker; requests are sent one at a time.
    stream: TcpStream,
    /// Correlation id for the next request; incremented per request.
    next_correlation_id: i32,
    /// Broker-advertised version range per API key, filled by ApiVersions
    /// negotiation during `connect`.
    api_versions: HashMap<i16, VersionRange>,
}
impl TestBrokerConnection {
async fn connect(address: &str, client_id: &str, timeout: Duration) -> Result<Self> {
let stream = tokio::time::timeout(timeout, TcpStream::connect(address))
.await
.with_context(|| format!("timed out connecting to {address}"))?
.with_context(|| format!("failed to connect to {address}"))?;
let mut connection = Self {
stream,
next_correlation_id: 1,
api_versions: HashMap::new(),
};
connection.negotiate_versions(client_id).await?;
Ok(connection)
}
fn version_with_cap<Req>(&self, cap: i16) -> Result<i16>
where
Req: Request,
{
let broker_range = self
.api_versions
.get(&Req::KEY)
.copied()
.with_context(|| format!("broker did not advertise API key {}", Req::KEY))?;
select_api_version(Req::KEY, broker_range, Req::VERSIONS, cap)
}
async fn negotiate_versions(&mut self, client_id: &str) -> Result<()> {
let modern_request = ApiVersionsRequest::default()
.with_client_software_name(StrBytes::from_static_str("kafkit-client-tests"))
.with_client_software_version(StrBytes::from_static_str("0.1.0"));
let response = match self
.send_request::<ApiVersionsRequest>(
client_id,
API_VERSIONS_PROBE_VERSION,
&modern_request,
)
.await
{
Ok(response) => response,
Err(_) => {
self.send_request::<ApiVersionsRequest>(
client_id,
API_VERSIONS_FALLBACK_VERSION,
&ApiVersionsRequest::default(),
)
.await?
}
};
if let Some(error) = response.error_code.err() {
bail!("ApiVersions failed: {error}");
}
self.api_versions = response
.api_keys
.into_iter()
.map(|api| {
(
api.api_key,
VersionRange {
min: api.min_version,
max: api.max_version,
},
)
})
.collect();
Ok(())
}
async fn send_request<Req>(
&mut self,
client_id: &str,
version: i16,
request: &Req,
) -> Result<Req::Response>
where
Req: Request,
{
let correlation_id = self.next_correlation_id;
self.next_correlation_id += 1;
let mut body = BytesMut::new();
let header = RequestHeader::default()
.with_request_api_key(Req::KEY)
.with_request_api_version(version)
.with_correlation_id(correlation_id)
.with_client_id(Some(StrBytes::from_string(client_id.to_owned())));
encode_request_header_into_buffer(&mut body, &header)?;
request.encode(&mut body, version)?;
let mut frame = BytesMut::with_capacity(body.len() + 4);
frame.put_i32(i32::try_from(body.len()).context("request frame is too large")?);
frame.extend_from_slice(&body);
self.stream.write_all(&frame).await?;
let response_frame = read_frame(&mut self.stream).await?;
let mut response_body = Bytes::from(response_frame);
let header_version = Req::Response::header_version(version);
let response_header = ResponseHeader::decode(&mut response_body, header_version)?;
if response_header.correlation_id != correlation_id {
bail!(
"response correlation mismatch: expected {}, got {}",
correlation_id,
response_header.correlation_id
);
}
Req::Response::decode(&mut response_body, version)
}
}
/// Picks the highest API version both sides support, never exceeding `cap`.
///
/// `local_range` is the range the client can encode. Its upper bound is clamped
/// to `cap` before intersecting with `broker_range`.
///
/// # Errors
/// Fails when the clamped local range and the broker range do not overlap.
fn select_api_version(
    api_key: i16,
    broker_range: VersionRange,
    local_range: VersionRange,
    cap: i16,
) -> Result<i16> {
    let capped = VersionRange {
        max: cap.min(local_range.max),
        min: local_range.min,
    };
    match broker_range.intersect(&capped) {
        shared if !shared.is_empty() => Ok(shared.max),
        _ => bail!(
            "no shared version for API key {}: broker={}, local={}",
            api_key,
            broker_range,
            capped
        ),
    }
}
/// Converts `duration` to whole milliseconds for Kafka's `int32` timeout fields.
///
/// # Errors
/// Fails when the millisecond count exceeds `i32::MAX`.
fn duration_to_i32_ms(duration: Duration) -> Result<i32> {
    let millis = duration.as_millis();
    millis
        .try_into()
        .context("duration does not fit in Kafka's i32 millisecond field")
}
/// Reads one Kafka response frame (big-endian `i32` length, then that many
/// bytes) and returns its payload.
///
/// # Errors
/// Fails on I/O errors, or when the length is negative or larger than
/// `MAX_FRAME_LEN`.
async fn read_frame(stream: &mut TcpStream) -> Result<Vec<u8>> {
    // Far above anything these tests fetch (Fetch caps partition data at
    // 1 MiB), but small enough to reject garbage lengths. For example, an HTTP
    // reply read as a length is ~1.1 GiB; such a peer fails fast instead of
    // triggering a huge allocation.
    const MAX_FRAME_LEN: usize = 128 * 1024 * 1024;
    let mut header = [0_u8; 4];
    stream
        .read_exact(&mut header)
        .await
        .context("failed to read response frame length")?;
    let frame_len = i32::from_be_bytes(header);
    if frame_len < 0 {
        bail!("broker returned a negative frame length: {frame_len}");
    }
    let frame_len = usize::try_from(frame_len)?;
    if frame_len > MAX_FRAME_LEN {
        bail!(
            "broker returned an implausibly large frame length: {frame_len} bytes (limit {MAX_FRAME_LEN})"
        );
    }
    let mut payload = vec![0_u8; frame_len];
    stream
        .read_exact(&mut payload)
        .await
        .context("failed to read response frame payload")?;
    Ok(payload)
}
/// Polls `consumer` (1 s per poll) until `predicate` accepts a batch, and
/// returns that batch.
///
/// # Errors
/// Propagates poll errors. Fails with a timeout error when `timeout` has
/// passed after a rejected batch.
pub async fn poll_until<F>(
    consumer: &KafkaConsumer,
    predicate: F,
    timeout: Duration,
) -> Result<kafkit_client::ConsumerRecords>
where
    F: Fn(&kafkit_client::ConsumerRecords) -> bool,
{
    let expires_at = Instant::now() + timeout;
    loop {
        let batch = consumer.poll_for(Duration::from_secs(1)).await?;
        if predicate(&batch) {
            break Ok(batch);
        }
        if expires_at <= Instant::now() {
            bail!("timed out waiting for Kafka records");
        }
    }
}
/// Keeps polling `consumer` (200 ms per poll) until its assignment equals
/// `expected`.
///
/// # Errors
/// Propagates poll and assignment errors. On timeout, fails with a message
/// listing both the expected and the last observed assignment.
pub async fn poll_until_assignment(
    consumer: &KafkaConsumer,
    expected: BTreeSet<TopicPartition>,
    timeout: Duration,
) -> Result<()> {
    let expires_at = Instant::now() + timeout;
    loop {
        // Poll between checks; the returned records are not needed here.
        let _ = consumer.poll_for(Duration::from_millis(200)).await?;
        let current = consumer.assignment().await?;
        if current == expected {
            break Ok(());
        }
        if expires_at <= Instant::now() {
            bail!(
                "timed out waiting for assignment {:?}; last assignment was {:?}",
                expected,
                current
            );
        }
    }
}
/// Polls `consumer` (200 ms per poll) until a poll fails, and returns that
/// error as the `Ok` value.
///
/// # Errors
/// Fails with a timeout error if every poll succeeds until `timeout` has passed.
pub async fn poll_until_consumer_error(
    consumer: &KafkaConsumer,
    timeout: Duration,
) -> Result<kafkit_client::Error> {
    let expires_at = Instant::now() + timeout;
    loop {
        if let Err(error) = consumer.poll_for(Duration::from_millis(200)).await {
            break Ok(error);
        }
        if expires_at <= Instant::now() {
            bail!("timed out waiting for consumer error");
        }
    }
}
pub async fn collect_values(
consumer: &KafkaConsumer,
timeout: Duration,
target_count: usize,
) -> Result<Vec<String>> {
let deadline = Instant::now() + timeout;
let mut values = Vec::new();
loop {
let records = consumer.poll_for(Duration::from_millis(200)).await?;
values.extend(records.iter().filter_map(|record| {
record
.value
.as_ref()
.and_then(|value| std::str::from_utf8(value).ok())
.map(ToOwned::to_owned)
}));
if values.len() >= target_count || Instant::now() >= deadline {
values.sort();
values.dedup();
return Ok(values);
}
}
}
/// Builds the set of partitions `0..partition_count` of `topic`, i.e. the full
/// assignment a single consumer should end up with.
pub fn expected_assignment(topic: &str, partition_count: i32) -> BTreeSet<TopicPartition> {
    let mut assignment = BTreeSet::new();
    for partition in 0..partition_count {
        assignment.insert(TopicPartition::new(topic.to_owned(), partition));
    }
    assignment
}
/// Like [`poll_until`], but uses the consumer's default `poll()` instead of an
/// explicit poll duration.
///
/// # Errors
/// Propagates poll errors. Fails with a timeout error when `timeout` has
/// passed after a rejected batch.
pub async fn poll_until_with_api<F>(
    consumer: &KafkaConsumer,
    predicate: F,
    timeout: Duration,
) -> Result<kafkit_client::ConsumerRecords>
where
    F: Fn(&kafkit_client::ConsumerRecords) -> bool,
{
    let expires_at = Instant::now() + timeout;
    loop {
        let batch = consumer.poll().await?;
        if predicate(&batch) {
            break Ok(batch);
        }
        if expires_at <= Instant::now() {
            bail!("timed out waiting for Kafka records");
        }
    }
}
pub async fn poll_until_with_admin<F>(
admin: &KafkaAdmin,
predicate: F,
timeout: Duration,
) -> Result<Vec<kafkit_client::TopicListing>>
where
F: Fn(&[kafkit_client::TopicListing]) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
let topics = admin.list_topics().await?;
if predicate(&topics) {
return Ok(topics);
}
if Instant::now() >= deadline {
bail!("timed out waiting for Kafka topic metadata");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn poll_describe_until<F>(
admin: &KafkaAdmin,
topics: &[String],
predicate: F,
timeout: Duration,
) -> Result<Vec<kafkit_client::TopicDescription>>
where
F: Fn(&[kafkit_client::TopicDescription]) -> bool,
{
let deadline = Instant::now() + timeout;
loop {
match admin.describe_topics(topics.iter().cloned()).await {
Ok(described) if predicate(&described) => return Ok(described),
Ok(_) => {}
Err(_) if Instant::now() < deadline => {}
Err(error) => return Err(error.into()),
}
if Instant::now() >= deadline {
bail!("timed out waiting for Kafka topic descriptions");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}