use super::*;
use crate::nodes::{NodeOperators, StreamOperators};
use crate::types::Burst;
use crate::{RunFor, RunMode, ValueAt, burst};
use rdkafka::Message;
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::rc::Rc;
use std::time::Duration;
use testcontainers::{GenericImage, ImageExt, core::WaitFor, runners::SyncRunner};
const REDPANDA_IMAGE: &str = "docker.redpanda.com/redpandadata/redpanda";
const REDPANDA_TAG: &str = "v24.1.1";
fn free_port() -> anyhow::Result<u16> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
Ok(listener.local_addr()?.port())
}
fn start_redpanda() -> anyhow::Result<(impl Drop, String)> {
let port = free_port()?;
let container = GenericImage::new(REDPANDA_IMAGE, REDPANDA_TAG)
.with_wait_for(WaitFor::message_on_stderr("Started Kafka API server"))
.with_mapped_port(port, port.into())
.with_cmd(vec![
"redpanda".to_string(),
"start".to_string(),
"--overprovisioned".to_string(),
"--smp".to_string(),
"1".to_string(),
"--memory".to_string(),
"512M".to_string(),
"--reserve-memory".to_string(),
"0M".to_string(),
"--node-id".to_string(),
"0".to_string(),
"--check=false".to_string(),
"--kafka-addr".to_string(),
format!("0.0.0.0:{port}"),
"--advertise-kafka-addr".to_string(),
format!("127.0.0.1:{port}"),
])
.start()?;
let endpoint = format!("127.0.0.1:{port}");
Ok((container, endpoint))
}
fn create_topic(brokers: &str, topic: &str, partitions: i32) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let admin: AdminClient<rdkafka::client::DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.create()
.map_err(|e| anyhow::anyhow!("admin create failed: {e}"))?;
let new_topic = NewTopic::new(topic, partitions, TopicReplication::Fixed(1));
admin
.create_topics(&[new_topic], &AdminOptions::new())
.await
.map_err(|e| anyhow::anyhow!("create topic failed: {e}"))?;
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(())
})
}
fn produce_messages(brokers: &str, topic: &str, messages: &[(&str, &str)]) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.map_err(|e| anyhow::anyhow!("producer create failed: {e}"))?;
for (key, value) in messages {
producer
.send(
FutureRecord::to(topic).key(*key).payload(*value),
Duration::from_secs(5),
)
.await
.map_err(|(e, _)| anyhow::anyhow!("produce failed: {e}"))?;
}
Ok(())
})
}
fn consume_messages(
brokers: &str,
topic: &str,
group_id: &str,
max: usize,
) -> anyhow::Result<Vec<(Option<Vec<u8>>, Vec<u8>)>> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", group_id)
.set("auto.offset.reset", "earliest")
.set("session.timeout.ms", "6000")
.create()
.map_err(|e| anyhow::anyhow!("consumer create failed: {e}"))?;
consumer
.subscribe(&[topic])
.map_err(|e| anyhow::anyhow!("subscribe failed: {e}"))?;
let mut results = Vec::new();
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if results.len() >= max || tokio::time::Instant::now() >= deadline {
break;
}
match tokio::time::timeout(Duration::from_secs(2), consumer.recv()).await {
Ok(Ok(msg)) => {
results.push((
msg.key().map(|k| k.to_vec()),
msg.payload().unwrap_or_default().to_vec(),
));
}
Ok(Err(e)) => return Err(anyhow::anyhow!("consume error: {e}")),
Err(_) => break, }
}
Ok(results)
})
}
fn make_burst(topic: &str, key: &[u8], value: &[u8]) -> Rc<dyn crate::Stream<Burst<KafkaRecord>>> {
crate::nodes::constant(burst![KafkaRecord {
topic: topic.to_string(),
key: Some(key.to_vec()),
value: value.to_vec(),
}])
}
#[test]
fn test_connection_refused() {
let conn = KafkaConnection::new("127.0.0.1:59999");
let result = kafka_sub(conn, "nonexistent", "test-group")
.collapse()
.collect()
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(5)));
let _ = result;
}
fn flatten_events(ticks: &[ValueAt<Burst<KafkaEvent>>]) -> Vec<KafkaEvent> {
ticks.iter().flat_map(|t| t.value.iter().cloned()).collect()
}
#[test]
fn test_sub_receives_pre_seeded_messages() -> anyhow::Result<()> {
let (_container, brokers) = start_redpanda()?;
let topic = "test-sub-seeded";
create_topic(&brokers, topic, 1)?;
produce_messages(&brokers, topic, &[("k1", "v1"), ("k2", "v2")])?;
let conn = KafkaConnection::new(&brokers);
let collected = kafka_sub(conn, topic, "sub-seeded-group").collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(20)))?;
let events = flatten_events(&collected.peek_value());
assert!(
events.len() >= 2,
"expected at least 2 events, got {}",
events.len()
);
let values: Vec<Vec<u8>> = events.iter().map(|e| e.value.clone()).collect();
assert!(values.contains(&b"v1".to_vec()));
assert!(values.contains(&b"v2".to_vec()));
Ok(())
}
#[test]
fn test_sub_live_messages() -> anyhow::Result<()> {
let (_container, brokers) = start_redpanda()?;
let topic = "test-sub-live";
create_topic(&brokers, topic, 1)?;
let conn = KafkaConnection::new(&brokers);
let brokers_clone = brokers.clone();
let topic_owned = topic.to_string();
let handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(3));
produce_messages(&brokers_clone, &topic_owned, &[("live-key", "live-value")]).unwrap();
});
let collected = kafka_sub(conn, topic, "sub-live-group").collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(20)))?;
handle.join().unwrap();
let events = flatten_events(&collected.peek_value());
assert!(!events.is_empty(), "expected at least 1 live event, got 0");
assert_eq!(events[0].value, b"live-value");
Ok(())
}
#[test]
fn test_pub_round_trip() -> anyhow::Result<()> {
let (_container, brokers) = start_redpanda()?;
let topic = "test-pub-rt";
create_topic(&brokers, topic, 1)?;
let conn = KafkaConnection::new(&brokers);
let source = make_burst(topic, b"rt-key", b"rt-value");
kafka_pub(conn, &source).run(RunMode::RealTime, RunFor::Cycles(1))?;
let messages = consume_messages(&brokers, topic, "rt-verify-group", 1)?;
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0.as_deref(), Some(b"rt-key".as_ref()));
assert_eq!(messages[0].1, b"rt-value");
Ok(())
}
#[test]
fn test_pub_multiple_records_in_burst() -> anyhow::Result<()> {
let (_container, brokers) = start_redpanda()?;
let topic = "test-pub-multi";
create_topic(&brokers, topic, 1)?;
let conn = KafkaConnection::new(&brokers);
let source = crate::nodes::constant(burst![
KafkaRecord {
topic: topic.to_string(),
key: Some(b"k1".to_vec()),
value: b"v1".to_vec(),
},
KafkaRecord {
topic: topic.to_string(),
key: Some(b"k2".to_vec()),
value: b"v2".to_vec(),
},
]);
kafka_pub(conn, &source).run(RunMode::RealTime, RunFor::Cycles(1))?;
let messages = consume_messages(&brokers, topic, "multi-verify-group", 2)?;
assert_eq!(messages.len(), 2);
let values: Vec<Vec<u8>> = messages.iter().map(|(_, v)| v.clone()).collect();
assert!(values.contains(&b"v1".to_vec()));
assert!(values.contains(&b"v2".to_vec()));
Ok(())
}
#[test]
fn test_sub_event_fields() -> anyhow::Result<()> {
let (_container, brokers) = start_redpanda()?;
let topic = "test-sub-fields";
create_topic(&brokers, topic, 1)?;
produce_messages(&brokers, topic, &[("field-key", "field-value")])?;
let conn = KafkaConnection::new(&brokers);
let collected = kafka_sub(conn, topic, "fields-group").collect();
collected
.clone()
.run(RunMode::RealTime, RunFor::Duration(Duration::from_secs(20)))?;
let events = flatten_events(&collected.peek_value());
assert!(!events.is_empty(), "expected at least 1 event, got 0");
let event = &events[0];
assert_eq!(event.topic, topic);
assert_eq!(event.partition, 0);
assert!(event.offset >= 0);
assert_eq!(event.key.as_deref(), Some(b"field-key".as_ref()));
assert_eq!(event.value, b"field-value");
assert_eq!(event.value_str().unwrap(), "field-value");
assert_eq!(event.key_str().unwrap().unwrap(), "field-key");
Ok(())
}
#[test]
fn test_kafka_record_value_str() {
let record = KafkaRecord {
topic: "t".to_string(),
key: Some(b"k".to_vec()),
value: b"hello".to_vec(),
};
assert_eq!(record.value_str().unwrap(), "hello");
}
#[test]
fn test_kafka_event_no_key() {
let event = KafkaEvent {
topic: "t".to_string(),
partition: 0,
offset: 0,
key: None,
value: b"val".to_vec(),
};
assert!(event.key_str().is_none());
}