use assert2::assert;
use std::time::Duration;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::kafka::{KAFKA_PORT, Kafka};
use crabka_client_core::{Client, ClientError};
use crabka_protocol::owned::api_versions_request::ApiVersionsRequest;
const BOOTSTRAP_MAX_ATTEMPTS: u32 = 15;
const BOOTSTRAP_RETRY_DELAY: Duration = Duration::from_secs(1);
fn init_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter("crabka_client_core=debug,info")
.with_test_writer()
.try_init();
}
async fn start_kafka() -> (testcontainers::ContainerAsync<Kafka>, String) {
let kafka = Kafka::default()
.start()
.await
.expect("kafka container failed to start");
let port = kafka
.get_host_port_ipv4(KAFKA_PORT)
.await
.expect("failed to get mapped port");
(kafka, format!("127.0.0.1:{port}"))
}
async fn bootstrap_client(bootstrap: &str) -> Client {
let mut last_err: Option<ClientError> = None;
for attempt in 1..=BOOTSTRAP_MAX_ATTEMPTS {
let client = Client::builder()
.bootstrap(bootstrap)
.client_id("crabka-integration")
.build()
.await
.expect("client build failed");
match client.send(ApiVersionsRequest::default()).await {
Ok(_) => {
tracing::info!(attempt, "broker ready, bootstrap ApiVersions succeeded");
return client;
}
Err(ClientError::Disconnected) => {
tracing::warn!(
attempt,
max = BOOTSTRAP_MAX_ATTEMPTS,
"bootstrap ApiVersions returned Disconnected; broker likely still warming up"
);
client.close();
last_err = Some(ClientError::Disconnected);
tokio::time::sleep(BOOTSTRAP_RETRY_DELAY).await;
}
Err(e) => panic!("bootstrap ApiVersions failed with non-retryable error: {e}"),
}
}
panic!(
"broker never accepted ApiVersions after {BOOTSTRAP_MAX_ATTEMPTS} attempts; \
last error: {last_err:?}"
);
}
#[tokio::test]
#[ignore = "requires Docker"]
async fn api_versions_against_real_broker() {
init_tracing();
let (kafka, bootstrap) = start_kafka().await;
let client = bootstrap_client(&bootstrap).await;
let resp = client
.send(ApiVersionsRequest {
client_software_name: "crabka".into(),
client_software_version: env!("CARGO_PKG_VERSION").into(),
..Default::default()
})
.await
.expect("ApiVersions failed");
assert!(
resp.error_code == 0,
"ApiVersions returned error: {}",
resp.error_code
);
assert!(!resp.api_keys.is_empty(), "broker advertised no APIs");
assert!(
resp.api_keys.iter().any(|k| k.api_key == 18),
"ApiVersions key 18 not found in response"
);
client.close();
drop(kafka);
}
#[tokio::test]
#[ignore = "requires Docker"]
async fn metadata_against_real_broker() {
init_tracing();
let (kafka, bootstrap) = start_kafka().await;
let client = bootstrap_client(&bootstrap).await;
let resp = client
.refresh_metadata()
.await
.expect("refresh_metadata failed");
assert!(
!resp.brokers.is_empty(),
"expected at least one broker in metadata"
);
client.close();
drop(kafka);
}
#[tokio::test]
#[ignore = "requires Docker"]
async fn create_then_delete_topic() {
use crabka_protocol::owned::create_topics_request::{CreatableTopic, CreateTopicsRequest};
use crabka_protocol::owned::delete_topics_request::{DeleteTopicState, DeleteTopicsRequest};
init_tracing();
let (kafka, bootstrap) = start_kafka().await;
let client = bootstrap_client(&bootstrap).await;
let create = CreateTopicsRequest {
topics: vec![CreatableTopic {
name: "crabka-test-topic".into(),
num_partitions: 1,
replication_factor: 1,
..Default::default()
}],
timeout_ms: 5_000,
..Default::default()
};
let create_resp = client.send(create).await.expect("CreateTopics failed");
let topic_result = &create_resp.topics[0];
assert!(
topic_result.error_code == 0,
"CreateTopics error: {topic_result:?}"
);
let delete = DeleteTopicsRequest {
topics: vec![DeleteTopicState {
name: Some("crabka-test-topic".into()),
..Default::default()
}],
topic_names: vec!["crabka-test-topic".into()],
timeout_ms: 5_000,
..Default::default()
};
let delete_resp = client.send(delete).await.expect("DeleteTopics failed");
let del_result = &delete_resp.responses[0];
assert!(
del_result.error_code == 0,
"DeleteTopics error: {del_result:?}"
);
client.close();
drop(kafka);
}
#[tokio::test]
#[ignore = "requires Docker"]
async fn list_topics() {
use crabka_protocol::owned::metadata_request::MetadataRequest;
init_tracing();
let (kafka, bootstrap) = start_kafka().await;
let client = bootstrap_client(&bootstrap).await;
let resp = client
.send(MetadataRequest::default())
.await
.expect("Metadata failed");
let _ = resp.topics;
client.close();
drop(kafka);
}