use std::collections::HashMap;
use futures::stream::iter;
use futures::StreamExt;
use samsa::prelude::{self, ClusterMetadata};
use samsa::prelude::{
ConsumerBuilder, Error, KafkaCode, ProduceMessage, ProducerBuilder, TcpConnection,
TopicPartitionsBuilder,
};
mod testsupport;
/// Client identifier sent with the metadata, topic-creation and topic-deletion requests.
const CLIENT_ID: &str = "multi partition read and write test";
/// Correlation id passed along with the metadata, topic-creation and topic-deletion requests.
const CORRELATION_ID: i32 = 1;
/// Partition count requested for the test topic. Also bounds the partition ids
/// that messages are produced to and that the consumer is assigned.
const NUMBER_OF_PARTITIONS: i32 = 10;
/// Chunk size passed to `StreamExt::chunks` when grouping messages for the producer.
const CHUNK_SIZE: usize = 10;
#[tokio::test]
async fn multi_partition_writing_and_reading() -> Result<(), Box<Error>> {
let (skip, brokers) = testsupport::get_brokers()?;
if skip {
return Ok(());
}
let mut metadata = ClusterMetadata::new(
brokers.clone(),
CORRELATION_ID,
CLIENT_ID.to_owned(),
vec![],
)
.await?;
let conn: &mut TcpConnection = metadata
.broker_connections
.get_mut(&metadata.controller_id)
.unwrap();
let topic_name = testsupport::create_topic_from_file_path(file!())?;
let create_res = prelude::create_topics(
conn.clone(),
CORRELATION_ID,
CLIENT_ID,
HashMap::from([(topic_name.as_str(), NUMBER_OF_PARTITIONS)]),
)
.await?;
if create_res.topics[0].error_code != KafkaCode::TopicAlreadyExists {
assert_eq!(create_res.topics[0].error_code, KafkaCode::None);
}
let inner_topic = topic_name.clone();
let stream = iter(0..NUMBER_OF_PARTITIONS).map(move |i| ProduceMessage {
topic: inner_topic.clone(),
partition_id: i,
key: None,
value: Some(bytes::Bytes::from_static(b"0123456789")),
headers: vec![],
});
let output_stream =
ProducerBuilder::<TcpConnection>::new(brokers.clone(), vec![topic_name.clone()])
.await?
.required_acks(1)
.clone()
.build_from_stream(stream.chunks(CHUNK_SIZE))
.await;
tokio::pin!(output_stream);
while let Some(message) = output_stream.next().await {
let res = message[0].as_ref().unwrap();
assert_eq!(res.responses.len(), 1);
assert_eq!(
res.responses[0].name,
bytes::Bytes::from(topic_name.clone())
);
assert_eq!(
res.responses[0].partition_responses[0].error_code,
KafkaCode::None
);
}
let partitions: Vec<i32> = (0..NUMBER_OF_PARTITIONS).collect();
let stream = ConsumerBuilder::<TcpConnection>::new(
brokers.clone(),
TopicPartitionsBuilder::new()
.assign(topic_name.to_string(), partitions)
.build(),
)
.await?
.build()
.into_stream();
tokio::pin!(stream);
while let Some(message) = stream.next().await {
let mut res = message.unwrap();
match res.next() {
None => break,
Some(r) => {
assert_eq!(r.topic_name, topic_name.to_string());
assert_eq!(r.value, bytes::Bytes::from_static(b"0123456789"));
}
}
}
prelude::delete_topics(
conn.clone(),
CORRELATION_ID,
CLIENT_ID,
vec![topic_name.as_str()],
)
.await?;
Ok(())
}