use anyhow::{anyhow, bail, Context, Result};
use async_nats_0_33::Client;
use clap::Parser;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::fmt::Write;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use sysinfo::{System, SystemExt};
use tokio::fs::create_dir_all;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Child,
};
use tracing::warn;
use wash_lib::app::{load_app_manifest, AppManifest, AppManifestSource};
use wash_lib::cli::{CommandOutput, OutputKind};
use wash_lib::config::{
create_nats_client_from_opts, downloads_dir, host_pid_file, DEFAULT_NATS_TIMEOUT_MS,
};
use wash_lib::context::fs::ContextDir;
use wash_lib::context::ContextManager;
use wash_lib::start::{
ensure_nats_server, ensure_wadm, ensure_wasmcloud, find_wasmcloud_binary, nats_pid_path,
start_nats_server, start_wadm, start_wasmcloud_host, NatsConfig, WadmConfig, WADM_PID,
};
use wasmcloud_control_interface::{Client as CtlClient, ClientBuilder as CtlClientBuilder};
use crate::app::deploy_model_from_manifest;
use crate::appearance::spinner::Spinner;
use crate::down::stop_nats;
mod config;
mod credsfile;
pub use config::*;
// Top-level arguments for the `up` command, composed of one flattened option group each for
// NATS, the wasmCloud host, and wadm.
//
// NOTE: regular `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change the CLI's `--help` output.
#[derive(Parser, Debug, Clone)]
pub struct UpCommand {
// `-d` / `--detached` (alias `--detach`). `handle_up` returns right after starting the host
// when this is set instead of entering the interactive loop.
#[clap(short = 'd', long = "detached", alias = "detach")]
pub detached: bool,
// NATS server / connection options.
#[clap(flatten)]
pub nats_opts: NatsOpts,
// wasmCloud host options.
#[clap(flatten)]
pub wasmcloud_opts: WasmcloudOpts,
// wadm options.
#[clap(flatten)]
pub wadm_opts: WadmOpts,
}
// Options controlling the NATS server that `up` starts or connects to.
//
// NOTE: regular `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change the CLI's `--help` output.
#[derive(Parser, Debug, Clone)]
pub struct NatsOpts {
// Credentials file; clap only accepts it together with `--nats-remote-url`.
#[clap(
long = "nats-credsfile",
env = "NATS_CREDSFILE",
requires = "nats_remote_url"
)]
pub nats_credsfile: Option<PathBuf>,
// NATS configuration file; clap requires `--nats-host` and `--nats-port` alongside it.
#[clap(
long = "nats-config-file",
env = "NATS_CONFIG",
requires = "nats_host",
requires = "nats_port"
)]
pub nats_configfile: Option<PathBuf>,
// Remote URL; forwarded as `NatsConfig::remote_url` (used as the leafnode remote in `start_nats`).
#[clap(long = "nats-remote-url", env = "NATS_REMOTE_URL")]
pub nats_remote_url: Option<String>,
// When set, `handle_up` never starts its own NATS server. Mutually exclusive with
// `--nats-remote-url`.
#[clap(
long = "nats-connect-only",
env = "NATS_CONNECT_ONLY",
conflicts_with = "nats_remote_url"
)]
pub connect_only: bool,
// Version of the NATS server binary passed to `ensure_nats_server`.
#[clap(long = "nats-version", default_value = NATS_SERVER_VERSION, env = "NATS_VERSION")]
pub nats_version: String,
// Host for the NATS server; falls back to the context / `DEFAULT_NATS_HOST` when unset.
#[clap(long = "nats-host", env = "WASMCLOUD_NATS_HOST")]
pub nats_host: Option<String>,
// Port for the NATS server; falls back to the context / `DEFAULT_NATS_PORT` when unset.
#[clap(long = "nats-port", env = "WASMCLOUD_NATS_PORT")]
pub nats_port: Option<u16>,
// Websocket port forwarded as `NatsConfig::websocket_port`.
#[clap(
long = "nats-websocket-port",
env = "NATS_WEBSOCKET_PORT",
default_value = DEFAULT_NATS_WEBSOCKET_PORT
)]
pub nats_websocket_port: u16,
// JetStream domain forwarded as `NatsConfig::js_domain`.
#[clap(long = "nats-js-domain", env = "NATS_JS_DOMAIN")]
pub nats_js_domain: Option<String>,
}
impl From<NatsOpts> for NatsConfig {
fn from(other: NatsOpts) -> NatsConfig {
let host = other
.nats_host
.unwrap_or_else(|| DEFAULT_NATS_HOST.to_string());
let port = other.nats_port.unwrap_or_else(|| {
DEFAULT_NATS_PORT
.parse()
.expect("failed to parse default NATS port")
});
NatsConfig {
host,
port,
store_dir: std::env::temp_dir().join(format!("wash-jetstream-{port}")),
js_domain: other.nats_js_domain,
remote_url: other.nats_remote_url,
credentials: other.nats_credsfile,
websocket_port: other.nats_websocket_port,
config_path: other.nats_configfile,
}
}
}
// Options for the wasmCloud host started by `up`. Most fields can also be supplied through the
// environment variable named by the corresponding `env = ...` constant.
//
// NOTE: regular `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change the CLI's `--help` output.
#[derive(Parser, Debug, Clone)]
pub struct WasmcloudOpts {
// Host version passed to `ensure_wasmcloud` / `find_wasmcloud_binary`.
#[clap(long = "wasmcloud-version", default_value = WASMCLOUD_HOST_VERSION, env = "WASMCLOUD_VERSION")]
pub wasmcloud_version: String,
// Lattice name (`-x` / `--lattice`); `handle_up` falls back to the context's lattice when unset.
#[clap(
short = 'x',
long = "lattice",
env = WASMCLOUD_LATTICE
)]
pub lattice: Option<String>,
#[clap(long = "host-seed", env = WASMCLOUD_HOST_SEED)]
pub host_seed: Option<String>,
// --- RPC connection settings ---
#[clap(long = "rpc-host", env = WASMCLOUD_RPC_HOST)]
pub rpc_host: Option<String>,
#[clap(long = "rpc-port", env = WASMCLOUD_RPC_PORT)]
pub rpc_port: Option<u16>,
// clap requires `--rpc-jwt` whenever a seed is given (and vice versa below).
#[clap(long = "rpc-seed", env = WASMCLOUD_RPC_SEED, requires = "rpc_jwt")]
pub rpc_seed: Option<String>,
// Also applied as the control client's request timeout in `into_ctl_client`.
#[clap(long = "rpc-timeout-ms", default_value = DEFAULT_RPC_TIMEOUT_MS, env = WASMCLOUD_RPC_TIMEOUT_MS)]
pub rpc_timeout_ms: Option<u64>,
#[clap(long = "rpc-jwt", env = WASMCLOUD_RPC_JWT, requires = "rpc_seed")]
pub rpc_jwt: Option<String>,
#[clap(long = "rpc-tls", env = WASMCLOUD_RPC_TLS)]
pub rpc_tls: bool,
#[clap(long = "rpc-tls-ca-file", env = WASMCLOUD_RPC_TLS_CA_FILE)]
pub rpc_tls_ca_file: Option<PathBuf>,
#[clap(long = "rpc-credsfile", env = WASMCLOUD_RPC_CREDSFILE)]
pub rpc_credsfile: Option<PathBuf>,
// --- Control-interface connection settings (used to build the NATS/ctl clients) ---
#[clap(long = "ctl-host", env = WASMCLOUD_CTL_HOST)]
pub ctl_host: Option<String>,
#[clap(long = "ctl-port", env = WASMCLOUD_CTL_PORT)]
pub ctl_port: Option<u16>,
// clap requires `--ctl-jwt` whenever a seed is given (and vice versa below).
#[clap(long = "ctl-seed", env = WASMCLOUD_CTL_SEED, requires = "ctl_jwt")]
pub ctl_seed: Option<String>,
#[clap(long = "ctl-jwt", env = WASMCLOUD_CTL_JWT, requires = "ctl_seed")]
pub ctl_jwt: Option<String>,
#[clap(long = "ctl-credsfile", env = WASMCLOUD_CTL_CREDSFILE)]
pub ctl_credsfile: Option<PathBuf>,
#[clap(long = "ctl-tls", env = WASMCLOUD_CTL_TLS)]
pub ctl_tls: bool,
#[clap(long = "ctl-tls-ca-file", env = WASMCLOUD_CTL_TLS_CA_FILE)]
pub ctl_tls_ca_file: Option<PathBuf>,
// --- Cluster identity ---
#[clap(long = "cluster-seed", env = WASMCLOUD_CLUSTER_SEED)]
pub cluster_seed: Option<String>,
#[clap(long = "cluster-issuers", env = WASMCLOUD_CLUSTER_ISSUERS)]
pub cluster_issuers: Option<Vec<String>>,
// Presumably the provider shutdown delay in milliseconds (per the constant names) — confirm
// against the host documentation.
#[clap(long = "provider-delay", default_value = DEFAULT_PROV_SHUTDOWN_DELAY_MS, env = WASMCLOUD_PROV_SHUTDOWN_DELAY_MS)]
pub provider_delay: u32,
// --- OCI registry behaviour ---
#[clap(long = "allow-latest", env = WASMCLOUD_OCI_ALLOW_LATEST)]
pub allow_latest: bool,
#[clap(long = "allowed-insecure", env = WASMCLOUD_OCI_ALLOWED_INSECURE)]
pub allowed_insecure: Option<Vec<String>>,
// JetStream domain; `handle_up` falls back to the context's `js_domain` when unset.
#[clap(long = "wasmcloud-js-domain", env = WASMCLOUD_JS_DOMAIN)]
pub wasmcloud_js_domain: Option<String>,
#[clap(long = "config-service-enabled", env = WASMCLOUD_CONFIG_SERVICE)]
pub config_service_enabled: bool,
#[clap(long = "allow-file-load", default_value = DEFAULT_ALLOW_FILE_LOAD, env = WASMCLOUD_ALLOW_FILE_LOAD)]
pub allow_file_load: Option<bool>,
// Also forwarded to wadm as `WadmConfig::structured_logging`.
#[clap(
long = "enable-structured-logging",
env = WASMCLOUD_STRUCTURED_LOGGING_ENABLED
)]
pub enable_structured_logging: bool,
// `-l` / `--label` (alias `--labels`); may be given multiple times.
#[clap(short = 'l', long = "label", alias = "labels")]
pub label: Option<Vec<String>>,
// `--log-level` (alias `--structured-log-level`).
#[clap(long = "log-level", alias = "structured-log-level", default_value = DEFAULT_STRUCTURED_LOG_LEVEL, env = WASMCLOUD_LOG_LEVEL)]
pub structured_log_level: String,
#[clap(long = "enable-ipv6", env = WASMCLOUD_ENABLE_IPV6)]
pub enable_ipv6: bool,
// When set, `handle_up` refuses to download the host and only uses an installed binary.
#[clap(long = "wasmcloud-start-only")]
pub start_only: bool,
// When set, `handle_up` skips the "host already running" pid-file check.
#[clap(long = "multi-local")]
pub multi_local: bool,
// `--max-execution-time-ms` (alias `--max-time-ms`).
#[clap(long = "max-execution-time-ms", alias = "max-time-ms", env = WASMCLOUD_MAX_EXECUTION_TIME_MS, default_value = DEFAULT_MAX_EXECUTION_TIME_MS)]
pub max_execution_time: u64,
#[clap(long = "secrets-topic", env = WASMCLOUD_SECRETS_TOPIC)]
pub secrets_topic: Option<String>,
#[clap(long = "policy-topic", env = WASMCLOUD_POLICY_TOPIC)]
pub policy_topic: Option<String>,
}
impl WasmcloudOpts {
pub async fn into_ctl_client(self, auction_timeout_ms: Option<u64>) -> Result<CtlClient> {
let lattice = self.lattice.unwrap_or_else(|| DEFAULT_LATTICE.to_string());
let ctl_host = self
.ctl_host
.unwrap_or_else(|| DEFAULT_NATS_HOST.to_string());
let ctl_port = self
.ctl_port
.map(|p| p.to_string())
.unwrap_or_else(|| DEFAULT_NATS_PORT.to_string())
.to_string();
let auction_timeout_ms = auction_timeout_ms.unwrap_or(DEFAULT_NATS_TIMEOUT_MS);
let nc = create_nats_client_from_opts(
&ctl_host,
&ctl_port,
self.ctl_jwt,
self.ctl_seed,
self.ctl_credsfile,
self.ctl_tls_ca_file,
)
.await
.context("Failed to create NATS client")?;
let mut builder = CtlClientBuilder::new(nc)
.lattice(lattice)
.auction_timeout(tokio::time::Duration::from_millis(auction_timeout_ms));
if let Some(rpc_timeout_ms) = self.rpc_timeout_ms {
builder = builder.timeout(tokio::time::Duration::from_millis(rpc_timeout_ms))
}
if let Ok(topic_prefix) = std::env::var("WASMCLOUD_CTL_TOPIC_PREFIX") {
builder = builder.topic_prefix(topic_prefix);
}
let ctl_client = builder.build();
Ok(ctl_client)
}
}
// Options for the wadm process that `up` optionally starts next to the host.
//
// NOTE: regular `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so doc comments here would change the CLI's `--help` output.
#[derive(Parser, Debug, Clone)]
pub struct WadmOpts {
// wadm version passed to `ensure_wadm`.
#[clap(long = "wadm-version", default_value = WADM_VERSION, env = "WADM_VERSION")]
pub wadm_version: String,
// When set, `handle_up` never starts wadm.
#[clap(long = "disable-wadm")]
pub disable_wadm: bool,
// JetStream domain forwarded as `WadmConfig::js_domain`.
#[clap(long = "wadm-js-domain", env = "WADM_JS_DOMAIN")]
pub wadm_js_domain: Option<String>,
// Path to a wadm manifest that `handle_up` deploys once the host is available.
#[clap(long = "wadm-manifest", env = "WADM_MANIFEST")]
pub wadm_manifest: Option<PathBuf>,
}
/// Coarse state of the local wasmCloud host(s), as determined by `running_host_count`.
///
/// The variant order is significant: the derived `PartialOrd` follows declaration order and
/// `process_wadm_manifest` relies on `state < WasmCloudHostState::Running` meaning
/// "not yet running". Do not reorder the variants.
#[derive(Debug, PartialEq, PartialOrd)]
enum WasmCloudHostState {
/// No host answered and the pid recorded in the pid file is not a live process.
NotRunning,
/// No host answered yet, but the pid recorded in the pid file is a live process.
Starting,
/// Exactly one host answered the host query.
Running,
/// Two or more hosts answered the host query.
MultipleRunning,
}
/// Entry point for the `up` command; forwards both arguments unchanged to [`handle_up`]
/// and returns its result.
pub async fn handle_command(command: UpCommand, output_kind: OutputKind) -> Result<CommandOutput> {
handle_up(command, output_kind).await
}
/// Brings up a local wasmCloud environment: NATS, wadm (unless disabled) and a wasmCloud host.
///
/// High-level flow:
/// 1. Merge CLI options with the default context (CLI values win).
/// 2. Start a NATS server unless one is already reachable or `--nats-connect-only` is set.
/// 3. If a host pid file exists (and `--multi-local` is not set), inspect the running hosts and
///    either clean up a stale pid file, bail out, or return early (optionally deploying the
///    wadm manifest to the already-running host).
/// 4. Start wadm and the wasmCloud host, then write the host pid file.
/// 5. In detached mode return immediately; otherwise stay attached until CTRL+C and then stop
///    the host and remove pid files.
///
/// # Errors
/// Returns an error when a required download, process start, NATS connection or pid-file
/// operation fails. A failed wadm manifest deployment is reported in the output text (detached)
/// or on stderr (interactive) rather than as an error.
pub async fn handle_up(cmd: UpCommand, output_kind: OutputKind) -> Result<CommandOutput> {
    let install_dir = downloads_dir()?;
    create_dir_all(&install_dir).await?;
    let spinner = Spinner::new(&output_kind)?;

    // CLI flags take precedence; the default context fills in whatever was left unset.
    let ctx = ContextDir::new()?
        .load_default_context()
        .context("failed to load context")?;
    let nats_host = cmd.nats_opts.nats_host.clone().unwrap_or(ctx.ctl_host);
    let nats_port = cmd.nats_opts.nats_port.unwrap_or(ctx.ctl_port);
    let wasmcloud_opts = WasmcloudOpts {
        lattice: Some(cmd.wasmcloud_opts.lattice.unwrap_or(ctx.lattice)),
        ctl_host: Some(cmd.wasmcloud_opts.ctl_host.unwrap_or(nats_host.clone())),
        ctl_port: Some(cmd.wasmcloud_opts.ctl_port.unwrap_or(nats_port)),
        ctl_jwt: cmd.wasmcloud_opts.ctl_jwt.or(ctx.ctl_jwt),
        ctl_seed: cmd.wasmcloud_opts.ctl_seed.or(ctx.ctl_seed),
        ctl_credsfile: cmd.wasmcloud_opts.ctl_credsfile.or(ctx.ctl_credsfile),
        rpc_host: Some(cmd.wasmcloud_opts.rpc_host.unwrap_or(nats_host.clone())),
        rpc_port: Some(cmd.wasmcloud_opts.rpc_port.unwrap_or(nats_port)),
        rpc_timeout_ms: Some(cmd.wasmcloud_opts.rpc_timeout_ms.unwrap_or(ctx.rpc_timeout)),
        rpc_jwt: cmd.wasmcloud_opts.rpc_jwt.or(ctx.rpc_jwt),
        rpc_seed: cmd.wasmcloud_opts.rpc_seed.or(ctx.rpc_seed),
        rpc_credsfile: cmd.wasmcloud_opts.rpc_credsfile.or(ctx.rpc_credsfile),
        max_execution_time: cmd.wasmcloud_opts.max_execution_time,
        secrets_topic: cmd.wasmcloud_opts.secrets_topic,
        policy_topic: cmd.wasmcloud_opts.policy_topic,
        cluster_seed: cmd
            .wasmcloud_opts
            .cluster_seed
            .or_else(|| ctx.cluster_seed.map(|seed| seed.to_string())),
        wasmcloud_js_domain: cmd.wasmcloud_opts.wasmcloud_js_domain.or(ctx.js_domain),
        ..cmd.wasmcloud_opts
    };
    let host_env = configure_host_env(wasmcloud_opts.clone()).await?;
    let nats_listen_address = format!("{nats_host}:{nats_port}");

    // A failed probe connection means nothing is listening yet, so we start NATS ourselves
    // (unless the user asked to connect only).
    let nats_client = nats_client_from_wasmcloud_opts(&wasmcloud_opts).await;
    let should_run_nats = !cmd.nats_opts.connect_only && nats_client.is_err();
    let supplied_remote_credentials = cmd.nats_opts.nats_remote_url.is_some();
    let nats_bin = if should_run_nats || supplied_remote_credentials {
        spinner.update_spinner_message(" Downloading NATS ...".to_string());
        let nats_binary = ensure_nats_server(&cmd.nats_opts.nats_version, &install_dir).await?;
        spinner.update_spinner_message(" Starting NATS ...".to_string());
        let nats_config = NatsConfig {
            host: nats_host.clone(),
            port: nats_port,
            store_dir: std::env::temp_dir().join(format!("wash-jetstream-{nats_port}")),
            js_domain: cmd.nats_opts.nats_js_domain,
            remote_url: cmd.nats_opts.nats_remote_url,
            credentials: cmd.nats_opts.nats_credsfile.clone(),
            websocket_port: cmd.nats_opts.nats_websocket_port,
            config_path: cmd.nats_opts.nats_configfile,
        };
        start_nats(&install_dir, &nats_binary, nats_config).await?;
        Some(nats_binary)
    } else {
        None
    };

    // From here on a NATS connection is mandatory.
    let client = nats_client_from_wasmcloud_opts(&wasmcloud_opts).await?;
    let mut out_json = HashMap::new();
    let mut out_text = String::from("");
    let lattice = wasmcloud_opts
        .clone()
        .lattice
        .context("missing lattice prefix")?;
    let host_started = Arc::new(AtomicBool::new(false));
    let wasmcloud_log_path = install_dir.join("wasmcloud.log");
    let ctl_client = wasmcloud_opts.clone().into_ctl_client(None).await?;

    // An existing pid file suggests a host was already started by a previous `wash up`.
    if !cmd.wasmcloud_opts.multi_local
        && tokio::fs::try_exists(host_pid_file()?)
            .await
            .is_ok_and(|exists| exists)
    {
        let host_state = running_host_count(&ctl_client).await?;
        if host_state == WasmCloudHostState::NotRunning {
            eprintln!("🟨 Pid file {:?} exists but no hosts are running. Removing Pid file and proceeding with \"wash up\"",
                host_pid_file()?);
            tokio::fs::remove_file(host_pid_file()?).await?;
        } else if host_state == WasmCloudHostState::MultipleRunning {
            bail!("🟨 Multiple hosts are running. Please use --multi-local to start another");
        } else {
            spinner.finish_and_clear();
            if let Some(ref manifest_path) = cmd.wadm_opts.wadm_manifest {
                eprintln!("🟨 Wasmcloud host is already running. Deploying wadm manifest in detached mode.");
                out_json.insert("deployed_wadm_manifest_path".into(), json!(manifest_path));
                // The outer `Result` is the task join result, the inner one is the actual
                // deployment outcome; both must succeed before reporting success.
                match process_wadm_manifest(
                    client.clone(),
                    lattice.clone(),
                    host_started.clone(),
                    host_state,
                    ctl_client,
                    manifest_path.clone(),
                    true,
                )
                .await
                {
                    Ok(Ok(())) => out_text.push_str("Deployed wadm manifest\n"),
                    Ok(Err(e)) => {
                        let _ = writeln!(out_text, "Deployment failed {e:#}");
                    }
                    Err(e) => {
                        let _ = writeln!(out_text, "Deployment failed {e}");
                    }
                };
            }
            out_text.push_str("🛁 wash up completed successfully, already running");
            out_json.insert("success".to_string(), json!(true));
            let _ = write!(
                out_text,
                "\n🕸 NATS is running in the background at http://{nats_listen_address}"
            );
            let _ = write!(
                out_text,
                "\n📜 Logs for the host are being written to {}",
                wasmcloud_log_path.to_string_lossy()
            );
            let _ = write!(out_text, "\n\n⬇️ To stop wasmCloud, run \"wash down\"");
            return Ok(CommandOutput::new(out_text, out_json));
        }
    }

    // Start wadm unless disabled or already answering on this lattice. Failures here are
    // reported but not fatal.
    let wadm_process = if !cmd.wadm_opts.disable_wadm
        && !is_wadm_running(
            &nats_host,
            nats_port,
            cmd.nats_opts.nats_credsfile.clone(),
            &lattice,
        )
        .await
        .unwrap_or(false)
    {
        spinner.update_spinner_message(" Starting wadm ...".to_string());
        let config = WadmConfig {
            structured_logging: wasmcloud_opts.enable_structured_logging,
            js_domain: cmd.wadm_opts.wadm_js_domain.clone(),
            nats_server_url: nats_listen_address.clone(),
            nats_credsfile: cmd.nats_opts.nats_credsfile,
        };
        let wadm_log_path = install_dir.join("wadm.log");
        let wadm_log_file = tokio::fs::File::create(&wadm_log_path)
            .await?
            .into_std()
            .await;
        match ensure_wadm(&cmd.wadm_opts.wadm_version, &install_dir).await {
            Ok(path) => match start_wadm(&path, wadm_log_file, Some(config)).await {
                Ok(wadm_child) => Some(wadm_child),
                Err(e) => {
                    eprintln!("🟨 Couldn't start wadm: {e}");
                    None
                }
            },
            Err(e) => {
                let wadm_version: String = cmd.wadm_opts.wadm_version.clone();
                eprintln!("🟨 Couldn't download wadm {wadm_version}: {e}");
                if e.to_string().contains("Text file busy") {
                    eprintln!("🛟 Please ensure there aren't any leftover wadm processes");
                }
                None
            }
        }
    } else {
        None
    };

    // Locate (or download) the host binary; on failure tear down what we started above.
    let wasmcloud_executable = if !wasmcloud_opts.start_only {
        spinner.update_spinner_message(" Downloading wasmCloud ...".to_string());
        ensure_wasmcloud(&wasmcloud_opts.wasmcloud_version, &install_dir).await?
    } else if let Some(wasmcloud_bin) =
        find_wasmcloud_binary(&install_dir, &wasmcloud_opts.wasmcloud_version).await
    {
        wasmcloud_bin
    } else {
        if let Some(child) = wadm_process {
            stop_wadm(child, &install_dir).await?;
        }
        if nats_bin.is_some() {
            stop_nats(install_dir).await?;
        }
        bail!("wasmCloud was not installed, exiting without downloading as --wasmcloud-start-only was set");
    };

    spinner.update_spinner_message(" Starting wasmCloud ...".to_string());
    // Detached hosts log to a file; interactive hosts are piped so their stderr can be echoed.
    let stderr: Stdio = if cmd.detached {
        tokio::fs::File::create(&wasmcloud_log_path)
            .await?
            .into_std()
            .await
            .into()
    } else {
        Stdio::piped()
    };
    let version = wasmcloud_opts.wasmcloud_version;
    let mut wasmcloud_child = match start_wasmcloud_host(
        &wasmcloud_executable,
        std::process::Stdio::null(),
        stderr,
        host_env,
    )
    .await
    {
        Ok(child) => child,
        Err(e) => {
            if let Some(child) = wadm_process {
                stop_wadm(child, &install_dir).await?;
            }
            if nats_bin.is_some() {
                stop_nats(install_dir).await?;
            }
            return Err(e);
        }
    };
    spinner.finish_and_clear();
    out_json.insert("success".to_string(), json!(true));
    out_text.push_str("🛁 wash up completed successfully");

    if let Some(ref manifest_path) = cmd.wadm_opts.wadm_manifest {
        out_json.insert("deployed_wadm_manifest_path".into(), json!(manifest_path));
        let deploy_task = process_wadm_manifest(
            client.clone(),
            lattice.clone(),
            host_started.clone(),
            WasmCloudHostState::NotRunning,
            ctl_client,
            manifest_path.clone(),
            cmd.detached,
        );
        if cmd.detached {
            match deploy_task.await {
                Ok(Ok(())) => out_text.push_str("\nDeployed wadm manifest"),
                Ok(Err(e)) => {
                    let _ = write!(out_text, "\nDeployment failed {e:#}");
                }
                Err(e) => {
                    let _ = write!(out_text, "\nDeployment failed {e}");
                }
            };
        } else {
            // In interactive mode the deployment task waits for `host_started`, which is only
            // set inside `run_wasmcloud_interactive` below. Awaiting the task here would block
            // forever, so it runs in the background and failures are reported on stderr.
            tokio::spawn(async move {
                match deploy_task.await {
                    Ok(Ok(())) => {}
                    Ok(Err(e)) => eprintln!("🟨 Failed to deploy wadm manifest: {e:#}"),
                    Err(e) => eprintln!("🟨 wadm manifest deployment task failed: {e}"),
                }
            });
        }
    }

    // `id()` is `None` once the child has been reaped; report that instead of panicking.
    let host_pid = wasmcloud_child
        .id()
        .context("wasmCloud host exited before its pid could be recorded")?;
    let pid_file_contents = json!({
        "version": version,
        "pid": host_pid
    });
    tokio::fs::write(host_pid_file()?, pid_file_contents.to_string()).await?;

    if cmd.detached {
        out_json.insert("wasmcloud_log".to_string(), json!(wasmcloud_log_path));
        out_json.insert("kill_cmd".to_string(), json!("wash down"));
        out_json.insert("nats_url".to_string(), json!(nats_listen_address));
        let _ = write!(
            out_text,
            "\n🕸 NATS is running in the background at http://{nats_listen_address}"
        );
        let _ = write!(
            out_text,
            "\n📜 Logs for the host are being written to {}",
            wasmcloud_log_path.to_string_lossy()
        );
        let _ = write!(out_text, "\n\n⬇️ To stop wasmCloud, run \"wash down\"");
        return Ok(CommandOutput::new(out_text, out_json));
    }

    // Interactive mode: stay attached until CTRL+C, then shut everything down.
    run_wasmcloud_interactive(
        &mut wasmcloud_child,
        cmd.wadm_opts.wadm_manifest,
        host_started.clone(),
        output_kind,
    )
    .await?;
    let spinner = Spinner::new(&output_kind)?;
    spinner.update_spinner_message(
        "CTRL+c received, stopping wasmCloud, wadm, and NATS...".to_string(),
    );
    stop_wasmcloud(wasmcloud_child).await?;
    tokio::fs::remove_file(host_pid_file()?).await?;
    if wadm_process.is_some() {
        remove_wadm_pidfile(&install_dir).await?;
    }
    spinner.finish_and_clear();
    Ok(CommandOutput::new(out_text, out_json))
}
/// Classifies the local host state by querying the lattice and, if nobody answers, the pid file.
///
/// One responding host yields `Running`, two or more yield `MultipleRunning`. With no
/// responses, the `pid` entry of the host pid file is checked: a live process yields
/// `Starting`, anything else `NotRunning`.
///
/// # Errors
/// Fails if the host query fails, or if the pid file cannot be read or parsed as JSON.
async fn running_host_count(ctl_client: &CtlClient) -> Result<WasmCloudHostState> {
    let hosts = ctl_client.get_hosts().await.map_err(|e| anyhow!(e))?;
    let responding = hosts.into_iter().filter_map(|r| r.response).count();
    if responding == 1 {
        return Ok(WasmCloudHostState::Running);
    }
    if responding >= 2 {
        return Ok(WasmCloudHostState::MultipleRunning);
    }

    // Nobody answered: fall back to the pid recorded by a previous `wash up`.
    let raw = tokio::fs::read_to_string(host_pid_file()?).await?;
    let parsed: Value = serde_json::from_str(&raw)?;
    let state = match parsed.get("pid") {
        Some(pid) if is_process_running(&pid.to_string()) => WasmCloudHostState::Starting,
        _ => WasmCloudHostState::NotRunning,
    };
    Ok(state)
}
/// Returns `true` when `pid` parses as a process id and that process is present in the
/// system's process table; unparsable input yields `false`.
fn is_process_running(pid: &str) -> bool {
    let Ok(pid) = pid.parse() else {
        return false;
    };
    let mut sys = System::new_all();
    sys.refresh_processes();
    sys.processes().get(&pid).is_some()
}
#[allow(clippy::too_many_arguments)]
fn process_wadm_manifest(
client: async_nats_0_33::Client,
lattice: String,
host_started: Arc<AtomicBool>,
host_state: WasmCloudHostState,
ctl_client: CtlClient,
manifest_path: PathBuf,
detached: bool,
) -> tokio::task::JoinHandle<std::result::Result<(), anyhow::Error>> {
tokio::spawn(async move {
if detached && host_state < WasmCloudHostState::Running {
tokio::time::timeout(tokio::time::Duration::from_secs(3), async {
loop {
if let Ok(WasmCloudHostState::Running) = running_host_count(&ctl_client).await {
break;
}
}
})
.await
.context("failed to wait for host start while deploying WADM application")?;
} else if !detached {
while !host_started.load(Ordering::SeqCst) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
};
let manifest = load_app_manifest(AppManifestSource::File(manifest_path.to_path_buf()))
.await
.with_context(|| {
format!(
"failed to load manifest from path [{}]",
manifest_path.display()
)
})?;
deploy_wadm_application(&client, manifest, lattice.as_ref())
.await
.with_context(|| {
format!(
"failed to deploy wadm application [{}]",
manifest_path.display()
)
})?;
Ok(()) as Result<()>
})
}
/// Replaces any existing deployment of the manifest's model with a fresh one.
///
/// The model is first undeployed on a best-effort basis (the result is ignored), then deployed
/// from `manifest`. A deployment error whose message contains "already exists" is tolerated.
///
/// # Errors
/// Fails if the manifest has no name or if deployment fails for any other reason.
async fn deploy_wadm_application(
    client: &async_nats_0_33::Client,
    manifest: AppManifest,
    lattice: &str,
) -> Result<()> {
    let model_name = manifest.name().context("failed to find model name")?;

    // Best effort: the model may simply not be deployed yet.
    let _ = wash_lib::app::undeploy_model(client, Some(lattice.into()), model_name).await;

    if let Err(e) = deploy_model_from_manifest(client, Some(lattice.into()), manifest, None).await
    {
        if !e.to_string().contains("already exists") {
            bail!(e);
        }
    }
    Ok(())
}
/// Launches the NATS server binary with its output redirected to `nats.log` in `install_dir`
/// and records the server's pid in the NATS pid file.
///
/// When both a remote URL and a credentials file are configured, a connection to the remote is
/// attempted first so that bad credentials are reported before the server is started.
///
/// # Errors
/// Fails if the remote cannot be reached, the log or pid file cannot be written, or the server
/// fails to start.
async fn start_nats(
    install_dir: &Path,
    nats_binary: &Path,
    nats_config: NatsConfig,
) -> Result<Child> {
    if let Some(url) = nats_config.remote_url.as_ref() {
        if let Some(creds) = nats_config.credentials.as_ref() {
            let probe = create_nats_client_from_opts(
                url,
                &nats_config.port.to_string(),
                None,
                None,
                Some(creds.to_owned()),
                None,
            )
            .await;
            if let Err(e) = probe {
                bail!("Could not connect to leafnode remote: {}", e);
            }
        }
    }

    let log_path = install_dir.join("nats.log");
    let log_file = tokio::fs::File::create(&log_path).await?.into_std().await;
    let server = start_nats_server(nats_binary, log_file, nats_config).await?;

    // `id()` is `None` only if the process has already been reaped; nothing to record then.
    if let Some(pid) = server.id() {
        tokio::fs::write(nats_pid_path(install_dir), pid.to_string()).await?;
    }
    Ok(server)
}
async fn run_wasmcloud_interactive(
wasmcloud_child: &mut Child,
wadm_manifest: Option<PathBuf>,
host_started: Arc<AtomicBool>,
output_kind: OutputKind,
) -> Result<()> {
use std::sync::mpsc::channel;
let (running_sender, running_receiver) = channel();
let running = Arc::new(AtomicBool::new(true));
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.context("failed to wait for ctrl_c signal")?;
if running.load(Ordering::SeqCst) {
running.store(false, Ordering::SeqCst);
let _ = running_sender.send(true);
} else {
warn!("\nRepeated CTRL+C received, killing wasmCloud and NATS. This may result in zombie processes")
}
Result::<_, anyhow::Error>::Ok(())
});
if output_kind != OutputKind::Json {
println!("🏃 Running in interactive mode.");
if let Some(ref manifest_path) = wadm_manifest {
println!(
"🚀 Deploying WADM manifest at [{}]",
manifest_path.display()
);
}
println!("🎛️ To start the dashboard, run `wash ui`");
println!("🚪 Press `CTRL+c` at any time to exit");
}
let handle = wasmcloud_child.stderr.take().map(|stderr| {
tokio::spawn(async {
let mut lines = BufReader::new(stderr).lines();
loop {
if let Ok(Some(line)) = lines.next_line().await {
println!("{line}");
}
}
})
});
host_started.store(true, Ordering::SeqCst);
let _ = running_receiver.recv();
if let Some(handle) = handle {
handle.abort()
};
Ok(())
}
/// Asks the host process to shut down by sending it `SIGTERM` and waits for it to exit.
///
/// Does nothing when the child no longer has a pid (it has already been reaped).
#[cfg(unix)]
async fn stop_wasmcloud(mut wasmcloud_child: Child) -> Result<()> {
    use nix::sys::signal::{kill, Signal};
    use nix::unistd::Pid;

    let Some(pid) = wasmcloud_child.id() else {
        return Ok(());
    };
    kill(Pid::from_raw(pid as i32), Signal::SIGTERM)?;
    wasmcloud_child.wait().await?;
    Ok(())
}
/// Windows variant: terminates the host process via `Child::kill` and waits for it to finish.
#[cfg(target_family = "windows")]
async fn stop_wasmcloud(mut wasmcloud_child: Child) -> Result<()> {
    Ok(wasmcloud_child.kill().await?)
}
/// Probes for a wadm instance on the given lattice by issuing a model-list request.
///
/// Returns `Ok(true)` when the request succeeds and `Ok(false)` when it fails.
///
/// # Errors
/// Fails only if the NATS connection itself cannot be established.
async fn is_wadm_running(
    nats_host: &str,
    nats_port: u16,
    credsfile: Option<PathBuf>,
    lattice: &str,
) -> Result<bool> {
    let port = nats_port.to_string();
    let nc = create_nats_client_from_opts(nats_host, &port, None, None, credsfile, None).await?;
    let models = wash_lib::app::get_models(&nc, Some(lattice.to_string())).await;
    Ok(models.is_ok())
}
/// Kills the wadm child process and then removes its pid file from `install_dir`.
///
/// # Errors
/// Fails if the process cannot be killed or the pid file cannot be removed.
async fn stop_wadm<P: AsRef<Path>>(mut wadm: Child, install_dir: P) -> Result<()> {
    wadm.kill().await?;
    let removed = remove_wadm_pidfile(install_dir).await;
    removed
}
/// Deletes the wadm pid file inside `install_dir`; a file that is already gone is not an error.
///
/// # Errors
/// Fails for any removal error other than "not found".
async fn remove_wadm_pidfile<P: AsRef<Path>>(install_dir: P) -> Result<()> {
    let pid_file = install_dir.as_ref().join(WADM_PID);
    match tokio::fs::remove_file(pid_file).await {
        Ok(()) => Ok(()),
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
        Err(err) => bail!(err),
    }
}
/// Opens a NATS connection using the control-interface (`ctl_*`) settings of `wasmcloud_opts`,
/// falling back to `DEFAULT_NATS_HOST` / `DEFAULT_NATS_PORT` for an unset host or port.
async fn nats_client_from_wasmcloud_opts(wasmcloud_opts: &WasmcloudOpts) -> Result<Client> {
    let host = match wasmcloud_opts.ctl_host.clone() {
        Some(host) => host,
        None => DEFAULT_NATS_HOST.to_string(),
    };
    let port = match wasmcloud_opts.ctl_port {
        Some(port) => port.to_string(),
        None => DEFAULT_NATS_PORT.to_string(),
    };
    let jwt = wasmcloud_opts.ctl_jwt.clone();
    let seed = wasmcloud_opts.ctl_seed.clone();
    let credsfile = wasmcloud_opts.ctl_credsfile.clone();
    let tls_ca_file = wasmcloud_opts.ctl_tls_ca_file.clone();

    create_nats_client_from_opts(&host, &port, jwt, seed, credsfile, tls_ca_file).await
}
#[cfg(test)]
mod tests {
use super::UpCommand;
use anyhow::Result;
use clap::Parser;
const LOCAL_REGISTRY: &str = "localhost:5001";
/// Parses an invocation that sets (nearly) every `up` flag and checks that each value lands in
/// the expected field.
#[test]
fn test_up_comprehensive() -> Result<()> {
    const TESTDIR: &str = "./tests/fixtures";
    let parsed = UpCommand::try_parse_from([
        "up",
        "--allow-latest",
        "--allowed-insecure",
        LOCAL_REGISTRY,
        "--cluster-issuers",
        "CBZZ6BLE7PIJNCEJMXOHAJ65KIXRVXDA74W6LUKXC4EPFHTJREXQCOYI",
        "--cluster-seed",
        "SCAKLQ2FFT4LZUUVQMH6N37US3IZUEVJBUR3V532VV3DAAHSZXPQY6DYIM",
        "--config-service-enabled",
        "--ctl-credsfile",
        TESTDIR,
        "--ctl-host",
        "127.0.0.2",
        "--ctl-jwt",
        "eyyjWT",
        "--ctl-port",
        "4232",
        "--ctl-seed",
        "SUALIKDKMIUAKRT5536EXKC3CX73TJD3CFXZMJSHIKSP3LTYIIUQGCUVGA",
        "--ctl-tls",
        "--enable-ipv6",
        "--enable-structured-logging",
        "--host-seed",
        "SNAP4UVNHVWSBJ5MHAQ6M3RB23S3ALA3O3A4RF25G2FQB5CCZJBBBWCKBY",
        "--detached",
        "--nats-credsfile",
        TESTDIR,
        "--nats-host",
        "127.0.0.2",
        "--nats-js-domain",
        "domain",
        "--nats-port",
        "4232",
        "--nats-remote-url",
        "tls://remote.global",
        "--nats-version",
        "v2.10.7",
        "--provider-delay",
        "500",
        "--rpc-credsfile",
        TESTDIR,
        "--rpc-host",
        "127.0.0.2",
        "--rpc-jwt",
        "eyyjWT",
        "--rpc-port",
        "4232",
        "--rpc-seed",
        "SUALIKDKMIUAKRT5536EXKC3CX73TJD3CFXZMJSHIKSP3LTYIIUQGCUVGA",
        "--rpc-timeout-ms",
        "500",
        "--rpc-tls",
        "--structured-log-level",
        "warn",
        "--wasmcloud-js-domain",
        "domain",
        "--wasmcloud-version",
        "v0.57.1",
        "--lattice",
        "anotherprefix",
    ])?;
    let UpCommand {
        detached,
        nats_opts,
        wasmcloud_opts,
        ..
    } = parsed;

    // OCI and cluster settings
    assert!(wasmcloud_opts.allow_latest);
    assert_eq!(
        wasmcloud_opts.allowed_insecure,
        Some(vec![LOCAL_REGISTRY.to_string()])
    );
    assert_eq!(
        wasmcloud_opts.cluster_issuers,
        Some(vec![
            "CBZZ6BLE7PIJNCEJMXOHAJ65KIXRVXDA74W6LUKXC4EPFHTJREXQCOYI".to_string()
        ])
    );
    assert_eq!(
        wasmcloud_opts.cluster_seed.as_deref(),
        Some("SCAKLQ2FFT4LZUUVQMH6N37US3IZUEVJBUR3V532VV3DAAHSZXPQY6DYIM")
    );
    assert!(wasmcloud_opts.config_service_enabled);
    assert!(!nats_opts.connect_only);

    // Control-interface connection
    assert!(wasmcloud_opts.ctl_credsfile.is_some());
    assert_eq!(wasmcloud_opts.ctl_host.as_deref(), Some("127.0.0.2"));
    assert_eq!(wasmcloud_opts.ctl_jwt.as_deref(), Some("eyyjWT"));
    assert_eq!(wasmcloud_opts.ctl_port, Some(4232));
    assert_eq!(
        wasmcloud_opts.ctl_seed.as_deref(),
        Some("SUALIKDKMIUAKRT5536EXKC3CX73TJD3CFXZMJSHIKSP3LTYIIUQGCUVGA")
    );
    assert!(wasmcloud_opts.ctl_tls);

    // RPC connection
    assert!(wasmcloud_opts.rpc_credsfile.is_some());
    assert_eq!(wasmcloud_opts.rpc_host.as_deref(), Some("127.0.0.2"));
    assert_eq!(wasmcloud_opts.rpc_jwt.as_deref(), Some("eyyjWT"));
    assert_eq!(wasmcloud_opts.rpc_port, Some(4232));
    assert_eq!(
        wasmcloud_opts.rpc_seed.as_deref(),
        Some("SUALIKDKMIUAKRT5536EXKC3CX73TJD3CFXZMJSHIKSP3LTYIIUQGCUVGA")
    );
    assert!(wasmcloud_opts.rpc_tls);

    // Host behaviour
    assert!(wasmcloud_opts.enable_ipv6);
    assert!(wasmcloud_opts.enable_structured_logging);
    assert_eq!(
        wasmcloud_opts.host_seed.as_deref(),
        Some("SNAP4UVNHVWSBJ5MHAQ6M3RB23S3ALA3O3A4RF25G2FQB5CCZJBBBWCKBY")
    );
    assert_eq!(wasmcloud_opts.structured_log_level, "warn");
    assert_eq!(wasmcloud_opts.wasmcloud_version, "v0.57.1");
    assert_eq!(wasmcloud_opts.lattice.as_deref(), Some("anotherprefix"));
    assert_eq!(wasmcloud_opts.wasmcloud_js_domain.as_deref(), Some("domain"));

    // NATS and top-level flags
    assert_eq!(nats_opts.nats_version, "v2.10.7");
    assert_eq!(
        nats_opts.nats_remote_url.as_deref(),
        Some("tls://remote.global")
    );
    assert_eq!(wasmcloud_opts.provider_delay, 500);
    assert!(detached);
    Ok(())
}
/// Checks `is_process_running` against the current process, a pid that cannot exist, and a
/// string that is not a pid at all.
#[test]
fn test_is_process_running() {
    // The test process itself is guaranteed to be alive while the test runs.
    let current_pid = std::process::id().to_string();
    assert!(
        super::is_process_running(&current_pid),
        "Current process should be running"
    );
    let non_existent_pid = "-1";
    assert!(
        !super::is_process_running(non_existent_pid),
        "Non-existent process should not be running"
    );
    let invalid_pid = "wasmcloud";
    assert!(
        !super::is_process_running(invalid_pid),
        "Invalid pid should not be running"
    );
}
}