mod testsupport;
use nom::AsBytes;
use samsa::prelude::{self, ClusterMetadata};
use samsa::prelude::{protocol, BrokerAddress, BrokerConnection, Error, KafkaCode, TcpConnection};
use std::collections::HashMap;
const CLIENT_ID: &str = "offset protocol integration test";
const CORRELATION_ID: i32 = 1;
const GROUP_ID: &str = "offset integration test";
const PARTITION_ID: i32 = 0;
const OFFSET: i64 = 42;
#[tokio::test]
async fn it_can_commit_and_fetch_offsets() -> Result<(), Box<Error>> {
let (skip, brokers, topic) = testsupport::get_brokers_and_topic()?;
if skip {
return Ok(());
}
let mut metadata = ClusterMetadata::new(
brokers.clone(),
CORRELATION_ID,
CLIENT_ID.to_owned(),
vec![],
)
.await?;
let conn: &mut TcpConnection = metadata
.broker_connections
.get_mut(&metadata.controller_id)
.unwrap();
testsupport::ensure_topic_creation(conn.clone(), &topic, CORRELATION_ID, CLIENT_ID).await?;
let coordinator_req =
protocol::FindCoordinatorRequest::new(CORRELATION_ID, CLIENT_ID, GROUP_ID);
conn.send_request(&coordinator_req).await?;
let coordinator_res =
protocol::FindCoordinatorResponse::try_from(conn.receive_response().await?.freeze())?;
assert_eq!(coordinator_res.error_code, KafkaCode::None);
let host = std::str::from_utf8(coordinator_res.host.as_bytes()).unwrap();
let port = coordinator_res.port;
let mut coordinator_conn = TcpConnection::new(vec![BrokerAddress {
host: host.to_owned(),
port: port.try_into().map_err(|err| {
tracing::error!(
"Error decoding Broker connection port from metadata {:?}",
err
);
Error::MetadataNeedsSync
})?,
}])
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let mut offset_commit_request = protocol::OffsetCommitRequest::new(
CORRELATION_ID,
CLIENT_ID,
GROUP_ID,
-1,
bytes::Bytes::from(""),
1000,
)?;
offset_commit_request.add(&topic, PARTITION_ID, OFFSET, Some("metadata"));
coordinator_conn
.send_request(&offset_commit_request)
.await?;
let offset_commit_response = protocol::OffsetCommitResponse::try_from(
coordinator_conn.receive_response().await?.freeze(),
)?;
assert_eq!(offset_commit_response.topics.len(), 1);
assert_eq!(offset_commit_response.topics[0].partitions.len(), 1);
assert_eq!(
offset_commit_response.topics[0].partitions[0].error_code,
KafkaCode::None
);
assert!(offset_commit_response.is_error().is_ok());
let mut offset_fetch_req =
protocol::OffsetFetchRequest::new(CORRELATION_ID, CLIENT_ID, GROUP_ID);
offset_fetch_req.add(&topic, PARTITION_ID);
coordinator_conn.send_request(&offset_fetch_req).await?;
let offset_fetch_response = protocol::OffsetFetchResponse::try_from(
coordinator_conn.receive_response().await?.freeze(),
)?;
assert_eq!(offset_fetch_response.error_code, KafkaCode::None);
assert_eq!(offset_fetch_response.topics.len(), 1);
assert_eq!(offset_fetch_response.topics[0].partitions.len(), 1);
assert_eq!(
offset_fetch_response.topics[0].partitions[0].error_code,
KafkaCode::None
);
assert_eq!(
offset_fetch_response.topics[0].partitions[0].committed_offset,
OFFSET
);
let delete_res = prelude::delete_topics(
conn.clone(),
CORRELATION_ID,
CLIENT_ID,
vec![topic.as_str()],
)
.await?;
assert_eq!(delete_res.topics[0].error_code, KafkaCode::None);
Ok(())
}
#[tokio::test]
async fn it_can_commit_and_fetch_offsets_with_functions() -> Result<(), Box<Error>> {
let (skip, brokers, topic) = testsupport::get_brokers_and_topic()?;
if skip {
return Ok(());
}
let mut metadata = ClusterMetadata::new(
brokers.clone(),
CORRELATION_ID,
CLIENT_ID.to_owned(),
vec![],
)
.await?;
let conn: &mut TcpConnection = metadata
.broker_connections
.get_mut(&metadata.controller_id)
.unwrap();
testsupport::ensure_topic_creation(conn.clone(), &topic, CORRELATION_ID, CLIENT_ID).await?;
let coordinator_res =
samsa::prelude::find_coordinator(conn.clone(), CORRELATION_ID, CLIENT_ID, GROUP_ID).await?;
assert_eq!(coordinator_res.error_code, KafkaCode::None);
let host = std::str::from_utf8(coordinator_res.host.as_bytes()).unwrap();
let port = coordinator_res.port;
let coordinator_conn = TcpConnection::new(vec![BrokerAddress {
host: host.to_owned(),
port: port.try_into().map_err(|err| {
tracing::error!(
"Error decoding Broker connection port from metadata {:?}",
err
);
Error::MetadataNeedsSync
})?,
}])
.await?;
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let offsets = HashMap::from([((topic.clone(), PARTITION_ID), OFFSET)]);
let offset_commit_response = samsa::prelude::commit_offset(
CORRELATION_ID,
CLIENT_ID,
GROUP_ID,
coordinator_conn.clone(),
-1,
bytes::Bytes::from(""),
offsets,
1000,
)
.await?;
assert_eq!(offset_commit_response.topics.len(), 1);
assert_eq!(offset_commit_response.topics[0].partitions.len(), 1);
assert_eq!(
offset_commit_response.topics[0].partitions[0].error_code,
KafkaCode::None
);
assert!(offset_commit_response.is_error().is_ok());
let topic_partitions = HashMap::from([(topic.clone(), vec![PARTITION_ID])]);
let offset_fetch_response = samsa::prelude::fetch_offset(
CORRELATION_ID,
CLIENT_ID,
GROUP_ID,
coordinator_conn.clone(),
&topic_partitions,
)
.await?;
assert_eq!(offset_fetch_response.error_code, KafkaCode::None);
assert_eq!(offset_fetch_response.topics.len(), 1);
assert_eq!(offset_fetch_response.topics[0].partitions.len(), 1);
assert_eq!(
offset_fetch_response.topics[0].partitions[0].error_code,
KafkaCode::None
);
assert_eq!(
offset_fetch_response.topics[0].partitions[0].committed_offset,
OFFSET
);
prelude::delete_topics(
conn.clone(),
CORRELATION_ID,
CLIENT_ID,
vec![topic.as_str()],
)
.await?;
Ok(())
}