testcontainers_modules/pulsar/
mod.rs1use std::{borrow::Cow, collections::BTreeMap};
2
3use testcontainers::{
4 core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor},
5 Image, TestcontainersError,
6};
7
8const NAME: &str = "apachepulsar/pulsar";
9const TAG: &str = "2.10.6";
10
11const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650);
12const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080);
13
14#[derive(Debug, Clone)]
32pub struct Pulsar {
33 data_mount: Mount,
34 env: BTreeMap<String, String>,
35 admin_commands: Vec<Vec<String>>,
36}
37
38impl Default for Pulsar {
39 fn default() -> Self {
43 Self {
44 data_mount: Mount::tmpfs_mount("/pulsar/data"),
45 env: BTreeMap::new(),
46 admin_commands: vec![],
47 }
48 }
49}
50
51impl Pulsar {
52 pub fn with_config_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
57 self.env
58 .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into());
59 self
60 }
61
62 pub fn with_admin_command(
64 mut self,
65 command: impl IntoIterator<Item = impl Into<String>>,
66 ) -> Self {
67 let mut vec: Vec<String> = command.into_iter().map(Into::into).collect();
68 vec.insert(0, "bin/pulsar-admin".to_string());
69 self.admin_commands.push(vec);
70 self
71 }
72
73 pub fn with_tenant(self, tenant: impl Into<String>) -> Self {
75 let tenant = tenant.into();
76 self.with_admin_command(["tenants", "create", &tenant])
77 }
78
79 pub fn with_namespace(self, namespace: impl Into<String>) -> Self {
81 let namespace = namespace.into();
82 self.with_admin_command(["namespaces", "create", &namespace])
83 }
84
85 pub fn with_topic(self, topic: impl Into<String>) -> Self {
87 let topic = topic.into();
88 self.with_admin_command(["topics", "create", &topic])
89 }
90}
91
92impl Image for Pulsar {
93 fn name(&self) -> &str {
94 NAME
95 }
96
97 fn tag(&self) -> &str {
98 TAG
99 }
100
101 fn ready_conditions(&self) -> Vec<WaitFor> {
102 vec![
103 WaitFor::message_on_stdout("HTTP Service started at"),
104 WaitFor::message_on_stdout("messaging service is ready"),
105 ]
106 }
107
108 fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
109 [&self.data_mount]
110 }
111
112 fn env_vars(
113 &self,
114 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
115 &self.env
116 }
117
118 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
119 [
120 "sh",
121 "-c",
122 "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone",
123 ]
124 }
125
126 fn exec_after_start(
127 &self,
128 _cs: ContainerState,
129 ) -> Result<Vec<ExecCommand>, TestcontainersError> {
130 Ok(self
131 .admin_commands
132 .iter()
133 .map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0)))
134 .collect())
135 }
136
137 fn expose_ports(&self) -> &[ContainerPort] {
138 &[PULSAR_PORT, ADMIN_PORT]
139 }
140}
141
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use pulsar::{
        producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage,
        TokioExecutor,
    };
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::testcontainers::runners::AsyncRunner;

    /// JSON payload exchanged between producer and consumer in the tests.
    #[derive(Serialize, Deserialize)]
    struct TestData {
        data: String,
    }

    impl DeserializeMessage for TestData {
        type Output = Result<TestData, serde_json::Error>;

        /// Decodes the raw message bytes as JSON.
        fn deserialize_message(payload: &Payload) -> Self::Output {
            serde_json::from_slice(&payload.data)
        }
    }

    impl SerializeMessage for TestData {
        /// Encodes the value as JSON; serialization failures are surfaced as
        /// `Error::Custom` carrying the serde error text.
        fn serialize_message(input: Self) -> Result<Message, Error> {
            Ok(Message {
                payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?,
                ..Default::default()
            })
        }
    }

    /// Round-trips one message through a pre-created tenant/namespace/topic.
    #[tokio::test]
    async fn pulsar_subscribe_and_publish() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let topic = "persistent://test/test-ns/test-topic";

        // Named `container` so it does not shadow the `pulsar` client crate.
        let container = Pulsar::default()
            .with_tenant("test")
            .with_namespace("test/test-ns")
            .with_topic(topic)
            .start()
            .await?;

        // Ask testcontainers for the reachable host instead of hard-coding
        // `0.0.0.0`, which is the unspecified address and is not a portable
        // connect target (and is wrong when Docker runs on a remote host).
        let endpoint = format!(
            "pulsar://{}:{}",
            container.get_host().await?,
            container.get_host_port_ipv4(6650).await?
        );
        let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
            .build()
            .await?;

        let mut consumer: Consumer<TestData, _> =
            client.consumer().with_topic(topic).build().await?;

        let mut producer = client.producer().with_topic(topic).build().await?;

        // First await enqueues the send, the second waits for the broker receipt.
        producer
            .send_non_blocking(TestData {
                data: "test".to_string(),
            })
            .await?
            .await?;

        let data = consumer
            .next()
            .await
            .expect("consumer stream ended before a message arrived")?
            .deserialize()?;
        assert_eq!("test", data.data);

        Ok(())
    }

    /// With `allowAutoTopicCreation=false` and no topic created up front,
    /// building a producer must fail with `TopicNotFound`.
    #[tokio::test]
    async fn pulsar_config() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let topic = "persistent://test/test-ns/test-topic";

        // Named `container` so it does not shadow the `pulsar` client crate.
        let container = Pulsar::default()
            .with_tenant("test")
            .with_namespace("test/test-ns")
            .with_config_env("allowAutoTopicCreation", "false")
            .start()
            .await?;

        // See `pulsar_subscribe_and_publish` for why the host is looked up.
        let endpoint = format!(
            "pulsar://{}:{}",
            container.get_host().await?,
            container.get_host_port_ipv4(6650).await?
        );
        let client = pulsar::Pulsar::builder(endpoint, TokioExecutor)
            .build()
            .await?;

        let producer = client.producer().with_topic(topic).build().await;

        match producer {
            Ok(_) => panic!("Producer should return error"),
            Err(e) => assert_eq!("Connection error: Server error (Some(TopicNotFound)): Topic not found persistent://test/test-ns/test-topic", e.to_string()),
        }

        Ok(())
    }
}