use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::{Output, Stdio};
use anyhow::{bail, Context as _, Result};
use clap::Parser;
use futures::future::join_all;
use serde_json::json;
use tokio::process::Command;
use tracing::warn;
use wash_lib::cli::{stop::stop_hosts, CommandOutput, OutputKind};
use wash_lib::config::{
create_nats_client_from_opts, downloads_dir, host_pid_file, DEFAULT_NATS_HOST,
DEFAULT_NATS_PORT,
};
use wash_lib::id::ServerId;
use wash_lib::start::{nats_pid_path, NATS_SERVER_BINARY, WADM_PID};
use crate::appearance::spinner::Spinner;
use crate::config::{
DEFAULT_LATTICE, WASMCLOUD_CTL_CREDSFILE, WASMCLOUD_CTL_HOST, WASMCLOUD_CTL_JWT,
WASMCLOUD_CTL_PORT, WASMCLOUD_CTL_SEED, WASMCLOUD_CTL_TLS_CA_FILE, WASMCLOUD_LATTICE,
};
// Selects which NATS JetStream data `handle_down` deletes once hosts and wadm
// have been stopped.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros
// turn `///` doc comments into CLI help text, so adding them would change the
// `--help` output of the command.
#[derive(Parser, Debug, Clone, Default, clap::ValueEnum, Eq, PartialEq)]
pub enum PurgeJetstream {
// Leave JetStream untouched (the default).
#[default]
None,
// Delete everything covered by both `Wadm` and `Wasmcloud`.
All,
// Delete only the wadm key-value buckets (`wadm_manifests`, `wadm_state`)
// and the `wadm_*` streams.
Wadm,
// Delete only the `CONFIGDATA_<lattice>` and `LATTICEDATA_<lattice>`
// key-value buckets.
Wasmcloud,
}
// Command-line arguments consumed by `handle_down`.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros
// turn `///` doc comments into CLI help text, so adding them would change the
// `--help` output of the command.
#[derive(Parser, Debug, Clone, Default)]
pub struct DownCommand {
// Lattice name (`-x` / `--lattice`). Used to configure the control-interface
// client and to build the `CONFIGDATA_<lattice>` / `LATTICEDATA_<lattice>`
// bucket names when purging JetStream.
#[clap(
short = 'x',
long = "lattice",
default_value = DEFAULT_LATTICE,
env = WASMCLOUD_LATTICE,
)]
pub lattice: String,
// Host used for the NATS connection; `handle_down` falls back to
// `DEFAULT_NATS_HOST` when this is not set.
#[clap(long = "ctl-host", env = WASMCLOUD_CTL_HOST)]
pub ctl_host: Option<String>,
// Port used for the NATS connection; `handle_down` falls back to
// `DEFAULT_NATS_PORT` when this is not set.
#[clap(long = "ctl-port", env = WASMCLOUD_CTL_PORT)]
pub ctl_port: Option<u16>,
// Forwarded unchanged to `create_nats_client_from_opts`
// (presumably a path to a NATS credentials file — confirm against wash_lib).
#[clap(long = "ctl-credsfile", env = WASMCLOUD_CTL_CREDSFILE)]
pub ctl_credsfile: Option<PathBuf>,
// Forwarded unchanged to `create_nats_client_from_opts`.
// clap rejects this flag unless `--ctl-jwt` is supplied as well.
#[clap(long = "ctl-seed", env = WASMCLOUD_CTL_SEED, requires = "ctl_jwt")]
pub ctl_seed: Option<String>,
// Forwarded unchanged to `create_nats_client_from_opts`.
// clap rejects this flag unless `--ctl-seed` is supplied as well.
#[clap(long = "ctl-jwt", env = WASMCLOUD_CTL_JWT, requires = "ctl_seed")]
pub ctl_jwt: Option<String>,
// Forwarded unchanged to `create_nats_client_from_opts`
// (presumably a CA certificate file for TLS — confirm against wash_lib).
#[clap(long = "ctl-tls-ca-file", env = WASMCLOUD_CTL_TLS_CA_FILE)]
pub ctl_tls_ca_file: Option<PathBuf>,
// Optional host id; converted to a string and handed to `stop_hosts`.
#[clap(long = "host-id")]
pub host_id: Option<ServerId>,
// Passed straight to `stop_hosts`
// (presumably "stop every host" rather than a single one — confirm against wash_lib).
#[clap(long = "all")]
pub all: bool,
// Which JetStream data to delete (`--purge-jetstream`, alias
// `--flush-jetstream`); defaults to `none`.
#[clap(
long = "purge-jetstream",
alias = "flush-jetstream",
default_value = "none"
)]
pub purge: PurgeJetstream,
}
/// Entry point for the `down` command.
///
/// Delegates directly to [`handle_down`] and returns its result unchanged.
pub async fn handle_command(
command: DownCommand,
output_kind: OutputKind,
) -> Result<CommandOutput> {
handle_down(command, output_kind).await
}
/// Stops wasmCloud hosts, wadm and the locally installed NATS server.
///
/// The steps run in this order:
///
/// 1. Connect to NATS using the `ctl_*` options (falling back to
///    `DEFAULT_NATS_HOST` / `DEFAULT_NATS_PORT`). If the connection fails a
///    warning is logged and host shutdown and JetStream purging are skipped.
/// 2. Ask hosts to stop via `stop_hosts`. If hosts are still running
///    afterwards, return early without touching wadm or NATS.
/// 3. Stop wadm with [`stop_wadm`] and remove its pid file.
/// 4. If requested through `cmd.purge`, delete the selected JetStream
///    key-value buckets and streams.
/// 5. If the NATS server binary exists in the downloads directory, stop it
///    with [`stop_nats`].
///
/// The returned [`CommandOutput`] carries a human-readable log plus a JSON map
/// with the keys `hosts_stopped`, `wadm_stopped`, `nats_stopped` and `success`
/// (each key is only present if the corresponding step was reached).
///
/// # Errors
///
/// Returns an error if the downloads directory or host pid file path cannot
/// be resolved, if the spinner cannot be created, or if `stop_hosts` fails.
/// Failures while stopping wadm or NATS, or while purging JetStream, are
/// reported in the output text instead of being returned.
pub async fn handle_down(cmd: DownCommand, output_kind: OutputKind) -> Result<CommandOutput> {
    let install_dir = downloads_dir()?;
    let sp = Spinner::new(&output_kind)?;
    sp.update_spinner_message(" Stopping wasmCloud ...".to_string());

    let mut out_json = HashMap::new();
    let mut out_text = String::new();

    // A failed connection is tolerated: wadm and NATS can still be stopped
    // through their pid files without talking to the lattice.
    let nats_client = create_nats_client_from_opts(
        &cmd.ctl_host
            .unwrap_or_else(|| DEFAULT_NATS_HOST.to_string()),
        &cmd.ctl_port
            .map(|port| port.to_string())
            .unwrap_or_else(|| DEFAULT_NATS_PORT.to_string()),
        cmd.ctl_jwt,
        cmd.ctl_seed,
        cmd.ctl_credsfile,
        cmd.ctl_tls_ca_file,
    )
    .await;

    // --- hosts ---------------------------------------------------------
    if let Ok(client) = nats_client.as_ref() {
        let ctl_client = wasmcloud_control_interface::ClientBuilder::new(client.clone())
            .lattice(&cmd.lattice)
            .auction_timeout(std::time::Duration::from_secs(2))
            .build();
        let host_id_string = cmd.host_id.map(|id| id.to_string());
        let (hosts, hosts_remain) =
            stop_hosts(ctl_client, host_id_string.as_ref(), cmd.all).await?;
        out_json.insert("hosts_stopped".to_string(), json!(hosts));
        out_text.push_str("✅ wasmCloud hosts stopped successfully\n");

        if hosts_remain {
            out_json.insert("nats_stopped".to_string(), json!(false));
            out_json.insert("wadm_stopped".to_string(), json!(false));
            out_text.push_str(
                "🛁 Exiting without stopping NATS or wadm, there are still hosts running",
            );
            // Clear the spinner here too, exactly as the regular exit path
            // does, so no stale progress line is left behind.
            sp.finish_and_clear();
            return Ok(CommandOutput::new(out_text, out_json));
        }

        // No hosts left: the host pid file is stale. Failing to remove it is
        // only worth a warning.
        let wasmcloud_pid_file_path = host_pid_file()?;
        if let Err(e) = tokio::fs::remove_file(&wasmcloud_pid_file_path)
            .await
            .with_context(|| {
                format!(
                    "failed to find wasmcloud pid file [{}]",
                    wasmcloud_pid_file_path.display()
                )
            })
        {
            warn!("{}", e);
        }
    } else {
        warn!("Couldn't connect to NATS, unable to stop running hosts")
    }

    // --- wadm ----------------------------------------------------------
    match stop_wadm(&install_dir).await {
        Ok(_) => {
            let pid_file_path = install_dir.join(WADM_PID);
            if let Err(e) = tokio::fs::remove_file(&pid_file_path)
                .await
                .with_context(|| {
                    format!("failed to find WADM pid file [{}]", pid_file_path.display())
                })
            {
                warn!("{}", e);
            }
            out_json.insert("wadm_stopped".to_string(), json!(true));
            out_text.push_str("✅ wadm stopped successfully\n");
        }
        Err(e) => {
            out_json.insert("wadm_stopped".to_string(), json!(false));
            out_text.push_str(&format!("❌ Could not stop wadm: {e:?}\n"));
        }
    }

    // --- JetStream purge -----------------------------------------------
    // Only possible with a live NATS connection and only when requested.
    match nats_client {
        Ok(client) if cmd.purge != PurgeJetstream::None => {
            sp.update_spinner_message(" Purging NATS Jetstream ...".to_string());
            let js_client = async_nats::jetstream::new(client);

            // Collect every deletion result so the summary line can reflect
            // whether the purge really succeeded.
            let mut purge_results = Vec::new();
            if matches!(cmd.purge, PurgeJetstream::All | PurgeJetstream::Wasmcloud) {
                purge_results.extend(
                    join_all(vec![
                        delete_kv_idempotent(&js_client, format!("CONFIGDATA_{}", &cmd.lattice)),
                        delete_kv_idempotent(&js_client, format!("LATTICEDATA_{}", &cmd.lattice)),
                    ])
                    .await,
                );
            }
            if matches!(cmd.purge, PurgeJetstream::All | PurgeJetstream::Wadm) {
                purge_results.extend(
                    join_all(vec![
                        delete_kv_idempotent(&js_client, "wadm_manifests"),
                        delete_kv_idempotent(&js_client, "wadm_state"),
                    ])
                    .await,
                );
                purge_results.extend(
                    join_all(vec![
                        delete_stream_idempotent(&js_client, "wadm_commands"),
                        delete_stream_idempotent(&js_client, "wadm_events"),
                        delete_stream_idempotent(&js_client, "wadm_mirror"),
                        delete_stream_idempotent(&js_client, "wadm_notify"),
                        delete_stream_idempotent(&js_client, "wadm_status"),
                    ])
                    .await,
                );
            }

            let mut purge_failed = false;
            for e in purge_results.into_iter().filter_map(|result| result.err()) {
                purge_failed = true;
                out_text.push_str(&format!("❌ Error removing stream: {e:?}\n"));
            }
            // Do not claim success when at least one deletion failed.
            if purge_failed {
                out_text.push_str("❌ NATS Jetstream purge did not complete successfully\n");
            } else {
                out_text.push_str("✅ NATS Jetstream purged successfully\n");
            }
        }
        _ => {}
    }

    // --- NATS server ---------------------------------------------------
    // Without a downloaded server binary there is nothing to signal.
    let nats_bin = install_dir.join(NATS_SERVER_BINARY);
    if nats_bin.is_file() {
        sp.update_spinner_message(" Stopping NATS server ...".to_string());
        if let Err(e) = stop_nats(&install_dir, &nats_bin).await {
            out_json.insert("nats_stopped".to_string(), json!(false));
            out_text.push_str(&format!(
                "❌ NATS server did not stop successfully: {e:?}\n"
            ));
        } else {
            out_json.insert("nats_stopped".to_string(), json!(true));
            out_text.push_str("✅ NATS server stopped successfully\n");
        }
    }

    out_json.insert("success".to_string(), json!(true));
    out_text.push_str("🛁 wash down completed successfully");
    sp.finish_and_clear();
    Ok(CommandOutput::new(out_text, out_json))
}
pub async fn stop_nats<P>(work_dir: P, bin_path: P) -> Result<Output>
where
P: AsRef<Path>,
{
let bin_path = bin_path.as_ref();
let pid_file = nats_pid_path(work_dir.as_ref());
let signal = if pid_file.is_file() {
format!("stop={}", &pid_file.display())
} else {
return Err(anyhow::anyhow!(
"No pidfile found for nats-server, assuming it's managed externally"
));
};
let output = Command::new(bin_path)
.arg("--signal")
.arg(signal)
.stdin(Stdio::null())
.output()
.await
.map_err(anyhow::Error::from);
if pid_file.is_file() {
let _ = tokio::fs::remove_file(&pid_file).await;
}
output
}
pub async fn stop_wadm<P>(install_dir: P) -> Result<Output>
where
P: AsRef<Path>,
{
let wadm_pidfile_path = install_dir.as_ref().join(WADM_PID);
if let Ok(pid) = tokio::fs::read_to_string(&wadm_pidfile_path).await {
tokio::process::Command::new("kill")
.arg(pid)
.output()
.await
.map_err(|e| anyhow::anyhow!(e))
} else {
bail!("No pidfile found at [{}]", wadm_pidfile_path.display())
}
}
/// Deletes the JetStream stream `stream_name`, treating a missing stream as
/// success.
///
/// An error whose message contains `"stream not found"` is swallowed; every
/// other error is converted into an `anyhow` error and returned.
async fn delete_stream_idempotent(
    js: &async_nats::jetstream::Context,
    stream_name: impl AsRef<str>,
) -> Result<()> {
    let Err(err) = js.delete_stream(stream_name).await else {
        return Ok(());
    };

    // The check is done on the rendered message, not on an error kind.
    if err.to_string().contains("stream not found") {
        Ok(())
    } else {
        Err(anyhow::anyhow!(err))
    }
}
/// Deletes the JetStream key-value bucket named by `key`, treating a missing
/// bucket as success.
///
/// An error whose message contains `"stream not found"` is swallowed; every
/// other error is converted into an `anyhow` error and returned.
async fn delete_kv_idempotent(
    js: &async_nats::jetstream::Context,
    key: impl AsRef<str>,
) -> Result<()> {
    let Err(err) = js.delete_key_value(key).await else {
        return Ok(());
    };

    // The check is done on the rendered message, not on an error kind.
    if err.to_string().contains("stream not found") {
        Ok(())
    } else {
        Err(anyhow::anyhow!(err))
    }
}