use anyhow::{bail, Result};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::fs::{metadata, write};
use tokio::process::{Child, Command};
use tracing::warn;
use crate::start::wait_for_server;
use super::download_binary_from_github;
const NATS_GITHUB_RELEASE_URL: &str = "https://github.com/nats-io/nats-server/releases/download";
pub const NATS_SERVER_CONF: &str = "nats.conf";
pub const NATS_SERVER_PID: &str = "nats.pid";
#[cfg(target_family = "unix")]
pub const NATS_SERVER_BINARY: &str = "nats-server";
#[cfg(target_family = "windows")]
pub const NATS_SERVER_BINARY: &str = "nats-server.exe";
pub async fn ensure_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
where
P: AsRef<Path>,
{
ensure_nats_server_for_os_arch_pair(std::env::consts::OS, std::env::consts::ARCH, version, dir)
.await
}
pub async fn ensure_nats_server_for_os_arch_pair<P>(
os: &str,
arch: &str,
version: &str,
dir: P,
) -> Result<PathBuf>
where
P: AsRef<Path>,
{
let nats_bin_path = dir.as_ref().join(NATS_SERVER_BINARY);
if let Ok(_md) = metadata(&nats_bin_path).await {
return Ok(nats_bin_path);
}
download_binary_from_github(&nats_url(os, arch, version), dir, NATS_SERVER_BINARY).await
}
pub async fn download_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
where
P: AsRef<Path>,
{
download_binary_from_github(
&nats_url(std::env::consts::OS, std::env::consts::ARCH, version),
dir,
NATS_SERVER_BINARY,
)
.await
}
#[derive(Clone)]
pub struct NatsConfig {
pub host: String,
pub port: u16,
pub store_dir: PathBuf,
pub js_domain: Option<String>,
pub remote_url: Option<String>,
pub credentials: Option<PathBuf>,
pub websocket_port: u16,
pub config_path: Option<PathBuf>,
}
impl Default for NatsConfig {
fn default() -> Self {
NatsConfig {
host: "127.0.0.1".to_string(),
port: 4222,
store_dir: std::env::temp_dir().join("wash-jetstream-4222"),
js_domain: Some("core".to_string()),
remote_url: None,
credentials: None,
websocket_port: 4223,
config_path: None,
}
}
}
impl NatsConfig {
#[must_use]
pub fn new_leaf(
host: &str,
port: u16,
js_domain: Option<String>,
remote_url: String,
credentials: PathBuf,
websocket_port: u16,
config_path: Option<PathBuf>,
) -> Self {
NatsConfig {
host: host.to_owned(),
port,
store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
js_domain,
remote_url: Some(remote_url),
credentials: Some(credentials),
websocket_port,
config_path,
}
}
pub fn new_standalone(host: &str, port: u16, js_domain: Option<String>) -> Self {
if host == "0.0.0.0" {
warn!("Listening on 0.0.0.0 is unsupported on some platforms, use 127.0.0.1 for best results");
}
NatsConfig {
host: host.to_owned(),
port,
store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
js_domain,
..Default::default()
}
}
async fn write_to_path<P>(self, path: P) -> Result<()>
where
P: AsRef<Path>,
{
let leafnode_section = if let Some(url) = self.remote_url {
let url_line = format!(r#"url: "{url}""#);
let creds_line = self
.credentials
.as_ref()
.map(|c| format!("credentials: {c:?}"))
.unwrap_or_default();
format!(
r#"
leafnodes {{
remotes = [
{{
{url_line}
{creds_line}
}}
]
}}
"#,
)
} else {
String::new()
};
let websocket_port = self.websocket_port;
let websocket_section = format!(
r#"
websocket {{
port: {websocket_port}
no_tls: true
}}
"#
);
let config = format!(
r#"
jetstream {{
domain={}
store_dir={:?}
}}
{leafnode_section}
{websocket_section}
"#,
self.js_domain.unwrap_or_else(|| "core".to_string()),
self.store_dir.as_os_str().to_string_lossy()
);
write(path, config).await.map_err(anyhow::Error::from)
}
}
pub async fn start_nats_server<P, T>(bin_path: P, stderr: T, config: NatsConfig) -> Result<Child>
where
P: AsRef<Path>,
T: Into<Stdio>,
{
let host_addr = format!("{}:{}", config.host, config.port);
if tokio::net::TcpStream::connect(&host_addr).await.is_ok() {
bail!(
"could not start NATS server, a process is already listening on {}:{}",
config.host,
config.port
);
}
let bin_path_ref = bin_path.as_ref();
if let Some(parent_path) = bin_path_ref.parent() {
let config_path = parent_path.join(NATS_SERVER_CONF);
let host = config.host.clone();
let port = config.port;
let mut cmd_args = vec![
"-js".to_string(),
"--addr".to_string(),
host,
"--port".to_string(),
port.to_string(),
"--pid".to_string(),
parent_path
.join(NATS_SERVER_PID)
.to_string_lossy()
.to_string(),
"--config".to_string(),
];
if let Some(nats_cfg_path) = &config.config_path {
anyhow::ensure!(
nats_cfg_path.is_file(),
"The provided NATS config File [{:?}] is not a valid File",
nats_cfg_path
);
cmd_args.push(nats_cfg_path.to_string_lossy().to_string());
} else {
config.write_to_path(&config_path).await?;
cmd_args.push(config_path.to_string_lossy().to_string());
}
let child = Command::new(bin_path_ref)
.stderr(stderr.into())
.stdin(Stdio::null())
.args(&cmd_args)
.spawn()
.map_err(anyhow::Error::from)?;
wait_for_server(&host_addr, "NATS server")
.await
.map(|()| child)
} else {
bail!("could not write config to disk, couldn't find download directory")
}
}
pub fn nats_pid_path<P>(install_dir: P) -> PathBuf
where
P: AsRef<Path>,
{
install_dir.as_ref().join(NATS_SERVER_PID)
}
fn nats_url(os: &str, arch: &str, version: &str) -> String {
let os = if os == "macos" { "darwin" } else { os };
let arch = match arch {
"aarch64" => "arm64",
"x86_64" => "amd64",
_ => arch,
};
format!("{NATS_GITHUB_RELEASE_URL}/{version}/nats-server-{version}-{os}-{arch}.tar.gz")
}
#[cfg(test)]
mod test {
use crate::start::{
ensure_nats_server, is_bin_installed, start_nats_server, NatsConfig, NATS_SERVER_BINARY,
};
use anyhow::Result;
use std::env::temp_dir;
use tokio::{
fs::{create_dir_all, remove_dir_all},
io::AsyncReadExt,
};
const NATS_SERVER_VERSION: &str = "v2.10.7";
#[tokio::test]
#[cfg_attr(not(can_reach_github_com), ignore = "github.com is not reachable")]
async fn can_handle_missing_nats_version() -> Result<()> {
let install_dir = temp_dir().join("can_handle_missing_nats_version");
let _ = remove_dir_all(&install_dir).await;
create_dir_all(&install_dir).await?;
assert!(!is_bin_installed(&install_dir, NATS_SERVER_BINARY).await);
let res = ensure_nats_server("v300.22.1111223", &install_dir).await;
assert!(res.is_err());
let _ = remove_dir_all(install_dir).await;
Ok(())
}
#[tokio::test]
#[cfg_attr(not(can_reach_github_com), ignore = "github.com is not reachable")]
async fn can_download_and_start_nats() -> Result<()> {
let install_dir = temp_dir().join("can_download_and_start_nats");
let _ = remove_dir_all(&install_dir).await;
create_dir_all(&install_dir).await?;
assert!(!is_bin_installed(&install_dir, NATS_SERVER_BINARY).await);
let res = ensure_nats_server(NATS_SERVER_VERSION, &install_dir).await;
assert!(res.is_ok());
let log_path = install_dir.join("nats.log");
let log_file = tokio::fs::File::create(&log_path).await?.into_std().await;
let config = NatsConfig::new_standalone("127.0.0.1", 10000, None);
let child_res =
start_nats_server(&install_dir.join(NATS_SERVER_BINARY), log_file, config).await;
assert!(child_res.is_ok());
for _ in 0..4 {
let log_contents = tokio::fs::read_to_string(&log_path).await?;
if log_contents.is_empty() {
println!("NATS server hasn't started up yet, waiting 1 second");
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
} else {
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
assert!(log_contents.contains("Starting nats-server"));
assert!(log_contents.contains("Starting JetStream"));
assert!(log_contents.contains("Server is ready"));
break;
}
}
child_res.unwrap().kill().await?;
let _ = remove_dir_all(install_dir).await;
Ok(())
}
#[tokio::test]
#[cfg_attr(not(can_reach_github_com), ignore = "github.com is not reachable")]
async fn can_gracefully_fail_running_nats() -> Result<()> {
let install_dir = temp_dir().join("can_gracefully_fail_running_nats");
let _ = remove_dir_all(&install_dir).await;
create_dir_all(&install_dir).await?;
assert!(!is_bin_installed(&install_dir, NATS_SERVER_BINARY).await);
let res = ensure_nats_server(NATS_SERVER_VERSION, &install_dir).await;
assert!(res.is_ok());
let config = NatsConfig::new_standalone("127.0.0.1", 10003, Some("extender".to_string()));
let nats_one = start_nats_server(
&install_dir.join(NATS_SERVER_BINARY),
std::process::Stdio::null(),
config.clone(),
)
.await;
assert!(nats_one.is_ok());
tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
let log_path = install_dir.join("nats.log");
let log = std::fs::File::create(&log_path)?;
let nats_two = start_nats_server(&install_dir.join(NATS_SERVER_BINARY), log, config).await;
assert!(nats_two.is_err());
nats_one.unwrap().kill().await?;
let _ = remove_dir_all(install_dir).await;
Ok(())
}
#[tokio::test]
#[cfg_attr(not(can_reach_github_com), ignore = "github.com is not reachable")]
async fn can_write_properly_formed_credsfile() -> Result<()> {
let install_dir = temp_dir().join("can_write_properly_formed_credsfile");
let _ = remove_dir_all(&install_dir).await;
create_dir_all(&install_dir).await?;
assert!(
!is_bin_installed(&install_dir, NATS_SERVER_BINARY).await,
"NATS should not be installed"
);
let res = ensure_nats_server(NATS_SERVER_VERSION, &install_dir).await;
assert!(res.is_ok(), "NATS should be able to start");
let creds = dirs::home_dir().unwrap().join("nats.creds");
let config: NatsConfig = NatsConfig::new_leaf(
"127.0.0.1",
4243,
None,
"connect.ngs.global".to_string(),
creds.clone(),
4204,
None,
);
config.write_to_path(creds.clone()).await?;
let mut credsfile = tokio::fs::File::open(creds.clone()).await?;
let mut contents = String::new();
credsfile.read_to_string(&mut contents).await?;
assert_eq!(contents, format!("\njetstream {{\n domain={}\n store_dir={:?}\n}}\n\nleafnodes {{\n remotes = [\n {{\n url: \"{}\"\n credentials: {:?}\n }}\n ]\n}}\n \n\nwebsocket {{\n port: 4204\n no_tls: true\n}}\n \n", "core", std::env::temp_dir().join("wash-jetstream-4243").display(), "connect.ngs.global", creds.to_string_lossy()));
#[cfg(target_family = "windows")]
assert!(creds.to_string_lossy().contains('\\'));
let _ = remove_dir_all(install_dir).await;
Ok(())
}
}