1use anyhow::{bail, Result};
2use command_group::AsyncCommandGroup;
3use std::path::{Path, PathBuf};
4use std::process::Stdio;
5use tokio::fs::{metadata, write};
6use tokio::process::{Child, Command};
7use tracing::warn;
8
9use crate::common::CommandGroupUsage;
10use crate::start::wait_for_server;
11
12use super::download_binary_from_github;
13
14const NATS_GITHUB_RELEASE_URL: &str = "https://github.com/nats-io/nats-server/releases/download";
15pub const NATS_SERVER_CONF: &str = "nats.conf";
16pub const NATS_SERVER_PID: &str = "nats.pid";
17#[cfg(target_family = "unix")]
18pub const NATS_SERVER_BINARY: &str = "nats-server";
19#[cfg(target_family = "windows")]
20pub const NATS_SERVER_BINARY: &str = "nats-server.exe";
21
22pub async fn ensure_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
40where
41 P: AsRef<Path>,
42{
43 ensure_nats_server_for_os_arch_pair(std::env::consts::OS, std::env::consts::ARCH, version, dir)
44 .await
45}
46
47pub async fn ensure_nats_server_for_os_arch_pair<P>(
70 os: &str,
71 arch: &str,
72 version: &str,
73 dir: P,
74) -> Result<PathBuf>
75where
76 P: AsRef<Path>,
77{
78 let nats_bin_path = dir.as_ref().join(NATS_SERVER_BINARY);
79 if let Ok(_md) = metadata(&nats_bin_path).await {
80 if let Ok(output) = Command::new(&nats_bin_path).arg("version").output().await {
82 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
83 eprintln!(
84 "👀 Found nats-server version on the disk: {}",
85 stdout.trim_end()
86 );
87 let re = regex::Regex::new(r"^nats-server:[^\s]*").unwrap();
88 if re.replace(&stdout, "").to_string().trim() == version {
89 eprintln!("✅ Using nats-server version [{}]", &version);
91 return Ok(nats_bin_path);
92 }
93 }
94 }
95
96 eprintln!(
97 "🎣 Downloading new nats-server from {}",
98 &nats_url(os, arch, version)
99 );
100
101 let res =
103 download_binary_from_github(&nats_url(os, arch, version), dir, NATS_SERVER_BINARY).await;
104 if let Ok(ref path) = res {
105 eprintln!("🎯 Saved nats-server to {}", path.display());
106 }
107
108 res
109}
110
111pub async fn download_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
129where
130 P: AsRef<Path>,
131{
132 download_binary_from_github(
133 &nats_url(std::env::consts::OS, std::env::consts::ARCH, version),
134 dir,
135 NATS_SERVER_BINARY,
136 )
137 .await
138}
139
140#[derive(Clone, Debug, PartialEq, Eq)]
143pub struct NatsConfig {
144 pub host: String,
145 pub port: u16,
146 pub store_dir: PathBuf,
149 pub js_domain: Option<String>,
150 pub remote_url: Option<String>,
151 pub credentials: Option<PathBuf>,
152 pub websocket_port: u16,
153 pub config_path: Option<PathBuf>,
154}
155
156impl Default for NatsConfig {
164 fn default() -> Self {
165 NatsConfig {
166 host: "127.0.0.1".to_string(),
167 port: 4222,
168 store_dir: std::env::temp_dir().join("wash-jetstream-4222"),
169 js_domain: Some("core".to_string()),
170 remote_url: None,
171 credentials: None,
172 websocket_port: 4223,
173 config_path: None,
174 }
175 }
176}
177
178impl NatsConfig {
179 #[must_use]
191 pub fn new_leaf(
192 host: &str,
193 port: u16,
194 js_domain: Option<String>,
195 remote_url: String,
196 credentials: PathBuf,
197 websocket_port: u16,
198 config_path: Option<PathBuf>,
199 ) -> Self {
200 NatsConfig {
201 host: host.to_owned(),
202 port,
203 store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
204 js_domain,
205 remote_url: Some(remote_url),
206 credentials: Some(credentials),
207 websocket_port,
208 config_path,
209 }
210 }
211 pub fn new_standalone(host: &str, port: u16, js_domain: Option<String>) -> Self {
219 if host == "0.0.0.0" {
220 warn!("Listening on 0.0.0.0 is unsupported on some platforms, use 127.0.0.1 for best results");
221 }
222 NatsConfig {
223 host: host.to_owned(),
224 port,
225 store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
226 js_domain,
227 ..Default::default()
228 }
229 }
230
231 pub async fn write_to_path<P>(self, path: P) -> Result<()>
232 where
233 P: AsRef<Path>,
234 {
235 let leafnode_section = if let Some(url) = self.remote_url {
236 let url_line = format!(r#"url: "{url}""#);
237 let creds_line = self
238 .credentials
239 .as_ref()
240 .map(|c| format!("credentials: {c:?}"))
241 .unwrap_or_default();
242
243 format!(
244 r#"
245leafnodes {{
246 remotes = [
247 {{
248 {url_line}
249 {creds_line}
250 }}
251 ]
252}}
253 "#,
254 )
255 } else {
256 String::new()
257 };
258 let websocket_port = self.websocket_port;
259 let websocket_section = format!(
260 r#"
261websocket {{
262 port: {websocket_port}
263 no_tls: true
264}}
265 "#
266 );
267 let config = format!(
268 r#"
269jetstream {{
270 domain={}
271 store_dir={:?}
272}}
273{leafnode_section}
274{websocket_section}
275"#,
276 self.js_domain.unwrap_or_else(|| "core".to_string()),
277 self.store_dir.as_os_str().to_string_lossy()
278 );
279 write(path, config).await.map_err(anyhow::Error::from)
280 }
281}
282
283pub async fn start_nats_server<P, T>(
290 bin_path: P,
291 stderr: T,
292 config: NatsConfig,
293 command_group: CommandGroupUsage,
294) -> Result<Child>
295where
296 P: AsRef<Path>,
297 T: Into<Stdio>,
298{
299 let host_addr = format!("{}:{}", config.host, config.port);
300
301 if tokio::net::TcpStream::connect(&host_addr).await.is_ok() {
303 bail!(
304 "could not start NATS server, a process is already listening on {}:{}",
305 config.host,
306 config.port
307 );
308 }
309
310 let bin_path_ref = bin_path.as_ref();
311
312 let Some(parent_path) = bin_path_ref.parent() else {
313 bail!("could not write config to disk, couldn't find download directory")
314 };
315
316 let config_path = parent_path.join(NATS_SERVER_CONF);
317 let host = config.host.clone();
318 let port = config.port;
319
320 let mut cmd_args = vec![
321 "-js".to_string(),
322 "--addr".to_string(),
323 host,
324 "--port".to_string(),
325 port.to_string(),
326 "--pid".to_string(),
327 parent_path
328 .join(NATS_SERVER_PID)
329 .to_string_lossy()
330 .to_string(),
331 "--config".to_string(),
332 ];
333
334 if let Some(nats_cfg_path) = &config.config_path {
335 anyhow::ensure!(
336 nats_cfg_path.is_file(),
337 "The provided NATS config File [{:?}] is not a valid File",
338 nats_cfg_path
339 );
340
341 cmd_args.push(nats_cfg_path.to_string_lossy().to_string());
342 } else {
343 config.write_to_path(&config_path).await?;
344 cmd_args.push(config_path.to_string_lossy().to_string());
345 }
346
347 let mut cmd = Command::new(bin_path_ref);
348 cmd.stderr(stderr.into())
349 .stdin(Stdio::null())
350 .args(&cmd_args);
351 let child = if command_group == CommandGroupUsage::CreateNew {
352 cmd.group_spawn().map_err(anyhow::Error::from)?.into_inner()
353 } else {
354 cmd.spawn().map_err(anyhow::Error::from)?
355 };
356
357 wait_for_server(&host_addr, "NATS server")
358 .await
359 .map(|()| child)
360}
361
362pub fn nats_pid_path<P>(install_dir: P) -> PathBuf
364where
365 P: AsRef<Path>,
366{
367 install_dir.as_ref().join(NATS_SERVER_PID)
368}
369
370fn nats_url(os: &str, arch: &str, version: &str) -> String {
372 let os = if os == "macos" { "darwin" } else { os };
374 let arch = match arch {
376 "aarch64" => "arm64",
377 "x86_64" => "amd64",
378 _ => arch,
379 };
380 format!("{NATS_GITHUB_RELEASE_URL}/{version}/nats-server-{version}-{os}-{arch}.tar.gz")
381}
382
383#[cfg(test)]
384mod test {
385 use anyhow::Result;
386 use tokio::io::AsyncReadExt;
387
388 use crate::start::NatsConfig;
389
390 #[tokio::test]
391 async fn can_write_properly_formed_credsfile() -> Result<()> {
392 let creds = etcetera::home_dir().unwrap().join("nats.creds");
393 let config: NatsConfig = NatsConfig::new_leaf(
394 "127.0.0.1",
395 4243,
396 None,
397 "connect.ngs.global".to_string(),
398 creds.clone(),
399 4204,
400 None,
401 );
402
403 config.write_to_path(creds.clone()).await?;
404
405 let mut credsfile = tokio::fs::File::open(creds.clone()).await?;
406 let mut contents = String::new();
407 credsfile.read_to_string(&mut contents).await?;
408
409 assert_eq!(contents, format!("\njetstream {{\n domain={}\n store_dir={:?}\n}}\n\nleafnodes {{\n remotes = [\n {{\n url: \"{}\"\n credentials: {:?}\n }}\n ]\n}}\n \n\nwebsocket {{\n port: 4204\n no_tls: true\n}}\n \n", "core", std::env::temp_dir().join("wash-jetstream-4243").display(), "connect.ngs.global", creds.to_string_lossy()));
410 #[cfg(target_family = "windows")]
412 assert!(creds.to_string_lossy().contains('\\'));
413
414 Ok(())
415 }
416}