use rand::distributions::{Alphanumeric, DistString};
use crate::config::RDKafkaLogLevel;
use crate::consumer::Consumer;
use crate::message::Message;
use crate::types::RDKafkaErrorCode;
use tracing::{event, Level};
use tracing_test::traced_test;
use crate::{builder::RedpandaBuilder, producer::RedpandaRecord};
/// Creates a default [`RedpandaBuilder`] whose group id is a freshly generated
/// 16-character alphanumeric string.
///
/// NOTE(review): the random group id presumably keeps test runs from sharing
/// consumer-group state — confirm.
pub fn gen_test_builder() -> RedpandaBuilder {
    let random_group = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    let mut builder = RedpandaBuilder::default();
    builder.set_group_id(&random_group);
    builder
}
/// Building a consumer that points only at `localhost:9000` must fail with
/// [`RDKafkaErrorCode::BrokerTransportFailure`].
///
/// NOTE(review): presumably no broker listens on port 9000 in the test
/// environment — confirm.
#[tokio::test]
#[traced_test]
pub async fn test_consumer_invalid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9000");
    // Both socket-level timeouts are set to 3000 ms before the build attempt.
    builder.set_socket_timeout_ms(3000);
    builder.set_socket_connection_setup_timeout_ms(3000);
    builder.set_rdkafka_log_level(RDKafkaLogLevel::Info);

    let build_result = builder.build_consumer();
    assert!(build_result.is_err());

    // The error must carry an rdkafka error code, and it must be the transport failure.
    let code = build_result
        .err()
        .unwrap()
        .rdkafka_error_code()
        .unwrap();
    assert!(matches!(code, RDKafkaErrorCode::BrokerTransportFailure));
}
/// Building a consumer must succeed when the bootstrap list mixes
/// `localhost:9000` with `localhost:9010`.
///
/// NOTE(review): assumes 9000 is unreachable and 9010 is a live broker — confirm
/// against the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_consumer_some_bad_servers() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9000,localhost:9010");
    builder.set_creation_timeout_ms(3000);
    builder.set_rdkafka_log_level(RDKafkaLogLevel::Info);

    let build_result = builder.build_consumer();
    assert!(build_result.is_ok());
}
/// Building a consumer must succeed against the bootstrap servers
/// `localhost:9010`, `localhost:9011` and `localhost:9012`.
///
/// NOTE(review): assumes brokers are listening on those ports — confirm against
/// the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_consumer_valid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");
    builder.set_creation_timeout_ms(3000);
    builder.set_rdkafka_log_level(RDKafkaLogLevel::Info);

    let build_result = builder.build_consumer();
    assert!(build_result.is_ok());
}
/// Subscribing a successfully built consumer to the topic `i_do_not_exist`
/// must return an error.
#[tokio::test]
#[traced_test]
pub async fn test_consumer_invalid_topic() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010");
    builder.set_creation_timeout_ms(3000);
    builder.set_rdkafka_log_level(RDKafkaLogLevel::Info);

    // Building the consumer itself is expected to work; only the subscription should fail.
    let consumer = builder.build_consumer().unwrap();
    let missing_topic = "i_do_not_exist";
    let subscribe_result = consumer.subscribe(&[missing_topic]);
    assert!(subscribe_result.is_err());
}
/// The topic names reported by the consumer's metadata must include `_schemas`
/// and `__consumer_offsets`; the full list is logged afterwards.
#[tokio::test]
#[traced_test]
pub async fn test_metadata_topic_names() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010");
    builder.set_creation_timeout_ms(3000);
    builder.set_rdkafka_log_level(RDKafkaLogLevel::Info);

    let consumer = builder.build_consumer().unwrap();
    let names = consumer.fetch_metadata().unwrap().topic_names();

    assert!(names.contains(&String::from("_schemas")));
    assert!(names.contains(&String::from("__consumer_offsets")));
    event!(Level::INFO, "{:?}", names);
}
/// Subscribing replaces the consumer's subscription list.
///
/// The consumer first subscribes to `_schemas`, then to `__consumer_offsets`;
/// after each call the reported subscription must contain exactly that topic.
#[tokio::test]
#[traced_test]
pub async fn test_consumer_subscription() {
    let mut b = gen_test_builder();
    b.set_bootstrap_servers("localhost:9010");
    b.set_creation_timeout_ms(3000);
    b.set_rdkafka_log_level(RDKafkaLogLevel::Info);
    let consumer = b.build_consumer().unwrap();

    consumer.subscribe(&["_schemas"]).unwrap();
    let subscriptions = consumer.get_subscription_topic_names();
    assert_eq!(subscriptions, vec!["_schemas"]);

    // Pass the array literal by reference directly; copying it first with
    // `.to_owned()` adds nothing.
    consumer.subscribe(&["__consumer_offsets"]).unwrap();
    let subscriptions = consumer.get_subscription_topic_names();
    assert_eq!(subscriptions, vec!["__consumer_offsets"]);
}
/// Building a producer that points only at `localhost:9000` must fail with
/// [`RDKafkaErrorCode::BrokerTransportFailure`].
///
/// NOTE(review): presumably no broker listens on port 9000 in the test
/// environment — confirm.
#[tokio::test]
#[traced_test]
pub async fn test_producer_invalid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9000");

    let build_result = builder.build_producer();
    assert!(build_result.is_err());

    // The error must carry an rdkafka error code, and it must be the transport failure.
    let code = build_result
        .err()
        .unwrap()
        .rdkafka_error_code()
        .unwrap();
    assert!(matches!(code, RDKafkaErrorCode::BrokerTransportFailure));
}
/// Building a producer must succeed when the bootstrap list mixes
/// `localhost:9000` with `localhost:9010`.
///
/// NOTE(review): assumes 9000 is unreachable and 9010 is a live broker — confirm
/// against the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_producer_some_bad_servers() {
    let mut b = gen_test_builder();
    b.set_bootstrap_servers("localhost:9000,localhost:9010");
    // Build a producer here: this test is about the producer path, so building a
    // consumer would leave that path untested.
    let producer = b.build_producer();
    assert!(producer.is_ok());
}
/// Building a producer must succeed against the bootstrap servers
/// `localhost:9010`, `localhost:9011` and `localhost:9012`.
///
/// NOTE(review): assumes brokers are listening on those ports — confirm against
/// the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_producer_valid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");

    let build_result = builder.build_producer();
    assert!(build_result.is_ok());
}
/// Round-trips a single keyed record through the topic `test_producer_topic`.
///
/// The test creates the topic, sends one record, subscribes a consumer,
/// receives one message and checks that its key and payload match what was
/// sent, then deletes the topic. The consumer position is logged before and
/// after subscribing and again after the message is received.
#[tokio::test]
#[traced_test]
pub async fn test_producer_consumer_valid_topic() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");
    let producer = builder.build_producer().unwrap();
    let consumer = builder.build_consumer().unwrap();
    let admin = builder.build_admin_client().await.unwrap();

    let topic = "test_producer_topic";
    // NOTE(review): the two `3` arguments are presumably partition count and
    // replication factor — confirm against `create_topic`.
    admin.create_topic(topic, 3, 3).await.unwrap();

    let key_bytes = 1_u32.to_le_bytes().to_vec();
    let payload_bytes = 2_u32.to_le_bytes().to_vec();
    let record = RedpandaRecord::new(topic, Some(key_bytes.clone()), payload_bytes.clone(), None);

    // Enqueue the record, then wait for its delivery report.
    producer
        .send_result(&record)
        .unwrap()
        .await
        .unwrap()
        .unwrap();

    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());
    consumer.subscribe(&[topic]).unwrap();
    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());

    let received = consumer.recv().await.unwrap();
    event!(Level::INFO, "Got message");
    assert_eq!(received.key().unwrap(), key_bytes);
    assert_eq!(received.payload().unwrap(), payload_bytes);
    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());

    admin.delete_topic(topic).await.unwrap();
    event!(Level::INFO, "Deleted test topic");
}
/// Sends one keyed and one unkeyed record to the topic `test_record_topic`, then
/// checks the first message a consumer receives against the keyed record.
///
/// The topic is created at the start and deleted at the end. The consumer
/// position is logged before and after subscribing and again after the
/// message is received.
///
/// NOTE(review): the assertions assume the keyed record is the first message
/// delivered. If the first `3` passed to `create_topic` is a partition count,
/// that ordering is presumably not guaranteed across partitions — confirm.
#[tokio::test]
#[traced_test]
pub async fn test_producer_record() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");
    let producer = builder.build_producer().unwrap();
    let consumer = builder.build_consumer().unwrap();
    let admin = builder.build_admin_client().await.unwrap();

    let topic = "test_record_topic";
    admin.create_topic(topic, 3, 3).await.unwrap();

    let key_bytes = 1_u32.to_le_bytes().to_vec();
    let payload_bytes = 2_u32.to_le_bytes().to_vec();

    // First record carries a key; wait for its delivery report before continuing.
    let keyed_record =
        RedpandaRecord::new(topic, Some(key_bytes.clone()), payload_bytes.clone(), None);
    producer
        .send_result(&keyed_record)
        .unwrap()
        .await
        .unwrap()
        .unwrap();

    // Second record has the same payload but no key.
    let unkeyed_record = RedpandaRecord::new(topic, None, payload_bytes.clone(), None);
    producer
        .send_result(&unkeyed_record)
        .unwrap()
        .await
        .unwrap()
        .unwrap();

    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());
    consumer.subscribe(&[topic]).unwrap();
    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());

    let received = consumer.recv().await.unwrap();
    event!(Level::INFO, "Got message");
    assert_eq!(received.key().unwrap(), key_bytes);
    assert_eq!(received.payload().unwrap(), payload_bytes);
    event!(Level::INFO, "{:?}", consumer.consumer.position().unwrap());

    admin.delete_topic(topic).await.unwrap();
    event!(Level::INFO, "Deleted test topic");
}
/// Building an admin client that points only at `localhost:9000` must fail.
///
/// NOTE(review): presumably no broker listens on port 9000 in the test
/// environment — confirm.
#[tokio::test]
#[traced_test]
pub async fn test_admin_invalid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9000");

    let build_result = builder.build_admin_client().await;
    assert!(build_result.is_err());
}
/// Building an admin client must succeed when the bootstrap list mixes
/// `localhost:9000` with `localhost:9010`.
///
/// NOTE(review): assumes 9000 is unreachable and 9010 is a live broker — confirm
/// against the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_admin_some_bad_servers() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9000,localhost:9010");

    let build_result = builder.build_admin_client().await;
    assert!(build_result.is_ok());
}
/// Building an admin client must succeed against the bootstrap servers
/// `localhost:9010`, `localhost:9011` and `localhost:9012`.
///
/// NOTE(review): assumes brokers are listening on those ports — confirm against
/// the test environment.
#[tokio::test]
#[traced_test]
pub async fn test_admin_valid_server() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");

    let build_result = builder.build_admin_client().await;
    assert!(build_result.is_ok());
}
/// Creates the topic `test_test_test` through the admin client, checks that it
/// shows up in the consumer's metadata, deletes it, and checks that it is gone.
#[tokio::test]
#[traced_test]
pub async fn test_admin_create_delete_topic() {
    let mut builder = gen_test_builder();
    builder.set_bootstrap_servers("localhost:9010,localhost:9011,localhost:9012");
    let admin = builder.build_admin_client().await.unwrap();

    let topic = "test_test_test";
    // Owned copy of the topic name, used for the membership checks below.
    let topic_owned = topic.to_owned();

    // NOTE(review): the two `3` arguments are presumably partition count and
    // replication factor — confirm against `create_topic`.
    let creation = admin.create_topic(topic, 3, 3).await;
    assert!(creation.is_ok());

    // The consumer is built only after the topic has been created.
    let consumer = builder.build_consumer().unwrap();
    let names_after_create = consumer.fetch_metadata().unwrap().topic_names();
    assert!(names_after_create.contains(&topic_owned));

    let deletion = admin.delete_topic(topic).await;
    assert!(deletion.is_ok());

    let names_after_delete = consumer.fetch_metadata().unwrap().topic_names();
    assert!(!names_after_delete.contains(&topic_owned));
}