testcontainers_modules/kafka/confluent.rs

1use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5    Image,
6};
7
/// Docker image name: the Confluent Platform Kafka distribution.
const NAME: &str = "confluentinc/cp-kafka";
/// Default image tag pinned by this module.
// NOTE(review): ready_conditions matches a literal startup log line, which may
// differ between image versions — verify when bumping this tag.
const TAG: &str = "6.1.1";
/// Port that the [`Kafka`] part of the container has internally
/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
///
/// [`Kafka`]: https://kafka.apache.org/
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
/// Port that the [`Zookeeper`] part of the container has internally
/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
///
/// [`Zookeeper`]: https://zookeeper.apache.org/
pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);
20
#[allow(missing_docs)]
// not having docs here is currently allowed to address the missing docs problem one place at a time. Helping us by documenting just one of these places helps other devs tremendously
/// Testcontainers image for a single-node Confluent Kafka broker.
///
/// The container also runs its own Zookeeper instance (started by `cmd`),
/// so no companion container is required.
#[derive(Debug, Clone)]
pub struct Kafka {
    // Environment variables handed to the container; populated by `Default`
    // and exposed through `Image::env_vars`.
    env_vars: HashMap<String, String>,
}
27
28impl Default for Kafka {
29    fn default() -> Self {
30        let mut env_vars = HashMap::new();
31
32        env_vars.insert(
33            "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
34            format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
35        );
36        env_vars.insert(
37            "KAFKA_LISTENERS".to_owned(),
38            format!(
39                "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
40                port = KAFKA_PORT.as_u16(),
41            ),
42        );
43        env_vars.insert(
44            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
45            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
46        );
47        env_vars.insert(
48            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
49            "BROKER".to_owned(),
50        );
51        env_vars.insert(
52            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
53            format!(
54                "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
55                port = KAFKA_PORT.as_u16(),
56            ),
57        );
58        env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned());
59        env_vars.insert(
60            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
61            "1".to_owned(),
62        );
63
64        Self { env_vars }
65    }
66}
67
68impl Image for Kafka {
69    fn name(&self) -> &str {
70        NAME
71    }
72
73    fn tag(&self) -> &str {
74        TAG
75    }
76
77    fn ready_conditions(&self) -> Vec<WaitFor> {
78        vec![WaitFor::message_on_stdout("Creating new log file")]
79    }
80
81    fn env_vars(
82        &self,
83    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
84        &self.env_vars
85    }
86
87    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
88        vec![
89            "/bin/bash".to_owned(),
90            "-c".to_owned(),
91            format!(
92                r#"
93echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
94echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
95echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
96zookeeper-server-start zookeeper.properties &
97. /etc/confluent/docker/bash-config &&
98/etc/confluent/docker/configure &&
99/etc/confluent/docker/launch"#,
100                ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
101            ),
102        ]
103    }
104
105    fn expose_ports(&self) -> &[ContainerPort] {
106        &[KAFKA_PORT]
107    }
108
109    fn exec_after_start(
110        &self,
111        cs: ContainerState,
112    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
113        let mut commands = vec![];
114        let cmd = vec![
115            "kafka-configs".to_string(),
116            "--alter".to_string(),
117            "--bootstrap-server".to_string(),
118            "0.0.0.0:9092".to_string(),
119            "--entity-type".to_string(),
120            "brokers".to_string(),
121            "--entity-name".to_string(),
122            "1".to_string(),
123            "--add-config".to_string(),
124            format!(
125                "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
126                cs.host_port_ipv4(KAFKA_PORT)?
127            ),
128        ];
129        let ready_conditions = vec![WaitFor::message_on_stdout(
130            "Checking need to trigger auto leader balancing",
131        )];
132        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
133        Ok(commands)
134    }
135}
136
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka;

    /// End-to-end smoke test: start the container, produce five messages,
    /// then consume them back and verify the payloads arrive in order.
    #[tokio::test]
    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = kafka::Kafka::default().start().await?;

        // Connect via the host-mapped port; `exec_after_start` has rewritten
        // the advertised listeners so the broker points clients back here.
        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        // `auto.offset.reset = earliest` lets the consumer read messages that
        // were produced before it subscribed (production happens first below).
        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        // Produce and await each record before moving on, so ordering on the
        // (single-partition by default) topic matches `expected`.
        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        // Each message must arrive within 10s; a timeout fails the test via
        // the unwrap on the elapsed error.
        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }

        Ok(())
    }
}