1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::process::{Output, Stdio};
4
5use anyhow::{bail, Context as _, Result};
6use clap::Parser;
7use futures::future::join_all;
8use serde_json::json;
9use tokio::process::Command;
10use tracing::warn;
11use wash_lib::cli::{stop::stop_hosts, CommandOutput, OutputKind};
12use wash_lib::config::{
13 create_nats_client_from_opts, downloads_dir, host_pid_file, DEFAULT_NATS_HOST,
14 DEFAULT_NATS_PORT,
15};
16use wash_lib::id::ServerId;
17use wash_lib::start::{nats_pid_path, NATS_SERVER_BINARY, WADM_PID};
18
19use crate::appearance::spinner::Spinner;
20use crate::config::{
21 DEFAULT_LATTICE, WASMCLOUD_CTL_CREDSFILE, WASMCLOUD_CTL_HOST, WASMCLOUD_CTL_JWT,
22 WASMCLOUD_CTL_PORT, WASMCLOUD_CTL_SEED, WASMCLOUD_CTL_TLS_CA_FILE, WASMCLOUD_CTL_TLS_FIRST,
23 WASMCLOUD_LATTICE,
24};
25
// Selects which JetStream data `handle_down` removes before NATS is stopped.
//
// NOTE: plain `//` comments are used on purpose here. clap's derive macros turn
// `///` doc comments into user-visible help text, so adding them would change
// the CLI output.
#[derive(Parser, Debug, Clone, Default, clap::ValueEnum, Eq, PartialEq)]
pub enum PurgeJetstream {
    // Purge nothing; `handle_down` skips the purge step entirely. This is the default.
    #[default]
    None,
    // Purge everything covered by both `Wadm` and `Wasmcloud`.
    All,
    // Purge the `wadm_manifests` / `wadm_state` KV buckets and the
    // `wadm_commands`, `wadm_events`, `wadm_mirror`, `wadm_notify` and
    // `wadm_status` streams.
    Wadm,
    // Purge the `CONFIGDATA_<lattice>` and `LATTICEDATA_<lattice>` KV buckets.
    Wasmcloud,
}
38
// Command-line arguments consumed by `handle_down`.
//
// NOTE: plain `//` comments are used on purpose here. clap's derive macros turn
// `///` doc comments into user-visible help text, so adding them would change
// the CLI output.
#[derive(Parser, Debug, Clone, Default)]
pub struct DownCommand {
    // Lattice name: handed to the control-interface client and used as the
    // suffix of the `CONFIGDATA_` / `LATTICEDATA_` bucket names when purging.
    #[clap(
        short = 'x',
        long = "lattice",
        default_value = DEFAULT_LATTICE,
        env = WASMCLOUD_LATTICE,
    )]
    pub lattice: String,

    // Host for the NATS control connection; `handle_down` falls back to
    // `DEFAULT_NATS_HOST` when unset.
    #[clap(long = "ctl-host", env = WASMCLOUD_CTL_HOST)]
    pub ctl_host: Option<String>,

    // Port for the NATS control connection; `handle_down` falls back to
    // `DEFAULT_NATS_PORT` when unset.
    #[clap(long = "ctl-port", env = WASMCLOUD_CTL_PORT)]
    pub ctl_port: Option<u16>,

    // Credentials file forwarded to `create_nats_client_from_opts`.
    #[clap(long = "ctl-credsfile", env = WASMCLOUD_CTL_CREDSFILE)]
    pub ctl_credsfile: Option<PathBuf>,

    // Seed forwarded to `create_nats_client_from_opts`; clap requires `ctl_jwt`
    // to be supplied alongside it.
    #[clap(long = "ctl-seed", env = WASMCLOUD_CTL_SEED, requires = "ctl_jwt")]
    pub ctl_seed: Option<String>,

    // JWT forwarded to `create_nats_client_from_opts`; clap requires `ctl_seed`
    // to be supplied alongside it.
    #[clap(long = "ctl-jwt", env = WASMCLOUD_CTL_JWT, requires = "ctl_seed")]
    pub ctl_jwt: Option<String>,

    // TLS CA file forwarded to `create_nats_client_from_opts`.
    #[clap(long = "ctl-tls-ca-file", env = WASMCLOUD_CTL_TLS_CA_FILE)]
    pub ctl_tls_ca_file: Option<PathBuf>,

    // TLS-first flag forwarded to `create_nats_client_from_opts`; treated as
    // `false` when unset (`unwrap_or_default`).
    #[clap(long = "ctl-tls-first", env = WASMCLOUD_CTL_TLS_FIRST)]
    pub ctl_tls_first: Option<bool>,

    // Optional host ID, passed (as a string) to `stop_hosts`.
    #[clap(long = "host-id")]
    pub host_id: Option<ServerId>,

    // Passed straight through to `stop_hosts` as its "all" flag.
    #[clap(long = "all")]
    pub all: bool,

    // Which JetStream data to purge; also accepted as `--flush-jetstream`.
    #[clap(
        long = "purge-jetstream",
        alias = "flush-jetstream",
        default_value = "none"
    )]
    pub purge: PurgeJetstream,
}
93
/// Entry point for the `down` command.
///
/// Delegates directly to [`handle_down`] and returns its result unchanged.
pub async fn handle_command(
    command: DownCommand,
    output_kind: OutputKind,
) -> Result<CommandOutput> {
    handle_down(command, output_kind).await
}
100
101pub async fn handle_down(cmd: DownCommand, output_kind: OutputKind) -> Result<CommandOutput> {
102 let install_dir = downloads_dir()?;
103 let sp = Spinner::new(&output_kind)?;
104 sp.update_spinner_message(" Stopping wasmCloud ...".to_string());
105
106 let mut out_json = HashMap::new();
107 let mut out_text = String::from("");
108
109 let nats_client = create_nats_client_from_opts(
110 &cmd.ctl_host
111 .unwrap_or_else(|| DEFAULT_NATS_HOST.to_string()),
112 &cmd.ctl_port
113 .map(|port| port.to_string())
114 .unwrap_or_else(|| DEFAULT_NATS_PORT.to_string()),
115 cmd.ctl_jwt,
116 cmd.ctl_seed,
117 cmd.ctl_credsfile,
118 cmd.ctl_tls_ca_file,
119 cmd.ctl_tls_first.unwrap_or_default(),
120 )
121 .await;
122
123 if let Ok(client) = nats_client.as_ref() {
124 let ctl_client = wasmcloud_control_interface::ClientBuilder::new(client.clone())
125 .lattice(&cmd.lattice)
126 .auction_timeout(std::time::Duration::from_secs(2))
127 .build();
128 let host_id_string = cmd.host_id.map(|id| id.to_string());
129 let (hosts, hosts_remain) =
130 stop_hosts(ctl_client, host_id_string.as_ref(), cmd.all).await?;
131 out_json.insert("hosts_stopped".to_string(), json!(hosts));
132 out_text.push_str("✅ wasmCloud hosts stopped successfully\n");
133 if hosts_remain {
134 out_json.insert("nats_stopped".to_string(), json!(false));
135 out_json.insert("wadm_stopped".to_string(), json!(false));
136 out_text.push_str(
137 "🛁 Exiting without stopping NATS or wadm, there are still hosts running",
138 );
139 return Ok(CommandOutput::new(out_text, out_json));
140 } else {
141 let wasmcloud_pid_file_path = host_pid_file()?;
142 if let Err(e) = tokio::fs::remove_file(&wasmcloud_pid_file_path)
144 .await
145 .with_context(|| {
146 format!(
147 "failed to find wasmcloud pid file [{}]",
148 wasmcloud_pid_file_path.display()
149 )
150 })
151 {
152 warn!("{}", e.to_string());
153 }
154 }
155 } else {
156 warn!("Couldn't connect to NATS, unable to stop running hosts")
157 }
158
159 match stop_wadm(&install_dir).await {
160 Ok(_) => {
161 let pid_file_path = &install_dir.join(WADM_PID);
162 if let Err(e) = tokio::fs::remove_file(&pid_file_path)
164 .await
165 .with_context(|| {
166 format!("failed to find WADM pid file [{}]", pid_file_path.display())
167 })
168 {
169 warn!("{}", e.to_string());
170 }
171
172 out_json.insert("wadm_stopped".to_string(), json!(true));
173 out_text.push_str("✅ wadm stopped successfully\n");
174 }
175 Err(e) => {
176 out_json.insert("wadm_stopped".to_string(), json!(false));
177 out_text.push_str(&format!("❌ Could not stop wadm: {e:?}\n"));
178 }
179 }
180
181 if nats_client
182 .as_ref()
183 .is_ok_and(|_| cmd.purge != PurgeJetstream::None)
184 {
185 sp.update_spinner_message(" Purging NATS Jetstream ...".to_string());
186 let client = nats_client.unwrap();
188 let js_client = async_nats::jetstream::new(client);
189
190 if cmd.purge == PurgeJetstream::All || cmd.purge == PurgeJetstream::Wasmcloud {
191 join_all(vec![
192 delete_kv_idempotent(&js_client, format!("CONFIGDATA_{}", &cmd.lattice)),
193 delete_kv_idempotent(&js_client, format!("LATTICEDATA_{}", &cmd.lattice)),
194 ])
195 .await
196 .iter()
197 .for_each(|result| {
198 if let Err(e) = result {
199 out_text.push_str(&format!("❌ Error removing stream: {e:?}\n"));
200 }
201 });
202 }
203
204 if cmd.purge == PurgeJetstream::All || cmd.purge == PurgeJetstream::Wadm {
205 let kvs = join_all(vec![
206 delete_kv_idempotent(&js_client, "wadm_manifests"),
207 delete_kv_idempotent(&js_client, "wadm_state"),
208 ])
209 .await;
210
211 let streams = join_all(vec![
212 delete_stream_idempotent(&js_client, "wadm_commands"),
213 delete_stream_idempotent(&js_client, "wadm_events"),
214 delete_stream_idempotent(&js_client, "wadm_mirror"),
215 delete_stream_idempotent(&js_client, "wadm_notify"),
216 delete_stream_idempotent(&js_client, "wadm_status"),
217 ])
218 .await;
219
220 kvs.iter().chain(streams.iter()).for_each(|result| {
221 if let Err(e) = result {
222 out_text.push_str(&format!("❌ Error removing stream: {e:?}\n"));
223 }
224 });
225 }
226
227 out_text.push_str("✅ NATS Jetstream purged successfully\n");
228 }
229
230 let nats_bin = install_dir.join(NATS_SERVER_BINARY);
231 if nats_bin.is_file() {
232 sp.update_spinner_message(" Stopping NATS server ...".to_string());
233 if let Err(e) = stop_nats(&install_dir, &nats_bin).await {
234 out_json.insert("nats_stopped".to_string(), json!(false));
235 out_text.push_str(&format!(
236 "❌ NATS server did not stop successfully: {e:?}\n"
237 ));
238 } else {
239 out_json.insert("nats_stopped".to_string(), json!(true));
240 out_text.push_str("✅ NATS server stopped successfully\n");
241 }
242 }
243
244 out_json.insert("success".to_string(), json!(true));
245 out_text.push_str("🛁 wash down completed successfully");
246
247 sp.finish_and_clear();
248 Ok(CommandOutput::new(out_text, out_json))
249}
250
251pub async fn stop_nats<P>(work_dir: P, bin_path: P) -> Result<Output>
253where
254 P: AsRef<Path>,
255{
256 let bin_path = bin_path.as_ref();
257 let pid_file = nats_pid_path(work_dir.as_ref());
258 let signal = if pid_file.is_file() {
259 format!("stop={}", &pid_file.display())
260 } else {
261 return Err(anyhow::anyhow!(
262 "No pidfile found for nats-server, assuming it's managed externally"
263 ));
264 };
265 let output = Command::new(bin_path)
266 .arg("--signal")
267 .arg(signal)
268 .stdin(Stdio::null())
269 .output()
270 .await
271 .map_err(anyhow::Error::from);
272
273 if pid_file.is_file() {
275 let _ = tokio::fs::remove_file(&pid_file).await;
276 }
277 output
278}
279
280pub async fn stop_wadm<P>(install_dir: P) -> Result<Output>
282where
283 P: AsRef<Path>,
284{
285 let wadm_pidfile_path = install_dir.as_ref().join(WADM_PID);
286 if let Ok(pid) = tokio::fs::read_to_string(&wadm_pidfile_path).await {
287 tokio::process::Command::new("kill")
288 .arg(pid)
289 .output()
290 .await
291 .map_err(|e| anyhow::anyhow!(e))
292 } else {
293 bail!("No pidfile found at [{}]", wadm_pidfile_path.display())
294 }
295}
296
297async fn delete_stream_idempotent(
299 js: &async_nats::jetstream::Context,
300 stream_name: impl AsRef<str>,
301) -> Result<()> {
302 match js.delete_stream(stream_name).await {
303 Ok(_) => Ok(()),
304 Err(e) if e.to_string().contains("stream not found") => Ok(()),
305 Err(e) => Err(anyhow::anyhow!(e)),
306 }
307}
308
309async fn delete_kv_idempotent(
311 js: &async_nats::jetstream::Context,
312 key: impl AsRef<str>,
313) -> Result<()> {
314 match js.delete_key_value(key).await {
315 Ok(_) => Ok(()),
316 Err(e) if e.to_string().contains("stream not found") => Ok(()),
317 Err(e) => Err(anyhow::anyhow!(e)),
318 }
319}