wash_lib/start/
nats.rs

1use anyhow::{bail, Result};
2use command_group::AsyncCommandGroup;
3use std::path::{Path, PathBuf};
4use std::process::Stdio;
5use tokio::fs::{metadata, write};
6use tokio::process::{Child, Command};
7use tracing::warn;
8
9use crate::common::CommandGroupUsage;
10use crate::start::wait_for_server;
11
12use super::download_binary_from_github;
13
14const NATS_GITHUB_RELEASE_URL: &str = "https://github.com/nats-io/nats-server/releases/download";
15pub const NATS_SERVER_CONF: &str = "nats.conf";
16pub const NATS_SERVER_PID: &str = "nats.pid";
17#[cfg(target_family = "unix")]
18pub const NATS_SERVER_BINARY: &str = "nats-server";
19#[cfg(target_family = "windows")]
20pub const NATS_SERVER_BINARY: &str = "nats-server.exe";
21
22/// Downloads the NATS binary for the architecture and operating system of the current host machine.
23///
24/// # Arguments
25///
26/// * `version` - Specifies the version of the binary to download in the form of `vX.Y.Z`
27/// * `dir` - Where to download the `nats-server` binary to
28/// # Examples
29///
30/// ```no_run
31/// # #[tokio::main]
32/// # async fn main() {
33/// use wash_lib::start::ensure_nats_server;
34/// let res = ensure_nats_server("v2.10.7", "/tmp/").await;
35/// assert!(res.is_ok());
36/// assert!(res.unwrap().to_string_lossy() == "/tmp/nats-server");
37/// # }
38/// ```
39pub async fn ensure_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
40where
41    P: AsRef<Path>,
42{
43    ensure_nats_server_for_os_arch_pair(std::env::consts::OS, std::env::consts::ARCH, version, dir)
44        .await
45}
46
47/// Ensures the `nats-server` binary is installed, returning the path to the executable early if it exists or
48/// downloading the specified GitHub release version of nats-server from <https://github.com/nats-io/nats-server/releases/>
49/// and unpacking the binary for a specified OS/ARCH pair to a directory. Returns the path to the NATS executable.
50/// # Arguments
51///
52/// * `os` - Specifies the operating system of the binary to download, e.g. `linux`
53/// * `arch` - Specifies the architecture of the binary to download, e.g. `amd64`
54/// * `version` - Specifies the version of the binary to download in the form of `vX.Y.Z`
55/// * `dir` - Where to download the `nats-server` binary to
56/// # Examples
57///
58/// ```no_run
59/// # #[tokio::main]
60/// # async fn main() {
61/// use wash_lib::start::ensure_nats_server_for_os_arch_pair;
62/// let os = std::env::consts::OS;
63/// let arch = std::env::consts::ARCH;
64/// let res = ensure_nats_server_for_os_arch_pair(os, arch, "v2.10.7", "/tmp/").await;
65/// assert!(res.is_ok());
66/// assert!(res.unwrap().to_string_lossy() == "/tmp/nats-server");
67/// # }
68/// ```
69pub async fn ensure_nats_server_for_os_arch_pair<P>(
70    os: &str,
71    arch: &str,
72    version: &str,
73    dir: P,
74) -> Result<PathBuf>
75where
76    P: AsRef<Path>,
77{
78    let nats_bin_path = dir.as_ref().join(NATS_SERVER_BINARY);
79    if let Ok(_md) = metadata(&nats_bin_path).await {
80        // Check version to see if we need to update
81        if let Ok(output) = Command::new(&nats_bin_path).arg("version").output().await {
82            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
83            eprintln!(
84                "👀 Found nats-server version on the disk: {}",
85                stdout.trim_end()
86            );
87            let re = regex::Regex::new(r"^nats-server:[^\s]*").unwrap();
88            if re.replace(&stdout, "").to_string().trim() == version {
89                // nats-server already at correct version, return early
90                eprintln!("✅ Using nats-server version [{}]", &version);
91                return Ok(nats_bin_path);
92            }
93        }
94    }
95
96    eprintln!(
97        "🎣 Downloading new nats-server from {}",
98        &nats_url(os, arch, version)
99    );
100
101    // Download NATS binary
102    let res =
103        download_binary_from_github(&nats_url(os, arch, version), dir, NATS_SERVER_BINARY).await;
104    if let Ok(ref path) = res {
105        eprintln!("🎯 Saved nats-server to {}", path.display());
106    }
107
108    res
109}
110
111/// Downloads the NATS binary for the architecture and operating system of the current host machine.
112///
113/// # Arguments
114///
115/// * `version` - Specifies the version of the binary to download in the form of `vX.Y.Z`
116/// * `dir` - Where to download the `nats-server` binary to
117/// # Examples
118///
119/// ```no_run
120/// # #[tokio::main]
121/// # async fn main() {
122/// use wash_lib::start::download_nats_server;
123/// let res = download_nats_server("v2.10.7", "/tmp/").await;
124/// assert!(res.is_ok());
125/// assert!(res.unwrap().to_string_lossy() == "/tmp/nats-server");
126/// # }
127/// ```
128pub async fn download_nats_server<P>(version: &str, dir: P) -> Result<PathBuf>
129where
130    P: AsRef<Path>,
131{
132    download_binary_from_github(
133        &nats_url(std::env::consts::OS, std::env::consts::ARCH, version),
134        dir,
135        NATS_SERVER_BINARY,
136    )
137    .await
138}
139
140/// Configuration for a NATS server that supports running either in "standalone" or "leaf" mode.
141/// See the respective [`NatsConfig::new_standalone`] and [`NatsConfig::new_leaf`] implementations below for more information.
142#[derive(Clone, Debug, PartialEq, Eq)]
143pub struct NatsConfig {
144    pub host: String,
145    pub port: u16,
146    /// The path where the NATS server will store its jetstream data. This must be different for
147    /// each NATS server you spin up, otherwise they will share stream data
148    pub store_dir: PathBuf,
149    pub js_domain: Option<String>,
150    pub remote_url: Option<String>,
151    pub credentials: Option<PathBuf>,
152    pub websocket_port: u16,
153    pub config_path: Option<PathBuf>,
154}
155
156/// Returns a standalone NATS config with the following values:
157/// * `host`: `127.0.0.1`
158/// * `port`: `4222`
159/// * `js_domain`: `Some("core")`
160/// * `remote_url`: `None`
161/// * `credentials`: `None`
162/// * `websocket_port`: `4223`
163impl Default for NatsConfig {
164    fn default() -> Self {
165        NatsConfig {
166            host: "127.0.0.1".to_string(),
167            port: 4222,
168            store_dir: std::env::temp_dir().join("wash-jetstream-4222"),
169            js_domain: Some("core".to_string()),
170            remote_url: None,
171            credentials: None,
172            websocket_port: 4223,
173            config_path: None,
174        }
175    }
176}
177
178impl NatsConfig {
179    /// Instantiates config for a NATS leaf node. Leaf nodes are meant to extend
180    /// an existing NATS infrastructure like [Synadia's NGS](https://synadia.com/ngs), but can
181    /// also be used to extend your own NATS infrastructure. For more information,
182    /// our [Working with Leaf Nodes](https://wasmcloud.dev/reference/lattice/leaf-nodes/) docs
183    ///
184    /// # Arguments
185    /// * `host`: NATS host to listen on, e.g. `127.0.0.1`
186    /// * `port`: NATS port to listen on, e.g. `4222`
187    /// * `js_domain`: Jetstream domain to use, defaults to `core`. See [Configuring Jetstream](https://wasmcloud.dev/reference/lattice/jetstream/) for more information
188    /// * `remote_url`: URL of NATS cluster to extend
189    /// * `credentials`: Credentials to authenticate to the existing NATS cluster
190    #[must_use]
191    pub fn new_leaf(
192        host: &str,
193        port: u16,
194        js_domain: Option<String>,
195        remote_url: String,
196        credentials: PathBuf,
197        websocket_port: u16,
198        config_path: Option<PathBuf>,
199    ) -> Self {
200        NatsConfig {
201            host: host.to_owned(),
202            port,
203            store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
204            js_domain,
205            remote_url: Some(remote_url),
206            credentials: Some(credentials),
207            websocket_port,
208            config_path,
209        }
210    }
211    /// Instantiates config for a standalone NATS server. Unless you're looking to extend
212    /// existing NATS infrastructure, this is the preferred NATS server mode.
213    ///
214    /// # Arguments
215    /// * `host`: NATS host to listen on, e.g. `127.0.0.1`
216    /// * `port`: NATS port to listen on, e.g. `4222`
217    /// * `js_domain`: Jetstream domain to use, defaults to `core`. See [Configuring Jetstream](https://wasmcloud.dev/reference/lattice/jetstream/) for more information
218    pub fn new_standalone(host: &str, port: u16, js_domain: Option<String>) -> Self {
219        if host == "0.0.0.0" {
220            warn!("Listening on 0.0.0.0 is unsupported on some platforms, use 127.0.0.1 for best results");
221        }
222        NatsConfig {
223            host: host.to_owned(),
224            port,
225            store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
226            js_domain,
227            ..Default::default()
228        }
229    }
230
231    pub async fn write_to_path<P>(self, path: P) -> Result<()>
232    where
233        P: AsRef<Path>,
234    {
235        let leafnode_section = if let Some(url) = self.remote_url {
236            let url_line = format!(r#"url: "{url}""#);
237            let creds_line = self
238                .credentials
239                .as_ref()
240                .map(|c| format!("credentials: {c:?}"))
241                .unwrap_or_default();
242
243            format!(
244                r#"
245leafnodes {{
246    remotes = [
247        {{
248            {url_line}
249            {creds_line}
250        }}
251    ]
252}}
253                "#,
254            )
255        } else {
256            String::new()
257        };
258        let websocket_port = self.websocket_port;
259        let websocket_section = format!(
260            r#"
261websocket {{
262    port: {websocket_port}
263    no_tls: true
264}}
265                "#
266        );
267        let config = format!(
268            r#"
269jetstream {{
270    domain={}
271    store_dir={:?}
272}}
273{leafnode_section}
274{websocket_section}
275"#,
276            self.js_domain.unwrap_or_else(|| "core".to_string()),
277            self.store_dir.as_os_str().to_string_lossy()
278        );
279        write(path, config).await.map_err(anyhow::Error::from)
280    }
281}
282
283/// Helper function to execute a NATS server binary with required wasmCloud arguments, e.g. `JetStream`
284/// # Arguments
285///
286/// * `bin_path` - Path to the nats-server binary to execute
287/// * `stderr` - Specify where NATS stderr logs should be written to. If logs aren't important, use `std::process::Stdio::null()`
288/// * `config` - Configuration for the NATS server, see [`NatsConfig`] for options. This config file is written alongside the nats-server binary as `nats.conf`
289pub async fn start_nats_server<P, T>(
290    bin_path: P,
291    stderr: T,
292    config: NatsConfig,
293    command_group: CommandGroupUsage,
294) -> Result<Child>
295where
296    P: AsRef<Path>,
297    T: Into<Stdio>,
298{
299    let host_addr = format!("{}:{}", config.host, config.port);
300
301    // If we can connect to the local port, NATS won't be able to listen on that port
302    if tokio::net::TcpStream::connect(&host_addr).await.is_ok() {
303        bail!(
304            "could not start NATS server, a process is already listening on {}:{}",
305            config.host,
306            config.port
307        );
308    }
309
310    let bin_path_ref = bin_path.as_ref();
311
312    let Some(parent_path) = bin_path_ref.parent() else {
313        bail!("could not write config to disk, couldn't find download directory")
314    };
315
316    let config_path = parent_path.join(NATS_SERVER_CONF);
317    let host = config.host.clone();
318    let port = config.port;
319
320    let mut cmd_args = vec![
321        "-js".to_string(),
322        "--addr".to_string(),
323        host,
324        "--port".to_string(),
325        port.to_string(),
326        "--pid".to_string(),
327        parent_path
328            .join(NATS_SERVER_PID)
329            .to_string_lossy()
330            .to_string(),
331        "--config".to_string(),
332    ];
333
334    if let Some(nats_cfg_path) = &config.config_path {
335        anyhow::ensure!(
336            nats_cfg_path.is_file(),
337            "The provided NATS config File [{:?}] is not a valid File",
338            nats_cfg_path
339        );
340
341        cmd_args.push(nats_cfg_path.to_string_lossy().to_string());
342    } else {
343        config.write_to_path(&config_path).await?;
344        cmd_args.push(config_path.to_string_lossy().to_string());
345    }
346
347    let mut cmd = Command::new(bin_path_ref);
348    cmd.stderr(stderr.into())
349        .stdin(Stdio::null())
350        .args(&cmd_args);
351    let child = if command_group == CommandGroupUsage::CreateNew {
352        cmd.group_spawn().map_err(anyhow::Error::from)?.into_inner()
353    } else {
354        cmd.spawn().map_err(anyhow::Error::from)?
355    };
356
357    wait_for_server(&host_addr, "NATS server")
358        .await
359        .map(|()| child)
360}
361
362/// Helper function to get the path to the NATS server pid file
363pub fn nats_pid_path<P>(install_dir: P) -> PathBuf
364where
365    P: AsRef<Path>,
366{
367    install_dir.as_ref().join(NATS_SERVER_PID)
368}
369
370/// Helper function to determine the NATS server release path given an os/arch and version
371fn nats_url(os: &str, arch: &str, version: &str) -> String {
372    // Replace "macos" with "darwin" to match NATS release scheme
373    let os = if os == "macos" { "darwin" } else { os };
374    // Replace architecture to match NATS release naming scheme
375    let arch = match arch {
376        "aarch64" => "arm64",
377        "x86_64" => "amd64",
378        _ => arch,
379    };
380    format!("{NATS_GITHUB_RELEASE_URL}/{version}/nats-server-{version}-{os}-{arch}.tar.gz")
381}
382
383#[cfg(test)]
384mod test {
385    use anyhow::Result;
386    use tokio::io::AsyncReadExt;
387
388    use crate::start::NatsConfig;
389
390    #[tokio::test]
391    async fn can_write_properly_formed_credsfile() -> Result<()> {
392        let creds = etcetera::home_dir().unwrap().join("nats.creds");
393        let config: NatsConfig = NatsConfig::new_leaf(
394            "127.0.0.1",
395            4243,
396            None,
397            "connect.ngs.global".to_string(),
398            creds.clone(),
399            4204,
400            None,
401        );
402
403        config.write_to_path(creds.clone()).await?;
404
405        let mut credsfile = tokio::fs::File::open(creds.clone()).await?;
406        let mut contents = String::new();
407        credsfile.read_to_string(&mut contents).await?;
408
409        assert_eq!(contents, format!("\njetstream {{\n    domain={}\n    store_dir={:?}\n}}\n\nleafnodes {{\n    remotes = [\n        {{\n            url: \"{}\"\n            credentials: {:?}\n        }}\n    ]\n}}\n                \n\nwebsocket {{\n    port: 4204\n    no_tls: true\n}}\n                \n", "core", std::env::temp_dir().join("wash-jetstream-4243").display(), "connect.ngs.global", creds.to_string_lossy()));
410        // A simple check to ensure we are properly escaping quotes, this is unescaped and checks for "\\"
411        #[cfg(target_family = "windows")]
412        assert!(creds.to_string_lossy().contains('\\'));
413
414        Ok(())
415    }
416}