use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{path::PathBuf, sync::Arc};
use anyhow::{anyhow, bail, Context, Result};
use clap::Parser;
use console::style;
use notify::{event::EventKind, Event as NotifyEvent, RecursiveMode, Watcher};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};
use tokio::{select, sync::mpsc};
use wash_lib::generate::emoji;
use wash_lib::{
actor::{scale_actor, start_actor, StartActorArgs},
build::{build_project, SignConfig},
cli::dev::run_dev_loop,
cli::CommandOutput,
config::downloads_dir,
id::{ModuleId, ServerId},
parser::get_config,
};
use wasmcloud_control_interface::Host;
use crate::{
down::{handle_down, DownCommand},
up::{handle_up, NatsOpts, UpCommand, WadmOpts, WasmcloudOpts, DOWNLOADS_DIR},
};
// Command-line options accepted by the dev command (consumed by `handle_command`).
//
// Plain `//` comments are used on purpose in this struct: clap's derive macros turn
// `///` doc comments into user-visible help text, so documentation written that way
// would change the CLI output. User-facing descriptions go in `help = "..."` instead.
#[derive(Debug, Clone, Parser)]
pub struct DevCommand {
    // NATS options, forwarded to `handle_up` when a host has to be started.
    #[clap(flatten)]
    pub(crate) nats_opts: NatsOpts,
    // wasmCloud host options; also used to build the control client.
    #[clap(flatten)]
    pub(crate) wasmcloud_opts: WasmcloudOpts,
    // wadm options, forwarded to `handle_up` when a host has to be started.
    #[clap(flatten)]
    pub(crate) wadm_opts: WadmOpts,
    // Host to deploy to; only consulted when more than one host is found.
    #[clap(
        long = "host-id",
        name = "host-id",
        value_parser,
        help = "ID of the host to deploy to (required when more than one host is detected)"
    )]
    pub host_id: Option<ServerId>,
    // Project directory to build and watch. Note that the flag is spelled `--work-dir`
    // while the argument name and environment variable say "code dir"; the spelling is
    // kept as-is so existing invocations continue to work.
    #[clap(
        name = "code-dir",
        long = "work-dir",
        env = "WASH_DEV_CODE_DIR",
        help = "Path to the project directory to build and watch (defaults to the current directory)"
    )]
    pub code_dir: Option<PathBuf>,
    // When set, the host is not shut down after the dev loop receives Ctrl+C.
    #[clap(
        name = "leave-host-running",
        long = "leave-host-running",
        env = "WASH_DEV_LEAVE_HOST_RUNNING",
        default_value = "false",
        help = "Leave the wasmCloud host running after stopping the devloop"
    )]
    pub leave_host_running: bool,
    // When set, the host is started inside a spawned task instead of detached.
    #[clap(
        name = "use-host-subprocess",
        long = "use-host-subprocess",
        env = "WASH_DEV_USE_HOST_SUBPROCESS",
        default_value = "false",
        help = "Run the wasmCloud host in a subprocess (rather than detached mode)"
    )]
    pub use_host_subprocess: bool,
}
/// Owns the handle of the task that runs the wasmCloud host in-process.
///
/// The handle sits in an `Option` so that it can be moved out of the wrapper even
/// though the wrapper implements `Drop`.
struct HostSubprocess(Option<JoinHandle<()>>);

impl HostSubprocess {
    /// Consumes the wrapper and returns the task handle, if it is still present.
    ///
    /// The slot is left as `None`, so the `Drop` implementation that runs when
    /// `self` goes out of scope at the end of this call finds nothing to abort.
    fn into_inner(mut self) -> Option<JoinHandle<()>> {
        std::mem::take(&mut self.0)
    }
}
impl Drop for HostSubprocess {
    /// Aborts the host task if the handle has not already been taken out.
    fn drop(&mut self) {
        match self.0.take() {
            Some(task) => task.abort(),
            // Handle was already moved out (or never set): nothing to stop.
            None => {}
        }
    }
}
/// Runs the development loop: make sure a host is available, build and start the
/// project's actor on it, then rebuild/reload on file changes until Ctrl+C.
///
/// Steps, in order:
/// 1. Look for a marker file under the downloads directory; if it is missing, start a
///    host through `handle_up` (inside a spawned task when `use_host_subprocess` is
///    set, detached otherwise).
/// 2. Create a control client and, if a host was just started, poll for up to 60
///    seconds until a host is visible.
/// 3. Pick the target host: the only host present, or the one matching `--host-id`
///    when several are present.
/// 4. Build the project and start its artifact as an actor via a `file://` reference
///    (or scale the actor if the host already runs that reference).
/// 5. Watch the project directory and call `run_dev_loop` on create/modify/remove
///    events until Ctrl+C is received, then optionally stop the host.
///
/// # Errors
///
/// Returns an error if any of the steps above fails. Note that an error from
/// `run_dev_loop` ends the whole command without running the host shutdown path.
pub async fn handle_command(
    cmd: DevCommand,
    output_kind: wash_lib::cli::OutputKind,
) -> Result<CommandOutput> {
    // NOTE(review): the variable is named like a PID file (and the message below says
    // "PID file missing"), but the path is built from `DOWNLOADS_DIR` -- confirm this
    // is really the marker a running host leaves behind.
    let pid_file = downloads_dir()?.join(DOWNLOADS_DIR);
    let existing_instance = tokio::fs::metadata(pid_file).await.is_ok();

    // Only populated when we start the host ourselves inside a spawned task.
    let mut host_subprocess: Option<HostSubprocess> = None;

    if !existing_instance {
        eprintln!(
            "{} {}{}",
            emoji::WARN,
            style("No running wasmcloud host detected (PID file missing), ").bold(),
            style("starting a new host...").bold()
        );

        // File loading is enabled because the actor is started from a `file://`
        // reference further down.
        let mut wasmcloud_opts = cmd.wasmcloud_opts.clone();
        wasmcloud_opts.allow_file_load = Some(true);

        if cmd.use_host_subprocess {
            eprintln!(
                "{} {}",
                emoji::WRENCH,
                style("starting wasmCloud host subprocess...").bold(),
            );
            let nats_opts = cmd.nats_opts.clone();
            let wadm_opts = cmd.wadm_opts.clone();
            host_subprocess = Some(HostSubprocess(Some(tokio::spawn(async move {
                // The task lives as long as the non-detached `handle_up` call does;
                // its result is intentionally ignored.
                let _ = handle_up(
                    UpCommand {
                        detached: false,
                        nats_opts,
                        wasmcloud_opts,
                        wadm_opts,
                    },
                    output_kind,
                )
                .await;
                eprintln!(
                    "{} {}",
                    emoji::WRENCH,
                    style("shutting down host subprocess...").bold(),
                );
            }))));
            // Give the freshly spawned host a head start before contacting it.
            tokio::time::sleep(Duration::from_secs(5)).await;
        } else {
            let _ = handle_up(
                UpCommand {
                    detached: true,
                    nats_opts: cmd.nats_opts,
                    wasmcloud_opts,
                    wadm_opts: cmd.wadm_opts,
                },
                output_kind,
            )
            .await?;
        }
        eprintln!(
            "{} {}",
            emoji::WRENCH,
            style("Successfully started wasmCloud instance").bold(),
        );
    }

    let ctl_client = Arc::new(
        cmd.wasmcloud_opts
            .into_ctl_client(None)
            .await
            .context("failed to create wasmcloud control client")?,
    );

    // If we started the host ourselves, wait until it shows up in the lattice.
    if !existing_instance {
        eprintln!("⏳ ");
        eprintln!(
            "{} {}",
            emoji::HOURGLASS_DRAINING,
            style("Waiting for host to become reachable...").bold(),
        );

        // The polling future is awaited in place (not spawned) so that it is dropped,
        // and therefore stops polling, as soon as the timeout fires.
        let wait_for_host = async {
            loop {
                match ctl_client.get_hosts().await {
                    Ok(hs) => match &hs[..] {
                        [] => {}
                        [h] => {
                            eprintln!(
                                "{} {}",
                                emoji::GREEN_CHECK,
                                style(format!("Found single host w/ ID [{}]", h.id)).bold(),
                            );
                            break Ok(());
                        }
                        _hs => {
                            bail!("Detected an unexpected number (>1) of hosts present.");
                        }
                    },
                    Err(e) => {
                        eprintln!(
                            "{} {}",
                            emoji::WARN,
                            style(format!("Failed to get hosts (will retry in 5s): {e}"))
                                .bold(),
                        );
                    }
                }
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        };

        // A timeout is fatal. Any other failure of the wait is reported but not fatal:
        // the host selection below decides whether the situation is usable.
        if let Err(e) = timeout(Duration::from_secs(60), wait_for_host)
            .await
            .context("wasmCloud host did not become reachable")?
        {
            eprintln!(
                "{} {}",
                emoji::WARN,
                style(format!("Waiting for host ended early: {e}")).bold(),
            );
        }
    }

    // Pick the host to deploy to.
    let hosts = ctl_client
        .get_hosts()
        .await
        .map_err(|e| anyhow!("failed to retrieve hosts from lattice: {e}"))?;
    let host: Host = match &hosts[..] {
        [] => bail!("0 hosts detected, is wasmCloud running?"),
        [h] => h.clone(),
        _ => {
            if let Some(host_id) = cmd.host_id.map(ServerId::into_string) {
                hosts
                    .into_iter()
                    .find(|h| h.id == host_id)
                    .ok_or_else(|| anyhow!("failed to find host [{host_id}]"))?
            } else {
                bail!(
                    "{} hosts detected, please specify the host on which to deploy with --host-id",
                    hosts.len()
                )
            }
        }
    };

    // Only fall back to (and possibly fail on) the current directory when no project
    // directory was given explicitly.
    let project_path = match cmd.code_dir {
        Some(dir) => dir,
        None => std::env::current_dir()?,
    };
    let project_cfg = get_config(Some(project_path.clone()), Some(true))?;
    let sign_cfg: Option<SignConfig> = Some(SignConfig {
        keys_directory: None,
        issuer: None,
        subject: None,
        disable_keygen: false,
    });

    eprintln!(
        "{} {}",
        emoji::CONSTRUCTION_BARRIER,
        style("Starting project build").bold(),
    );
    let artifact_path = build_project(&project_cfg, sign_cfg.clone())?.canonicalize()?;
    eprintln!(
        "✅ successfully built project at [{}]",
        artifact_path.display()
    );

    // Start the actor from the built artifact, or scale it if the host already runs
    // an actor with the same reference.
    let actor_ref = format!("file://{}", artifact_path.display());
    let inventory = ctl_client
        .get_host_inventory(&host.id)
        .await
        .map_err(|e| {
            anyhow!(
                "failed to retrieve host inventory for host [{}]: {e}",
                &host.id
            )
        })?;
    let existing_actor = inventory
        .actors
        .into_iter()
        // Compare as `&str` so no `String` is allocated per inventory entry.
        .find(|a| a.image_ref.as_deref() == Some(actor_ref.as_str()));
    let actor_id = match existing_actor {
        Some(actor) => {
            scale_actor(&ctl_client, &host.id, &actor_ref, &actor.id, 1, None).await?;
            actor.id
        }
        None => start_actor(StartActorArgs {
            ctl_client: &ctl_client,
            host_id: &host.id,
            actor_ref: &actor_ref,
            count: 1,
            skip_wait: false,
            timeout_ms: None,
        })
        .await?
        .actor_id
        .ok_or_else(|| {
            anyhow!("failed to start actor [{actor_ref}]: no actor ID was returned")
        })?,
    };

    let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
    let (reload_tx, mut reload_rx) = mpsc::channel::<()>(1);

    // Translate Ctrl+C into a message on the stop channel.
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .context("failed to wait for ctrl_c signal")?;
        stop_tx
            .send(())
            .await
            .context("failed to send stop signal after receiving Ctrl + c")?;
        Result::<_, anyhow::Error>::Ok(())
    });

    // Set while a reload (or shutdown) is in progress so that file events produced
    // in the meantime do not trigger further reloads.
    let pause_watch = Arc::new(AtomicBool::new(false));
    let watcher_paused = pause_watch.clone();

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(NotifyEvent {
            kind: EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_),
            ..
        }) => {
            if watcher_paused.load(Ordering::SeqCst) {
                return;
            }
            // `try_send` never blocks the watcher's callback. If the single-slot
            // channel is already full a reload is pending anyway, so the extra
            // notification can be dropped.
            let _ = reload_tx.try_send(());
        }
        Ok(_) => {}
        Err(e) => {
            eprintln!("[error] watch failed: {:?}", e);
        }
    })?;
    watcher.watch(&project_path, RecursiveMode::Recursive)?;
    eprintln!("👀 watching for file changes (press Ctrl+c to stop)...");

    loop {
        select! {
            _ = reload_rx.recv() => {
                pause_watch.store(true, Ordering::SeqCst);
                run_dev_loop(
                    &project_cfg,
                    ModuleId::from_str(&actor_id)?,
                    &actor_ref,
                    ServerId::from_str(&host.id)?,
                    &ctl_client,
                    sign_cfg.clone(),
                )
                .await?;
                pause_watch.store(false, Ordering::SeqCst);
                eprintln!("👀 watching for file changes (press Ctrl+c to stop)...");
            },
            _ = stop_rx.recv() => {
                pause_watch.store(true, Ordering::SeqCst);
                eprintln!("🛑 received Ctrl + c, stopping devloop...");
                if !cmd.leave_host_running {
                    eprintln!("⏳ stopping wasmCloud instance...");
                    handle_down(DownCommand::default(), output_kind).await?;
                    // Wait for the in-process host task (if any) to finish.
                    if let Some(handle) = host_subprocess.and_then(|hs| hs.into_inner()) {
                        handle.await?;
                    }
                }
                break Ok(CommandOutput::default());
            },
        }
    }
}