testcontainers_modules/nats/
mod.rs

1use std::borrow::Cow;
2
3use testcontainers::{core::WaitFor, Image};
4
/// Docker image name of the official NATS image this module runs.
const NAME: &str = "nats";
/// Image tag reported by the `Image::tag` implementation for `Nats`.
const TAG: &str = "2.10.14";
7
8/// Nats image for [testcontainers](https://crates.io/crates/testcontainers).
9///
10/// This image is based on the official [Nats](https://hub.docker.com/_/nats) image.
11#[derive(Debug, Default, Clone)]
12pub struct Nats {
13    cmd: NatsServerCmd,
14}
15
/// Command-line arguments for the NATS server started inside the container.
///
/// Start from [`NatsServerCmd::default`], chain the `with_*` builder methods and
/// pass a reference to the result to the image's `with_cmd`. Iterating over a
/// `&NatsServerCmd` yields the argument list built from the configured options.
#[derive(Default, Debug, Clone)]
pub struct NatsServerCmd {
    /// Value emitted after `--user`; the flag is omitted when `None`.
    user: Option<String>,
    /// Value emitted after `--pass`; the flag is omitted when `None`.
    pass: Option<String>,

    /// `--jetstream` is emitted only when this is `Some(true)`.
    jetstream: Option<bool>,
}

impl NatsServerCmd {
    /// Sets the user name, emitted on the command line as `--user <user>`.
    pub fn with_user(mut self, user: &str) -> Self {
        self.user = Some(user.to_owned());
        self
    }

    /// Sets the password, emitted on the command line as `--pass <password>`.
    pub fn with_password(mut self, password: &str) -> Self {
        self.pass = Some(password.to_owned());
        self
    }

    /// Enable JetStream in the Nats server to use the built-in persistence
    /// features of NATS.
    ///
    /// See: <https://docs.nats.io/nats-concepts/jetstream>
    ///
    /// Example:
    /// ```rust,ignore
    /// # use testcontainers_modules::nats::{Nats, NatsServerCmd};
    /// let nats_cmd = NatsServerCmd::default().with_jetstream();
    /// let node = Nats::default().with_cmd(&nats_cmd).start().await?;
    /// ```
    pub fn with_jetstream(mut self) -> Self {
        self.jetstream = Some(true);
        self
    }
}

/// Renders the configured options as an argument list, in the fixed order
/// `--user <user>`, `--pass <password>`, `--jetstream`; unset options are skipped.
impl IntoIterator for &NatsServerCmd {
    type Item = String;
    type IntoIter = <Vec<String> as IntoIterator>::IntoIter;

    fn into_iter(self) -> Self::IntoIter {
        // At most two flag/value pairs plus the JetStream switch.
        let mut args = Vec::with_capacity(5);

        if let Some(user) = &self.user {
            args.extend(["--user".to_owned(), user.clone()]);
        }
        if let Some(pass) = &self.pass {
            args.extend(["--pass".to_owned(), pass.clone()]);
        }
        // `Some(false)` and `None` both leave JetStream disabled.
        if self.jetstream == Some(true) {
            args.push("--jetstream".to_owned());
        }

        args.into_iter()
    }
}
83
84impl Image for Nats {
85    fn name(&self) -> &str {
86        NAME
87    }
88
89    fn tag(&self) -> &str {
90        TAG
91    }
92
93    fn ready_conditions(&self) -> Vec<WaitFor> {
94        vec![
95            WaitFor::message_on_stderr("Listening for client connections on 0.0.0.0:4222"),
96            WaitFor::message_on_stderr("Server is ready"),
97        ]
98    }
99
100    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
101        &self.cmd
102    }
103}
104
105#[cfg(test)]
106mod tests {
107    use std::time::Duration;
108
109    use async_nats::jetstream::{self, consumer::PushConsumer};
110    use futures::StreamExt;
111    use testcontainers::{runners::AsyncRunner, ImageExt};
112
113    use crate::nats::{Nats, NatsServerCmd};
114
115    #[test]
116    fn set_user() {
117        let nats_cmd_args = NatsServerCmd::default().with_user("custom_user");
118        assert_eq!(nats_cmd_args.user, Some("custom_user".into()));
119        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
120    }
121
122    #[test]
123    fn set_password() {
124        let nats_cmd_args = NatsServerCmd::default().with_password("custom_password");
125        assert_eq!(nats_cmd_args.pass, Some("custom_password".into()));
126        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
127    }
128
129    #[test]
130    fn enable_jetstream() {
131        let nats_cmd_args = NatsServerCmd::default().with_jetstream();
132        assert_eq!(nats_cmd_args.jetstream, Some(true));
133        let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
134    }
135
136    #[tokio::test]
137    async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
138        let container = Nats::default().start().await?;
139
140        let host = container.get_host().await?;
141        let host_port = container.get_host_port_ipv4(4222).await?;
142        let url = format!("{host}:{host_port}");
143
144        let nats_client = async_nats::ConnectOptions::default()
145            .connect(url)
146            .await
147            .expect("failed to connect to nats server");
148
149        let mut subscriber = nats_client
150            .subscribe("messages")
151            .await
152            .expect("failed to subscribe to nats subject");
153        nats_client
154            .publish("messages", "data".into())
155            .await
156            .expect("failed to publish to nats subject");
157        let message = subscriber
158            .next()
159            .await
160            .expect("failed to fetch nats message");
161        assert_eq!(message.payload, "data");
162        Ok(())
163    }
164
165    #[tokio::test]
166    /// Show how to use the Nats module with the Jetstream feature.
167    /// See: https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs
168    async fn it_works_with_jetstream() -> Result<(), Box<dyn std::error::Error + 'static>> {
169        let nats_cmd = NatsServerCmd::default().with_jetstream();
170        let container = Nats::default().with_cmd(&nats_cmd).start().await?;
171
172        let host = container.get_host().await?;
173        let host_port = container.get_host_port_ipv4(4222).await?;
174        let url = format!("{host}:{host_port}");
175
176        let nats_client = async_nats::ConnectOptions::default()
177            .connect(url)
178            .await
179            .expect("failed to connect to nats server");
180
181        let inbox = nats_client.new_inbox();
182
183        let jetstream = jetstream::new(nats_client);
184
185        let stream_name = String::from("EVENTS");
186
187        let consumer: PushConsumer = jetstream
188            .create_stream(jetstream::stream::Config {
189                name: stream_name,
190                subjects: vec!["events.>".to_string()],
191                ..Default::default()
192            })
193            .await?
194            .create_consumer(jetstream::consumer::push::Config {
195                deliver_subject: inbox.clone(),
196                inactive_threshold: Duration::from_secs(60),
197                ..Default::default()
198            })
199            .await?;
200
201        // Publish a few messages for the example.
202        for i in 0..10 {
203            jetstream
204                .publish(format!("events.{i}"), "data".into())
205                .await?
206                .await?;
207        }
208
209        let mut messages_processed = 0;
210
211        let mut messages = consumer.messages().await?.take(10);
212
213        // Iterate over messages.
214        while let Some(message) = messages.next().await {
215            let message = message?;
216
217            assert_eq!(
218                message.subject.to_string(),
219                format!("events.{messages_processed}")
220            );
221
222            // acknowledge the message
223            message.ack().await.unwrap();
224
225            messages_processed += 1;
226        }
227
228        assert_eq!(messages_processed, 10);
229
230        Ok(())
231    }
232}