testcontainers_modules/pulsar/
mod.rs

1use std::{borrow::Cow, collections::BTreeMap};
2
3use testcontainers::{
4    core::{CmdWaitFor, ContainerPort, ContainerState, ExecCommand, Mount, WaitFor},
5    Image, TestcontainersError,
6};
7
// Docker image repository returned by `Image::name`.
const NAME: &str = "apachepulsar/pulsar";
// Image tag returned by `Image::tag`.
const TAG: &str = "2.10.6";

// TCP port clients connect to using the `pulsar://` scheme (see the tests below).
const PULSAR_PORT: ContainerPort = ContainerPort::Tcp(6650);
// Second exposed TCP port; presumably the HTTP admin/REST endpoint — confirm against the image docs.
const ADMIN_PORT: ContainerPort = ContainerPort::Tcp(8080);
13
/// Module to work with [`Apache Pulsar`] inside of tests.
/// **Requires protoc to be installed, otherwise will not build.**
///
/// This module is based on the official [`Apache Pulsar docker image`].
///
/// # Example
/// ```
/// use testcontainers_modules::{pulsar, testcontainers::runners::SyncRunner};
///
/// let pulsar = pulsar::Pulsar::default().start().unwrap();
/// // 6650 is the port Pulsar clients connect to (`pulsar://host:port`).
/// let pulsar_port = pulsar.get_host_port_ipv4(6650).unwrap();
///
/// // do something with the running pulsar instance..
/// ```
///
/// [`Apache Pulsar`]: https://github.com/apache/pulsar
/// [`Apache Pulsar docker image`]: https://hub.docker.com/r/apachepulsar/pulsar/
#[derive(Debug, Clone)]
pub struct Pulsar {
    // Mount handed to the container through `Image::mounts`.
    data_mount: Mount,
    // Environment variables handed to the container through `Image::env_vars`;
    // `with_config_env` stores its entries here under a `PULSAR_PREFIX_` key.
    env: BTreeMap<String, String>,
    // Full argv (starting with `bin/pulsar-admin`) of every admin command to
    // execute once the container has started, in insertion order.
    admin_commands: Vec<Vec<String>>,
}
37
38impl Default for Pulsar {
39    /**
40     * Creates new standalone pulsar container, with `/pulsar/data` as a temporary volume
41     */
42    fn default() -> Self {
43        Self {
44            data_mount: Mount::tmpfs_mount("/pulsar/data"),
45            env: BTreeMap::new(),
46            admin_commands: vec![],
47        }
48    }
49}
50
51impl Pulsar {
52    /// Add configuration parameter to Pulsar `conf/standalone.conf` through setting environment variable.
53    ///
54    /// Container will rewrite `conf/standalone.conf` file using these variables during startup
55    /// with help of `bin/apply-config-from-env.py` script
56    pub fn with_config_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
57        self.env
58            .insert(format!("PULSAR_PREFIX_{}", name.into()), value.into());
59        self
60    }
61
62    /// Runs admin command after container start
63    pub fn with_admin_command(
64        mut self,
65        command: impl IntoIterator<Item = impl Into<String>>,
66    ) -> Self {
67        let mut vec: Vec<String> = command.into_iter().map(Into::into).collect();
68        vec.insert(0, "bin/pulsar-admin".to_string());
69        self.admin_commands.push(vec);
70        self
71    }
72
73    /// Creates tenant after container start
74    pub fn with_tenant(self, tenant: impl Into<String>) -> Self {
75        let tenant = tenant.into();
76        self.with_admin_command(["tenants", "create", &tenant])
77    }
78
79    /// Creates namespace after container start
80    pub fn with_namespace(self, namespace: impl Into<String>) -> Self {
81        let namespace = namespace.into();
82        self.with_admin_command(["namespaces", "create", &namespace])
83    }
84
85    /// Creates topic after container start
86    pub fn with_topic(self, topic: impl Into<String>) -> Self {
87        let topic = topic.into();
88        self.with_admin_command(["topics", "create", &topic])
89    }
90}
91
92impl Image for Pulsar {
93    fn name(&self) -> &str {
94        NAME
95    }
96
97    fn tag(&self) -> &str {
98        TAG
99    }
100
101    fn ready_conditions(&self) -> Vec<WaitFor> {
102        vec![
103            WaitFor::message_on_stdout("HTTP Service started at"),
104            WaitFor::message_on_stdout("messaging service is ready"),
105        ]
106    }
107
108    fn mounts(&self) -> impl IntoIterator<Item = &Mount> {
109        [&self.data_mount]
110    }
111
112    fn env_vars(
113        &self,
114    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
115        &self.env
116    }
117
118    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
119        [
120            "sh",
121            "-c",
122            "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone",
123        ]
124    }
125
126    fn exec_after_start(
127        &self,
128        _cs: ContainerState,
129    ) -> Result<Vec<ExecCommand>, TestcontainersError> {
130        Ok(self
131            .admin_commands
132            .iter()
133            .map(|cmd| ExecCommand::new(cmd).with_cmd_ready_condition(CmdWaitFor::exit_code(0)))
134            .collect())
135    }
136
137    fn expose_ports(&self) -> &[ContainerPort] {
138        &[PULSAR_PORT, ADMIN_PORT]
139    }
140}
141
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use pulsar::{
        producer::Message, Consumer, DeserializeMessage, Error, Payload, SerializeMessage,
        TokioExecutor,
    };
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::testcontainers::runners::AsyncRunner;

    /// JSON payload exchanged between producer and consumer in the tests.
    #[derive(Serialize, Deserialize)]
    struct TestData {
        data: String,
    }

    impl DeserializeMessage for TestData {
        type Output = Result<TestData, serde_json::Error>;

        fn deserialize_message(payload: &Payload) -> Self::Output {
            serde_json::from_slice(&payload.data)
        }
    }

    impl SerializeMessage for TestData {
        fn serialize_message(input: Self) -> Result<Message, Error> {
            Ok(Message {
                payload: serde_json::to_vec(&input).map_err(|e| Error::Custom(e.to_string()))?,
                ..Default::default()
            })
        }
    }

    /// Builds a Pulsar client connected to the broker published on `port`.
    ///
    /// Connects through the IPv4 loopback address: `port` comes from
    /// `get_host_port_ipv4`, and `0.0.0.0` (the unspecified address) is a bind
    /// address, not a portable connect destination.
    async fn connect(port: u16) -> Result<pulsar::Pulsar<TokioExecutor>, Error> {
        let endpoint = format!("pulsar://127.0.0.1:{port}");
        pulsar::Pulsar::builder(endpoint, TokioExecutor)
            .build()
            .await
    }

    /// Publishes one message to a pre-created topic and reads it back.
    #[tokio::test]
    async fn pulsar_subscribe_and_publish() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let topic = "persistent://test/test-ns/test-topic";

        let pulsar = Pulsar::default()
            .with_tenant("test")
            .with_namespace("test/test-ns")
            .with_topic(topic)
            .start()
            .await?;

        let client = connect(pulsar.get_host_port_ipv4(6650).await?).await?;

        // Subscribe before producing so the message is delivered to this consumer.
        let mut consumer: Consumer<TestData, _> =
            client.consumer().with_topic(topic).build().await?;

        let mut producer = client.producer().with_topic(topic).build().await?;

        // The first await enqueues the message, the second waits for the broker receipt.
        producer
            .send_non_blocking(TestData {
                data: "test".to_string(),
            })
            .await?
            .await?;

        let data = consumer.next().await.unwrap()?.deserialize()?;
        assert_eq!("test", data.data);

        Ok(())
    }

    /// With `allowAutoTopicCreation=false`, producing to a topic that was never
    /// created must fail with `TopicNotFound`.
    #[tokio::test]
    async fn pulsar_config() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let topic = "persistent://test/test-ns/test-topic";

        let pulsar = Pulsar::default()
            .with_tenant("test")
            .with_namespace("test/test-ns")
            .with_config_env("allowAutoTopicCreation", "false")
            .start()
            .await?;

        let client = connect(pulsar.get_host_port_ipv4(6650).await?).await?;

        let producer = client.producer().with_topic(topic).build().await;

        match producer {
            Ok(_) => panic!("Producer should return error"),
            Err(e) => assert_eq!(
                format!(
                    "Connection error: Server error (Some(TopicNotFound)): Topic not found {topic}"
                ),
                e.to_string()
            ),
        }

        Ok(())
    }
}