testcontainers_modules/nats/
mod.rs1use std::borrow::Cow;
2
3use testcontainers::{core::WaitFor, Image};
4
5const NAME: &str = "nats";
6const TAG: &str = "2.10.14";
7
8#[derive(Debug, Default, Clone)]
12pub struct Nats {
13 cmd: NatsServerCmd,
14}
15
16#[allow(missing_docs)]
17#[derive(Default, Debug, Clone)]
19pub struct NatsServerCmd {
20 user: Option<String>,
21 pass: Option<String>,
22
23 jetstream: Option<bool>,
24}
25
26impl NatsServerCmd {
27 #[allow(missing_docs)]
29 pub fn with_user(mut self, user: &str) -> Self {
30 self.user = Some(user.to_owned());
31 self
32 }
33
34 #[allow(missing_docs)]
36 pub fn with_password(mut self, password: &str) -> Self {
37 self.pass = Some(password.to_owned());
38 self
39 }
40
41 pub fn with_jetstream(mut self) -> Self {
53 self.jetstream = Some(true);
54 self
55 }
56}
57
58impl IntoIterator for &NatsServerCmd {
59 type Item = String;
60 type IntoIter = <Vec<String> as IntoIterator>::IntoIter;
61
62 fn into_iter(self) -> Self::IntoIter {
63 let mut args = Vec::new();
64
65 if let Some(ref user) = self.user {
66 args.push("--user".to_owned());
67 args.push(user.to_owned())
68 }
69 if let Some(ref pass) = self.pass {
70 args.push("--pass".to_owned());
71 args.push(pass.to_owned())
72 }
73
74 if let Some(ref jetstream) = self.jetstream {
75 if *jetstream {
76 args.push("--jetstream".to_owned());
77 }
78 }
79
80 args.into_iter()
81 }
82}
83
84impl Image for Nats {
85 fn name(&self) -> &str {
86 NAME
87 }
88
89 fn tag(&self) -> &str {
90 TAG
91 }
92
93 fn ready_conditions(&self) -> Vec<WaitFor> {
94 vec![
95 WaitFor::message_on_stderr("Listening for client connections on 0.0.0.0:4222"),
96 WaitFor::message_on_stderr("Server is ready"),
97 ]
98 }
99
100 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
101 &self.cmd
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use std::time::Duration;
108
109 use async_nats::jetstream::{self, consumer::PushConsumer};
110 use futures::StreamExt;
111 use testcontainers::{runners::AsyncRunner, ImageExt};
112
113 use crate::nats::{Nats, NatsServerCmd};
114
115 #[test]
116 fn set_user() {
117 let nats_cmd_args = NatsServerCmd::default().with_user("custom_user");
118 assert_eq!(nats_cmd_args.user, Some("custom_user".into()));
119 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
120 }
121
122 #[test]
123 fn set_password() {
124 let nats_cmd_args = NatsServerCmd::default().with_password("custom_password");
125 assert_eq!(nats_cmd_args.pass, Some("custom_password".into()));
126 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
127 }
128
129 #[test]
130 fn enable_jetstream() {
131 let nats_cmd_args = NatsServerCmd::default().with_jetstream();
132 assert_eq!(nats_cmd_args.jetstream, Some(true));
133 let _image_with_cmd = Nats::default().with_cmd(&nats_cmd_args);
134 }
135
136 #[tokio::test]
137 async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
138 let container = Nats::default().start().await?;
139
140 let host = container.get_host().await?;
141 let host_port = container.get_host_port_ipv4(4222).await?;
142 let url = format!("{host}:{host_port}");
143
144 let nats_client = async_nats::ConnectOptions::default()
145 .connect(url)
146 .await
147 .expect("failed to connect to nats server");
148
149 let mut subscriber = nats_client
150 .subscribe("messages")
151 .await
152 .expect("failed to subscribe to nats subject");
153 nats_client
154 .publish("messages", "data".into())
155 .await
156 .expect("failed to publish to nats subject");
157 let message = subscriber
158 .next()
159 .await
160 .expect("failed to fetch nats message");
161 assert_eq!(message.payload, "data");
162 Ok(())
163 }
164
165 #[tokio::test]
166 async fn it_works_with_jetstream() -> Result<(), Box<dyn std::error::Error + 'static>> {
169 let nats_cmd = NatsServerCmd::default().with_jetstream();
170 let container = Nats::default().with_cmd(&nats_cmd).start().await?;
171
172 let host = container.get_host().await?;
173 let host_port = container.get_host_port_ipv4(4222).await?;
174 let url = format!("{host}:{host_port}");
175
176 let nats_client = async_nats::ConnectOptions::default()
177 .connect(url)
178 .await
179 .expect("failed to connect to nats server");
180
181 let inbox = nats_client.new_inbox();
182
183 let jetstream = jetstream::new(nats_client);
184
185 let stream_name = String::from("EVENTS");
186
187 let consumer: PushConsumer = jetstream
188 .create_stream(jetstream::stream::Config {
189 name: stream_name,
190 subjects: vec!["events.>".to_string()],
191 ..Default::default()
192 })
193 .await?
194 .create_consumer(jetstream::consumer::push::Config {
195 deliver_subject: inbox.clone(),
196 inactive_threshold: Duration::from_secs(60),
197 ..Default::default()
198 })
199 .await?;
200
201 for i in 0..10 {
203 jetstream
204 .publish(format!("events.{i}"), "data".into())
205 .await?
206 .await?;
207 }
208
209 let mut messages_processed = 0;
210
211 let mut messages = consumer.messages().await?.take(10);
212
213 while let Some(message) = messages.next().await {
215 let message = message?;
216
217 assert_eq!(
218 message.subject.to_string(),
219 format!("events.{messages_processed}")
220 );
221
222 message.ack().await.unwrap();
224
225 messages_processed += 1;
226 }
227
228 assert_eq!(messages_processed, 10);
229
230 Ok(())
231 }
232}