#![allow(clippy::doc_markdown)]
use assert2::assert;
use std::time::Duration;
use bytes::BytesMut;
use crabka_client_core::{ClientError, Connection, ConnectionOptions, MockBroker};
use crabka_protocol::Encode;
use crabka_protocol::owned::api_versions_response::{ApiVersion, ApiVersionsResponse};
use crabka_protocol::owned::metadata_request::MetadataRequest;
use crabka_protocol::owned::metadata_response::MetadataResponse;
use crabka_protocol::owned::api_versions_request;
use crabka_protocol::owned::metadata_request as metadata_request_mod;
fn api_versions_response_v0() -> Vec<u8> {
let resp = ApiVersionsResponse {
error_code: 0,
api_keys: vec![
ApiVersion {
api_key: api_versions_request::API_KEY,
min_version: 0,
max_version: 3,
..Default::default()
},
ApiVersion {
api_key: metadata_request_mod::API_KEY,
min_version: 0,
max_version: 12,
..Default::default()
},
],
..Default::default()
};
let mut buf = BytesMut::new();
resp.encode(&mut buf, 0).unwrap();
buf.to_vec()
}
fn metadata_response_at(version: i16) -> Vec<u8> {
metadata_response_with_throttle(version, 0)
}
fn metadata_response_with_throttle(version: i16, throttle_time_ms: i32) -> Vec<u8> {
use crabka_protocol::owned::metadata_response::FLEXIBLE_MIN;
let resp = MetadataResponse {
throttle_time_ms,
..Default::default()
};
let mut buf = BytesMut::new();
if version >= FLEXIBLE_MIN {
buf.extend_from_slice(&[0x00u8]); }
resp.encode(&mut buf, version).unwrap();
buf.to_vec()
}
#[tokio::test]
async fn connect_negotiates_api_versions() {
let mock = MockBroker::start(|api_key, _version, _corr_id, _body| {
assert!(api_key == api_versions_request::API_KEY);
Some(api_versions_response_v0())
})
.await;
let conn = Connection::connect(mock.addr, ConnectionOptions::default())
.await
.unwrap();
assert!(!conn.versions().is_empty());
assert!(conn.versions().broker_range(api_versions_request::API_KEY) == Some((0, 3)));
assert!(conn.versions().broker_range(metadata_request_mod::API_KEY) == Some((0, 12)));
conn.close();
mock.stop();
}
#[tokio::test]
async fn timeout_when_handler_silent() {
let mock = MockBroker::start(|_api_key, _version, _corr_id, _body| {
None
})
.await;
let opts = ConnectionOptions {
connect_timeout: Duration::from_millis(200),
request_timeout: Duration::from_secs(30),
..ConnectionOptions::default()
};
match Connection::connect(mock.addr, opts).await {
Err(ClientError::Timeout(_)) => { }
Err(other) => panic!("expected Timeout, got: {other:?}"),
Ok(_conn) => panic!("connect should have timed out but succeeded"),
}
mock.stop();
}
#[tokio::test]
async fn round_trip_metadata_request() {
let mock = MockBroker::start(|api_key, version, _corr_id, _body| {
if api_key == api_versions_request::API_KEY {
return Some(api_versions_response_v0());
}
if api_key == metadata_request_mod::API_KEY {
return Some(metadata_response_at(version));
}
None
})
.await;
let conn = Connection::connect(mock.addr, ConnectionOptions::default())
.await
.unwrap();
let _resp = conn.send(MetadataRequest::default()).await.unwrap();
conn.close();
mock.stop();
}
#[tokio::test]
async fn concurrent_sends_get_correct_responses() {
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
let counter = Arc::new(AtomicI32::new(0));
let counter_for_mock = Arc::clone(&counter);
let mock = MockBroker::start(move |api_key, version, _corr_id, _body| {
if api_key == api_versions_request::API_KEY {
return Some(api_versions_response_v0());
}
if api_key == metadata_request_mod::API_KEY {
let n = counter_for_mock.fetch_add(1, Ordering::Relaxed);
return Some(metadata_response_with_throttle(version, n));
}
None
})
.await;
let conn = Connection::connect(mock.addr, ConnectionOptions::default())
.await
.unwrap();
let (r1, r2, r3) = tokio::join!(
conn.send(MetadataRequest::default()),
conn.send(MetadataRequest::default()),
conn.send(MetadataRequest::default()),
);
let r1 = r1.unwrap();
let r2 = r2.unwrap();
let r3 = r3.unwrap();
let mut seen = [
r1.throttle_time_ms,
r2.throttle_time_ms,
r3.throttle_time_ms,
];
seen.sort_unstable();
assert!(
seen == [0, 1, 2],
"each concurrent send must get a distinct response"
);
conn.close();
mock.stop();
}
#[tokio::test]
async fn client_refresh_metadata_populates_pool() {
use crabka_protocol::owned::metadata_response::{
FLEXIBLE_MIN, MetadataResponse, MetadataResponseBroker,
};
let mock = MockBroker::start(move |api_key, version, _corr_id, _body| {
if api_key == api_versions_request::API_KEY {
return Some(api_versions_response_v0());
}
if api_key == metadata_request_mod::API_KEY {
let resp = MetadataResponse {
brokers: vec![
MetadataResponseBroker {
node_id: 1,
host: "127.0.0.1".into(),
port: 9092,
..Default::default()
},
MetadataResponseBroker {
node_id: 2,
host: "127.0.0.1".into(),
port: 9093,
..Default::default()
},
],
..Default::default()
};
let mut buf = BytesMut::new();
if version >= FLEXIBLE_MIN {
buf.extend_from_slice(&[0x00u8]);
}
resp.encode(&mut buf, version).unwrap();
return Some(buf.to_vec());
}
None
})
.await;
let client = crabka_client_core::Client::builder()
.bootstrap(mock.addr.to_string())
.build()
.await
.unwrap();
let metadata = client.refresh_metadata().await.unwrap();
assert!(
metadata.brokers.len() == 2,
"expected 2 brokers in metadata"
);
client.close();
mock.stop();
}