testcontainers_modules/nats/
mod.rs1use std::borrow::Cow;
2
3use testcontainers::{core::WaitFor, Image};
4
5const NAME: &str = "nats";
6const TAG: &str = "2.10.14";
7
8#[derive(Debug, Default, Clone)]
12pub struct Nats {
13 cmd: NatsServerCmd,
14}
15
16#[derive(Default, Debug, Clone)]
32pub struct NatsServerCmd {
33 user: Option<String>,
34 pass: Option<String>,
35
36 jetstream: Option<bool>,
37}
38
39impl NatsServerCmd {
40 pub fn with_user(mut self, user: &str) -> Self {
52 self.user = Some(user.to_owned());
53 self
54 }
55
56 pub fn with_password(mut self, password: &str) -> Self {
70 self.pass = Some(password.to_owned());
71 self
72 }
73
74 pub fn with_jetstream(mut self) -> Self {
86 self.jetstream = Some(true);
87 self
88 }
89}
90
91impl IntoIterator for &NatsServerCmd {
92 type Item = String;
93 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
94
95 fn into_iter(self) -> Self::IntoIter {
96 let mut args = Vec::new();
97
98 if let Some(ref user) = self.user {
99 args.push("--user".to_owned());
100 args.push(user.to_owned())
101 }
102 if let Some(ref pass) = self.pass {
103 args.push("--pass".to_owned());
104 args.push(pass.to_owned())
105 }
106
107 if let Some(ref jetstream) = self.jetstream {
108 if *jetstream {
109 args.push("--jetstream".to_owned());
110 }
111 }
112
113 args.into_iter()
114 }
115}
116
117impl Image for Nats {
118 fn name(&self) -> &str {
119 NAME
120 }
121
122 fn tag(&self) -> &str {
123 TAG
124 }
125
126 fn ready_conditions(&self) -> Vec<WaitFor> {
127 vec![
128 WaitFor::message_on_stderr("Listening for client connections on 0.0.0.0:4222"),
129 WaitFor::message_on_stderr("Server is ready"),
130 ]
131 }
132
133 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
134 &self.cmd
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use std::time::Duration;
141
142 use async_nats::jetstream::{self, consumer::PushConsumer};
143 use futures::StreamExt;
144 use testcontainers::{runners::AsyncRunner, ImageExt};
145
146 use crate::nats::{Nats, NatsServerCmd};
147
148 #[test]
149 fn set_user() {
150 let nats_cmd_args = NatsServerCmd::default().with_user("custom_user");
151 assert_eq!(nats_cmd_args.user, Some("custom_user".into()));
152 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
153 }
154
155 #[test]
156 fn set_password() {
157 let nats_cmd_args = NatsServerCmd::default().with_password("custom_password");
158 assert_eq!(nats_cmd_args.pass, Some("custom_password".into()));
159 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
160 }
161
162 #[test]
163 fn enable_jetstream() {
164 let nats_cmd_args = NatsServerCmd::default().with_jetstream();
165 assert_eq!(nats_cmd_args.jetstream, Some(true));
166 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
167 }
168
169 #[tokio::test]
170 async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
171 let container = Nats::default().start().await?;
172
173 let host = container.get_host().await?;
174 let host_port = container.get_host_port_ipv4(4222).await?;
175 let url = format!("{host}:{host_port}");
176
177 let nats_client = async_nats::ConnectOptions::default()
178 .connect(url)
179 .await
180 .expect("failed to connect to nats server");
181
182 let mut subscriber = nats_client
183 .subscribe("messages")
184 .await
185 .expect("failed to subscribe to nats subject");
186 nats_client
187 .publish("messages", "data".into())
188 .await
189 .expect("failed to publish to nats subject");
190 let message = subscriber
191 .next()
192 .await
193 .expect("failed to fetch nats message");
194 assert_eq!(message.payload, "data");
195 Ok(())
196 }
197
198 #[tokio::test]
199 async fn it_works_with_jetstream() -> Result<(), Box<dyn std::error::Error + 'static>> {
202 let nats_cmd = NatsServerCmd::default().with_jetstream();
203 let container = Nats::default().with_cmd(&nats_cmd).start().await?;
204
205 let host = container.get_host().await?;
206 let host_port = container.get_host_port_ipv4(4222).await?;
207 let url = format!("{host}:{host_port}");
208
209 let nats_client = async_nats::ConnectOptions::default()
210 .connect(url)
211 .await
212 .expect("failed to connect to nats server");
213
214 let inbox = nats_client.new_inbox();
215
216 let jetstream = jetstream::new(nats_client);
217
218 let stream_name = String::from("EVENTS");
219
220 let consumer: PushConsumer = jetstream
221 .create_stream(jetstream::stream::Config {
222 name: stream_name,
223 subjects: vec!["events.>".to_string()],
224 ..Default::default()
225 })
226 .await?
227 .create_consumer(jetstream::consumer::push::Config {
228 deliver_subject: inbox.clone(),
229 inactive_threshold: Duration::from_secs(60),
230 ..Default::default()
231 })
232 .await?;
233
234 for i in 0..10 {
236 jetstream
237 .publish(format!("events.{i}"), "data".into())
238 .await?
239 .await?;
240 }
241
242 let mut messages_processed = 0;
243
244 let mut messages = consumer.messages().await?.take(10);
245
246 while let Some(message) = messages.next().await {
248 let message = message?;
249
250 assert_eq!(
251 message.subject.to_string(),
252 format!("events.{messages_processed}")
253 );
254
255 message.ack().await.unwrap();
257
258 messages_processed += 1;
259 }
260
261 assert_eq!(messages_processed, 10);
262
263 Ok(())
264 }
265}