wash_cli/down/
mod.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::process::{Output, Stdio};
4
5use anyhow::{bail, Context as _, Result};
6use clap::Parser;
7use futures::future::join_all;
8use serde_json::json;
9use tokio::process::Command;
10use tracing::warn;
11use wash_lib::cli::{stop::stop_hosts, CommandOutput, OutputKind};
12use wash_lib::config::{
13    create_nats_client_from_opts, downloads_dir, host_pid_file, DEFAULT_NATS_HOST,
14    DEFAULT_NATS_PORT,
15};
16use wash_lib::id::ServerId;
17use wash_lib::start::{nats_pid_path, NATS_SERVER_BINARY, WADM_PID};
18
19use crate::appearance::spinner::Spinner;
20use crate::config::{
21    DEFAULT_LATTICE, WASMCLOUD_CTL_CREDSFILE, WASMCLOUD_CTL_HOST, WASMCLOUD_CTL_JWT,
22    WASMCLOUD_CTL_PORT, WASMCLOUD_CTL_SEED, WASMCLOUD_CTL_TLS_CA_FILE, WASMCLOUD_CTL_TLS_FIRST,
23    WASMCLOUD_LATTICE,
24};
25
26#[derive(Parser, Debug, Clone, Default, clap::ValueEnum, Eq, PartialEq)]
27pub enum PurgeJetstream {
28    /// Don't purge any Jetstream data, the default
29    #[default]
30    None,
31    /// Purge all streams and KV buckets for wasmCloud and wadm
32    All,
33    /// Purge all streams and KV buckets for wadm, removing all application manifests
34    Wadm,
35    /// Purge all KV buckets for wasmCloud, removing all links and configuration data
36    Wasmcloud,
37}
38
39#[derive(Parser, Debug, Clone, Default)]
40pub struct DownCommand {
41    /// A lattice prefix is a unique identifier for a lattice, and is frequently used within NATS topics to isolate messages from different lattices
42    #[clap(
43            short = 'x',
44            long = "lattice",
45            default_value = DEFAULT_LATTICE,
46            env = WASMCLOUD_LATTICE,
47        )]
48    pub lattice: String,
49
50    /// An IP address or DNS name to use to connect to NATS for Control Interface (CTL) messages, defaults to the value supplied to --nats-host if not supplied
51    #[clap(long = "ctl-host", env = WASMCLOUD_CTL_HOST)]
52    pub ctl_host: Option<String>,
53
54    /// A port to use to connect to NATS for CTL messages, defaults to the value supplied to --nats-port if not supplied
55    #[clap(long = "ctl-port", env = WASMCLOUD_CTL_PORT)]
56    pub ctl_port: Option<u16>,
57
58    /// Convenience flag for CTL authentication, internally this parses the JWT and seed from the credsfile
59    #[clap(long = "ctl-credsfile", env = WASMCLOUD_CTL_CREDSFILE)]
60    pub ctl_credsfile: Option<PathBuf>,
61
62    /// A seed nkey to use to authenticate to NATS for CTL messages
63    #[clap(long = "ctl-seed", env = WASMCLOUD_CTL_SEED, requires = "ctl_jwt")]
64    pub ctl_seed: Option<String>,
65
66    /// A user JWT to use to authenticate to NATS for CTL messages
67    #[clap(long = "ctl-jwt", env = WASMCLOUD_CTL_JWT, requires = "ctl_seed")]
68    pub ctl_jwt: Option<String>,
69
70    /// A TLS CA file to use to authenticate to NATS for CTL messages
71    #[clap(long = "ctl-tls-ca-file", env = WASMCLOUD_CTL_TLS_CA_FILE)]
72    pub ctl_tls_ca_file: Option<PathBuf>,
73
74    /// Perform TLS handshake before expecting the server greeting
75    #[clap(long = "ctl-tls-first", env = WASMCLOUD_CTL_TLS_FIRST)]
76    pub ctl_tls_first: Option<bool>,
77
78    #[clap(long = "host-id")]
79    pub host_id: Option<ServerId>,
80
81    /// Shutdown all hosts running locally if launched with --multi-local
82    #[clap(long = "all")]
83    pub all: bool,
84
85    /// Purge NATS Jetstream storage and streams that persist when wasmCloud is stopped
86    #[clap(
87        long = "purge-jetstream",
88        alias = "flush-jetstream",
89        default_value = "none"
90    )]
91    pub purge: PurgeJetstream,
92}
93
94pub async fn handle_command(
95    command: DownCommand,
96    output_kind: OutputKind,
97) -> Result<CommandOutput> {
98    handle_down(command, output_kind).await
99}
100
101pub async fn handle_down(cmd: DownCommand, output_kind: OutputKind) -> Result<CommandOutput> {
102    let install_dir = downloads_dir()?;
103    let sp = Spinner::new(&output_kind)?;
104    sp.update_spinner_message(" Stopping wasmCloud ...".to_string());
105
106    let mut out_json = HashMap::new();
107    let mut out_text = String::from("");
108
109    let nats_client = create_nats_client_from_opts(
110        &cmd.ctl_host
111            .unwrap_or_else(|| DEFAULT_NATS_HOST.to_string()),
112        &cmd.ctl_port
113            .map(|port| port.to_string())
114            .unwrap_or_else(|| DEFAULT_NATS_PORT.to_string()),
115        cmd.ctl_jwt,
116        cmd.ctl_seed,
117        cmd.ctl_credsfile,
118        cmd.ctl_tls_ca_file,
119        cmd.ctl_tls_first.unwrap_or_default(),
120    )
121    .await;
122
123    if let Ok(client) = nats_client.as_ref() {
124        let ctl_client = wasmcloud_control_interface::ClientBuilder::new(client.clone())
125            .lattice(&cmd.lattice)
126            .auction_timeout(std::time::Duration::from_secs(2))
127            .build();
128        let host_id_string = cmd.host_id.map(|id| id.to_string());
129        let (hosts, hosts_remain) =
130            stop_hosts(ctl_client, host_id_string.as_ref(), cmd.all).await?;
131        out_json.insert("hosts_stopped".to_string(), json!(hosts));
132        out_text.push_str("✅ wasmCloud hosts stopped successfully\n");
133        if hosts_remain {
134            out_json.insert("nats_stopped".to_string(), json!(false));
135            out_json.insert("wadm_stopped".to_string(), json!(false));
136            out_text.push_str(
137                "🛁 Exiting without stopping NATS or wadm, there are still hosts running",
138            );
139            return Ok(CommandOutput::new(out_text, out_json));
140        } else {
141            let wasmcloud_pid_file_path = host_pid_file()?;
142            // Failing to find the pid file is not an error that should prevent stopping other resources
143            if let Err(e) = tokio::fs::remove_file(&wasmcloud_pid_file_path)
144                .await
145                .with_context(|| {
146                    format!(
147                        "failed to find wasmcloud pid file [{}]",
148                        wasmcloud_pid_file_path.display()
149                    )
150                })
151            {
152                warn!("{}", e.to_string());
153            }
154        }
155    } else {
156        warn!("Couldn't connect to NATS, unable to stop running hosts")
157    }
158
159    match stop_wadm(&install_dir).await {
160        Ok(_) => {
161            let pid_file_path = &install_dir.join(WADM_PID);
162            // Failing to find the pid file is not an error that should prevent stopping other resources
163            if let Err(e) = tokio::fs::remove_file(&pid_file_path)
164                .await
165                .with_context(|| {
166                    format!("failed to find WADM pid file [{}]", pid_file_path.display())
167                })
168            {
169                warn!("{}", e.to_string());
170            }
171
172            out_json.insert("wadm_stopped".to_string(), json!(true));
173            out_text.push_str("✅ wadm stopped successfully\n");
174        }
175        Err(e) => {
176            out_json.insert("wadm_stopped".to_string(), json!(false));
177            out_text.push_str(&format!("❌ Could not stop wadm: {e:?}\n"));
178        }
179    }
180
181    if nats_client
182        .as_ref()
183        .is_ok_and(|_| cmd.purge != PurgeJetstream::None)
184    {
185        sp.update_spinner_message(" Purging NATS Jetstream ...".to_string());
186        // SAFETY: nats_client is checked to be Ok() above
187        let client = nats_client.unwrap();
188        let js_client = async_nats::jetstream::new(client);
189
190        if cmd.purge == PurgeJetstream::All || cmd.purge == PurgeJetstream::Wasmcloud {
191            join_all(vec![
192                delete_kv_idempotent(&js_client, format!("CONFIGDATA_{}", &cmd.lattice)),
193                delete_kv_idempotent(&js_client, format!("LATTICEDATA_{}", &cmd.lattice)),
194            ])
195            .await
196            .iter()
197            .for_each(|result| {
198                if let Err(e) = result {
199                    out_text.push_str(&format!("❌ Error removing stream: {e:?}\n"));
200                }
201            });
202        }
203
204        if cmd.purge == PurgeJetstream::All || cmd.purge == PurgeJetstream::Wadm {
205            let kvs = join_all(vec![
206                delete_kv_idempotent(&js_client, "wadm_manifests"),
207                delete_kv_idempotent(&js_client, "wadm_state"),
208            ])
209            .await;
210
211            let streams = join_all(vec![
212                delete_stream_idempotent(&js_client, "wadm_commands"),
213                delete_stream_idempotent(&js_client, "wadm_events"),
214                delete_stream_idempotent(&js_client, "wadm_mirror"),
215                delete_stream_idempotent(&js_client, "wadm_notify"),
216                delete_stream_idempotent(&js_client, "wadm_status"),
217            ])
218            .await;
219
220            kvs.iter().chain(streams.iter()).for_each(|result| {
221                if let Err(e) = result {
222                    out_text.push_str(&format!("❌ Error removing stream: {e:?}\n"));
223                }
224            });
225        }
226
227        out_text.push_str("✅ NATS Jetstream purged successfully\n");
228    }
229
230    let nats_bin = install_dir.join(NATS_SERVER_BINARY);
231    if nats_bin.is_file() {
232        sp.update_spinner_message(" Stopping NATS server ...".to_string());
233        if let Err(e) = stop_nats(&install_dir, &nats_bin).await {
234            out_json.insert("nats_stopped".to_string(), json!(false));
235            out_text.push_str(&format!(
236                "❌ NATS server did not stop successfully: {e:?}\n"
237            ));
238        } else {
239            out_json.insert("nats_stopped".to_string(), json!(true));
240            out_text.push_str("✅ NATS server stopped successfully\n");
241        }
242    }
243
244    out_json.insert("success".to_string(), json!(true));
245    out_text.push_str("🛁 wash down completed successfully");
246
247    sp.finish_and_clear();
248    Ok(CommandOutput::new(out_text, out_json))
249}
250
251/// Helper function to send the nats-server the stop command
252pub async fn stop_nats<P>(work_dir: P, bin_path: P) -> Result<Output>
253where
254    P: AsRef<Path>,
255{
256    let bin_path = bin_path.as_ref();
257    let pid_file = nats_pid_path(work_dir.as_ref());
258    let signal = if pid_file.is_file() {
259        format!("stop={}", &pid_file.display())
260    } else {
261        return Err(anyhow::anyhow!(
262            "No pidfile found for nats-server, assuming it's managed externally"
263        ));
264    };
265    let output = Command::new(bin_path)
266        .arg("--signal")
267        .arg(signal)
268        .stdin(Stdio::null())
269        .output()
270        .await
271        .map_err(anyhow::Error::from);
272
273    // remove PID file
274    if pid_file.is_file() {
275        let _ = tokio::fs::remove_file(&pid_file).await;
276    }
277    output
278}
279
280/// Helper function to kill the wadm process
281pub async fn stop_wadm<P>(install_dir: P) -> Result<Output>
282where
283    P: AsRef<Path>,
284{
285    let wadm_pidfile_path = install_dir.as_ref().join(WADM_PID);
286    if let Ok(pid) = tokio::fs::read_to_string(&wadm_pidfile_path).await {
287        tokio::process::Command::new("kill")
288            .arg(pid)
289            .output()
290            .await
291            .map_err(|e| anyhow::anyhow!(e))
292    } else {
293        bail!("No pidfile found at [{}]", wadm_pidfile_path.display())
294    }
295}
296
297/// Delete a Jetstream stream, ignoring errors if the stream doesn't exist
298async fn delete_stream_idempotent(
299    js: &async_nats::jetstream::Context,
300    stream_name: impl AsRef<str>,
301) -> Result<()> {
302    match js.delete_stream(stream_name).await {
303        Ok(_) => Ok(()),
304        Err(e) if e.to_string().contains("stream not found") => Ok(()),
305        Err(e) => Err(anyhow::anyhow!(e)),
306    }
307}
308
309/// Delete a Jetstream key-value bucket, ignoring errors if the bucket doesn't exist
310async fn delete_kv_idempotent(
311    js: &async_nats::jetstream::Context,
312    key: impl AsRef<str>,
313) -> Result<()> {
314    match js.delete_key_value(key).await {
315        Ok(_) => Ok(()),
316        Err(e) if e.to_string().contains("stream not found") => Ok(()),
317        Err(e) => Err(anyhow::anyhow!(e)),
318    }
319}