// testcontainers_modules/nats/mod.rs

use std::borrow::Cow;

use testcontainers::{core::WaitFor, Image};

/// Docker image name returned by the image's `name()` implementation.
const NAME: &str = "nats";
/// Docker image tag returned by the image's `tag()` implementation.
const TAG: &str = "2.10.14";

8/// Nats image for [testcontainers](https://crates.io/crates/testcontainers).
9///
10/// This image is based on the official [Nats](https://hub.docker.com/_/nats) image.
11#[derive(Debug, Default, Clone)]
12pub struct Nats {
13    cmd: NatsServerCmd,
14}
15
/// Configuration for the NATS server command-line arguments.
///
/// This struct allows you to customize the NATS server startup configuration
/// by setting various options like authentication credentials and enabling features
/// like JetStream.
///
/// Iterating over a `&NatsServerCmd` yields the configured options as
/// command-line arguments; options that were never set produce no argument.
///
/// # Example
/// ```
/// use testcontainers_modules::nats::NatsServerCmd;
///
/// let cmd = NatsServerCmd::default()
///     .with_user("myuser")
///     .with_password("mypass")
///     .with_jetstream();
/// ```
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct NatsServerCmd {
    /// Value emitted after the `--user` flag; `None` omits the flag.
    user: Option<String>,
    /// Value emitted after the `--pass` flag; `None` omits the flag.
    pass: Option<String>,

    /// `Some(true)` emits the `--jetstream` flag; any other value omits it.
    jetstream: Option<bool>,
}

39impl NatsServerCmd {
40    /// Sets the username for NATS server authentication.
41    ///
42    /// This configures the NATS server to require authentication with the specified username.
43    /// Should be used together with [`with_password`](Self::with_password) for complete authentication setup.
44    ///
45    /// # Example
46    /// ```
47    /// use testcontainers_modules::nats::NatsServerCmd;
48    ///
49    /// let cmd = NatsServerCmd::default().with_user("myuser");
50    /// ```
51    pub fn with_user(mut self, user: &str) -> Self {
52        self.user = Some(user.to_owned());
53        self
54    }
55
56    /// Sets the password for NATS server authentication.
57    ///
58    /// This configures the NATS server to require authentication with the specified password.
59    /// Should be used together with [`with_user`](Self::with_user) for complete authentication setup.
60    ///
61    /// # Example
62    /// ```
63    /// use testcontainers_modules::nats::NatsServerCmd;
64    ///
65    /// let cmd = NatsServerCmd::default()
66    ///     .with_user("myuser")
67    ///     .with_password("mypass");
68    /// ```
69    pub fn with_password(mut self, password: &str) -> Self {
70        self.pass = Some(password.to_owned());
71        self
72    }
73
74    /// Enable JetStream in the Nats server to use the built-in persistence
75    /// features of NATS.
76    ///
77    /// See: https://docs.nats.io/nats-concepts/jetstream
78    ///
79    /// Example:
80    /// ```rust,ignore
81    /// # use testcontainers_modules::nats::{Nats, NatsServerCmd};
82    /// let nats_cmd = NatsServerCmd::default().with_jetstream();
83    /// let node = Nats::default().with_cmd(&nats_cmd).start().await?;
84    /// ```
85    pub fn with_jetstream(mut self) -> Self {
86        self.jetstream = Some(true);
87        self
88    }
89}
90
91impl IntoIterator for &NatsServerCmd {
92    type Item = String;
93    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
94
95    fn into_iter(self) -> Self::IntoIter {
96        let mut args = Vec::new();
97
98        if let Some(ref user) = self.user {
99            args.push("--user".to_owned());
100            args.push(user.to_owned())
101        }
102        if let Some(ref pass) = self.pass {
103            args.push("--pass".to_owned());
104            args.push(pass.to_owned())
105        }
106
107        if let Some(ref jetstream) = self.jetstream {
108            if *jetstream {
109                args.push("--jetstream".to_owned());
110            }
111        }
112
113        args.into_iter()
114    }
115}
116
117impl Image for Nats {
118    fn name(&self) -> &str {
119        NAME
120    }
121
122    fn tag(&self) -> &str {
123        TAG
124    }
125
126    fn ready_conditions(&self) -> Vec<WaitFor> {
127        vec![
128            WaitFor::message_on_stderr("Listening for client connections on 0.0.0.0:4222"),
129            WaitFor::message_on_stderr("Server is ready"),
130        ]
131    }
132
133    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
134        &self.cmd
135    }
136}
137
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_nats::jetstream::{self, consumer::PushConsumer};
    use futures::StreamExt;
    use testcontainers::{runners::AsyncRunner, ImageExt};

    use crate::nats::{Nats, NatsServerCmd};

    /// `with_user` stores the username and the command is accepted by `with_cmd`.
    #[test]
    fn set_user() {
        let nats_cmd_args = NatsServerCmd::default().with_user("custom_user");
        assert_eq!(nats_cmd_args.user, Some("custom_user".into()));
        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
    }

    /// `with_password` stores the password and the command is accepted by `with_cmd`.
    #[test]
    fn set_password() {
        let nats_cmd_args = NatsServerCmd::default().with_password("custom_password");
        assert_eq!(nats_cmd_args.pass, Some("custom_password".into()));
        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
    }

    /// `with_jetstream` sets the flag and the command is accepted by `with_cmd`.
    #[test]
    fn enable_jetstream() {
        let nats_cmd_args = NatsServerCmd::default().with_jetstream();
        assert_eq!(nats_cmd_args.jetstream, Some(true));
        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
    }

    /// Starts a default container and round-trips one message over core NATS
    /// publish/subscribe.
    #[tokio::test]
    async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let container = Nats::default().start().await?;

        let host = container.get_host().await?;
        let host_port = container.get_host_port_ipv4(4222).await?;
        let url = format!("{host}:{host_port}");

        let nats_client = async_nats::ConnectOptions::default()
            .connect(url)
            .await
            .expect("failed to connect to nats server");

        // Subscribe before publishing so the message is not missed.
        let mut subscriber = nats_client
            .subscribe("messages")
            .await
            .expect("failed to subscribe to nats subject");
        nats_client
            .publish("messages", "data".into())
            .await
            .expect("failed to publish to nats subject");
        let message = subscriber
            .next()
            .await
            .expect("failed to fetch nats message");
        assert_eq!(message.payload, "data");
        Ok(())
    }

    /// Show how to use the Nats module with the Jetstream feature.
    /// See: <https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs>
    #[tokio::test]
    async fn it_works_with_jetstream() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let nats_cmd = NatsServerCmd::default().with_jetstream();
        let container = Nats::default().with_cmd(&nats_cmd).start().await?;

        let host = container.get_host().await?;
        let host_port = container.get_host_port_ipv4(4222).await?;
        let url = format!("{host}:{host_port}");

        let nats_client = async_nats::ConnectOptions::default()
            .connect(url)
            .await
            .expect("failed to connect to nats server");

        let inbox = nats_client.new_inbox();

        let jetstream = jetstream::new(nats_client);

        let stream_name = String::from("EVENTS");

        let consumer: PushConsumer = jetstream
            .create_stream(jetstream::stream::Config {
                name: stream_name,
                subjects: vec!["events.>".to_string()],
                ..Default::default()
            })
            .await?
            .create_consumer(jetstream::consumer::push::Config {
                deliver_subject: inbox.clone(),
                inactive_threshold: Duration::from_secs(60),
                ..Default::default()
            })
            .await?;

        // Publish a few messages for the example.
        for i in 0..10 {
            jetstream
                .publish(format!("events.{i}"), "data".into())
                .await?
                .await?;
        }

        let mut messages_processed = 0;

        let mut messages = consumer.messages().await?.take(10);

        // Iterate over messages.
        while let Some(message) = messages.next().await {
            let message = message?;

            // Messages are expected back in publish order.
            assert_eq!(
                message.subject.to_string(),
                format!("events.{messages_processed}")
            );

            // acknowledge the message
            message.ack().await.unwrap();

            messages_processed += 1;
        }

        assert_eq!(messages_processed, 10);

        Ok(())
    }
}