use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::{bail, Context as _, Result};
use clap::Parser;
use notify::event::ModifyKind;
use notify::{event::EventKind, Event as NotifyEvent, RecursiveMode, Watcher};
use semver::Version;
use session::{SessionMetadata, WashDevSession};
use tokio::{select, sync::mpsc};
use tracing::trace;
use wash_lib::cli::{CommandOutput, CommonPackageArgs};
use wash_lib::generate::emoji;
use wash_lib::id::ServerId;
use wash_lib::parser::load_config;
use crate::cmd::up::{
nats_client_from_wasmcloud_opts, remove_wadm_pidfile, NatsOpts, WadmOpts, WasmcloudOpts,
};
mod deps;
mod devloop;
mod manifest;
mod session;
mod wit;
// Module-wide defaults. NOTE(review): none of these constants is read in the part of the
// file visible here; they are presumably consumed by the submodules declared above
// (`deps`, `devloop`, `manifest`, `session`, `wit`) -- confirm before changing any value.

// Default image references (registry/name:tag), one per capability provider kind.
const DEFAULT_KEYVALUE_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/keyvalue-nats:0.3.1";
const DEFAULT_HTTP_CLIENT_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/http-client:0.12.1";
const DEFAULT_HTTP_SERVER_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/http-server:0.24.0";
const DEFAULT_BLOBSTORE_FS_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/blobstore-fs:0.10.1";
const DEFAULT_MESSAGING_NATS_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/messaging-nats:0.23.1";
// Default configuration values; judging by the names they pair with the providers above
// (HTTP listen address, messaging subscription, blobstore root, keyvalue bucket) -- TODO confirm.
const DEFAULT_INCOMING_HANDLER_ADDRESS: &str = "127.0.0.1:8000";
const DEFAULT_MESSAGING_HANDLER_SUBSCRIPTION: &str = "wasmcloud.dev";
const DEFAULT_BLOBSTORE_ROOT_DIR: &str = "/tmp";
const DEFAULT_KEYVALUE_BUCKET: &str = "wasmcloud";
// File name joined onto the dev directory by `sessions_file_path`.
const WASH_SESSIONS_FILE_NAME: &str = "wash-dev-sessions.json";
// Version constant for the sessions file (0.1.0); how it is checked is not visible here.
const SESSIONS_FILE_VERSION: Version = Version::new(0, 1, 0);
// Length used for session IDs -- presumably in characters; verify against the `session` module.
const SESSION_ID_LEN: usize = 6;
// Timeout in milliseconds (per the `_MS` suffix), presumably for stopping providers -- TODO confirm.
const DEFAULT_PROVIDER_STOP_TIMEOUT_MS: u64 = 3000;
/// Resolve the dev directory reported by `wash_lib::config::dev_dir` and make sure it exists.
///
/// # Errors
///
/// Returns an error if the directory cannot be resolved or cannot be created.
async fn dev_dir() -> Result<PathBuf> {
    let dir = wash_lib::config::dev_dir().context("failed to resolve config dir")?;
    // `create_dir_all` succeeds when the directory is already present, so there is no need
    // for a separate existence check (which could race with another `wash dev` process
    // creating the same directory), and it also creates any missing parent directories.
    tokio::fs::create_dir_all(&dir)
        .await
        .with_context(|| format!("failed to create dir [{}]", dir.display()))?;
    Ok(dir)
}
/// Build the path of the sessions file, which lives directly inside the dev directory.
///
/// # Errors
///
/// Returns an error (with the context "failed to get dev dir") if `dev_dir` fails.
async fn sessions_file_path() -> Result<PathBuf> {
    let base = dev_dir().await.context("failed to get dev dir")?;
    Ok(base.join(WASH_SESSIONS_FILE_NAME))
}
// Command-line arguments accepted by `wash dev`, parsed with clap's derive API.
//
// Plain `//` comments are used on purpose in this struct: clap's derive turns `///` doc
// comments into help/about text, so adding them would change the `--help` output.
#[derive(Debug, Clone, Parser)]
pub struct DevCommand {
// NATS, wasmCloud host and wadm option groups (imported from `crate::cmd::up`), flattened
// so that their flags appear directly on this command.
#[clap(flatten)]
pub nats_opts: NatsOpts,
#[clap(flatten)]
pub wasmcloud_opts: WasmcloudOpts,
#[clap(flatten)]
pub wadm_opts: WadmOpts,
// Shared package arguments from `wash_lib::cli`; handed to the dev loop by `handle_command`.
#[clap(flatten)]
pub package_args: CommonPackageArgs,
// `--host-id`: ID of the host to use. `handle_command` consults it when choosing among
// running hosts and rejects it when no running host can be found.
#[clap(long = "host-id", name = "host-id", value_parser)]
pub host_id: Option<ServerId>,
// `-d` / `--work-dir` (env `WASH_DEV_CODE_DIR`): project directory. When unset,
// `handle_command` falls back to the current working directory.
#[clap(
name = "code-dir",
short = 'd',
long = "work-dir",
env = "WASH_DEV_CODE_DIR"
)]
pub code_dir: Option<PathBuf>,
// `-i` / `--ignore-dir`: collected into a list. File events whose path, taken relative to
// the project directory, starts with one of these entries do not trigger a reload.
#[clap(name = "ignore-dir", short = 'i', long = "ignore-dir")]
pub ignore_dirs: Vec<PathBuf>,
// `--leave-host-running` (env `WASH_DEV_LEAVE_HOST_RUNNING`): when set, `stop_dev_session`
// skips stopping the host and the wadm/NATS child processes.
#[clap(
name = "leave-host-running",
long = "leave-host-running",
env = "WASH_DEV_LEAVE_HOST_RUNNING",
default_value = "false",
help = "Leave the wasmCloud host running after stopping the devloop"
)]
pub leave_host_running: bool,
// `--manifest-output-dir` (env `WASH_DEV_MANIFEST_OUTPUT_DIR`): optional directory passed
// through to the dev loop state; presumably where generated manifests are written -- TODO confirm.
#[clap(long = "manifest-output-dir", env = "WASH_DEV_MANIFEST_OUTPUT_DIR")]
pub manifest_output_dir: Option<PathBuf>,
// `--skip-fetch`: forwarded to the dev loop state as `skip_fetch`.
#[clap(long = "skip-fetch")]
pub skip_wit_fetch: bool,
}
/// Run the `wash dev` command.
///
/// Resolves the project directory and dev session, decides which host to use (starting one
/// through the session when the session has no host data yet), runs the dev loop once, and
/// then re-runs it whenever the file watcher reports a relevant change, until Ctrl+C is
/// received.
///
/// # Errors
///
/// Returns an error when the project/session/host setup fails, when the host inventory cannot
/// be retrieved, when the file watcher cannot be created, when a reload iteration of the dev
/// loop fails, or when stopping the session after Ctrl+C fails.
pub async fn handle_command(
cmd: DevCommand,
output_kind: wash_lib::cli::OutputKind,
) -> Result<CommandOutput> {
// The project directory is the explicitly supplied code dir, else the current directory.
let current_dir =
std::env::current_dir().context("failed to get current directory for wash dev")?;
let project_path = cmd.code_dir.unwrap_or(current_dir);
let mut project_cfg = load_config(Some(project_path.clone()), Some(true)).await?;
// Build the dev session for this project from the sessions file.
let mut wash_dev_session = WashDevSession::from_sessions_file(&project_path)
.await
.context("failed to build wash dev session")?;
let session_id = wash_dev_session.id.clone();
eprintln!(
"{} Resolved wash session ID [{session_id}]",
emoji::INFO_SQUARE
);
// Creating the control client may fail at this point; the `Err` case is handled in the
// match below and, if needed, a client is created again after the host has been started.
let ctl_client = cmd.wasmcloud_opts.clone().into_ctl_client(None).await;
// Pick the host to use, based on the hosts that are currently running and on `--host-id`.
let host_id = match ctl_client {
Ok(ref ctl_client) => match ctl_client.get_hosts().await.as_ref().map(|r| r.as_slice()) {
// No hosts (or the query failed) and no explicit host ID: a host will be started.
Ok([]) | Err(_) if cmd.host_id.is_none() => {
eprintln!(
"{} No running hosts found, will start one...",
emoji::INFO_SQUARE
);
None
}
// No hosts (or the query failed) but a host ID was requested: nothing can satisfy it.
Ok([]) | Err(_) => {
bail!("host ID specified but no running hosts found");
}
// Exactly one running host: use it.
// NOTE(review): this arm does not compare the host against `cmd.host_id`, so an
// explicit `--host-id` that differs from the only running host is ignored -- confirm
// that this is intended.
Ok([host]) if host.data().is_some() => {
Some(
ServerId::from_str(host.data().unwrap().id())
.map_err(|e| anyhow::anyhow!("failed to parse host ID: {e}"))?,
)
}
// Remaining host lists with an explicit host ID: it must be one of the running hosts.
Ok(hosts) if cmd.host_id.is_some() => {
let host_id = cmd.host_id.unwrap();
if let Some(_host) = hosts
.iter()
.find(|h| h.data().map(|d| d.id()).is_some_and(|id| *id == *host_id))
{
Some(host_id)
} else {
bail!("specified host ID '{host_id}' not found in running hosts");
}
}
// Remaining host lists without a host ID: ask the user to choose one.
Ok(hosts) => {
bail!(
"found multiple running hosts, please specify a host ID with --host-id. Eligible hosts: [{:?}]",
hosts
.iter()
.filter_map(|h| h.data().map(|d| d.id()))
.collect::<Vec<&str>>()
.join(", ")
);
}
},
// No control client could be created: only acceptable when no host ID was requested.
Err(_) if cmd.host_id.is_some() => bail!("host ID specified but could not connect to control interface, ensure host and NATS is running or omit host ID"),
Err(_) => None,
};
// Child process handles stay `None` unless `start_host` is called for this session.
let (mut nats_child, mut wadm_child, mut wasmcloud_child) = (None, None, None);
// Only start a host when the session does not already carry host data.
if wash_dev_session.host_data.is_none() {
(nats_child, wadm_child, wasmcloud_child) = wash_dev_session
.start_host(
cmd.wasmcloud_opts.clone(),
cmd.nats_opts.clone(),
cmd.wadm_opts.clone(),
host_id,
)
.await
.with_context(|| format!("failed to start host for session [{session_id}]"))?;
}
// From here on the host ID is taken from the session (first element of `host_data`).
let host_id = wash_dev_session
.host_data
.clone()
.context("missing host_id, after ensuring host has started")?
.0;
let nats_client = nats_client_from_wasmcloud_opts(&cmd.wasmcloud_opts).await?;
// Reuse the control client from above if it was created successfully, otherwise try again.
let ctl_client = if let Ok(ctl_client) = ctl_client {
ctl_client
} else {
cmd.wasmcloud_opts
.clone()
.into_ctl_client(None)
.await
.context("failed to create control interface client")?
};
let lattice = ctl_client.lattice();
// State handed to every dev loop iteration; it borrows the session and project config
// mutably for the rest of this function.
let mut run_loop_state = devloop::RunLoopState {
dev_session: &mut wash_dev_session,
nats_client: &nats_client,
ctl_client: &ctl_client,
project_cfg: &mut project_cfg,
lattice,
session_id: &session_id,
manifest_output_dir: cmd.manifest_output_dir.as_ref(),
previous_deps: None,
artifact_path: None,
component_id: None,
component_ref: None,
package_args: &cmd.package_args,
skip_fetch: cmd.skip_wit_fetch,
output_kind,
};
// Check that the host answers an inventory request; if it does not, clean up and abort.
if let Err(_e) = ctl_client.get_host_inventory(&host_id).await {
eprintln!(
"{} Failed to retrieve inventory from host [{host_id}]... Exiting developer loop",
emoji::ERROR
);
eprintln!(
"{} Try running `wash down --all` to stop all running wasmCloud instances, then run `wash dev` again",
emoji::ERROR
);
// A cleanup failure is only reported; the function bails out below either way.
if let Err(e) = stop_dev_session(
run_loop_state,
&ctl_client,
wasmcloud_child,
wadm_child,
nats_child,
cmd.leave_host_running,
)
.await
{
eprintln!(
"{} Failed to cleanup incomplete dev session: {e}",
emoji::ERROR
);
}
bail!("failed to initialize dev session, host did not start.");
}
// Capacity-1 channels: one signals Ctrl+C, the other signals that a reload is wanted.
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
let (reload_tx, mut reload_rx) = mpsc::channel::<()>(1);
// Background task that waits for Ctrl+C and forwards it as a stop signal. Its result is
// not observed because the join handle is dropped.
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.context("failed to wait for ctrl_c signal")?;
stop_tx
.send(())
.await
.context("failed to send stop signal after receiving Ctrl + c")?;
Result::<_, anyhow::Error>::Ok(())
});
// The watcher starts paused, so file events raised while the first dev loop iteration is
// running do not queue a reload.
let pause_watch = Arc::new(AtomicBool::new(true));
let watcher_paused = pause_watch.clone();
let project_path_notify = project_path.clone();
// The callback reacts to create, data-modify and remove events only.
let mut watcher = notify::recommended_watcher(move |res: _| match res {
Ok(event) => match event {
NotifyEvent {
kind: EventKind::Create(_),
paths,
..
}
| NotifyEvent {
kind: EventKind::Modify(ModifyKind::Data(_)),
paths,
..
}
| NotifyEvent {
kind: EventKind::Remove(_),
paths,
..
} => {
// Skip the event if any affected path, relative to the project directory, lies
// under one of the ignored directories.
if paths.iter().any(|p| {
p.strip_prefix(project_path_notify.as_path())
.is_ok_and(|p| cmd.ignore_dirs.iter().any(|ignore| p.starts_with(ignore)))
}) {
return;
}
// Skip the event while the watcher is paused.
if watcher_paused.load(Ordering::SeqCst) {
return;
}
trace!("file event triggered dev loop: {paths:?}");
// Non-blocking send: if a reload is already queued the send fails and this event is
// dropped, so repeated events collapse into a single pending reload.
let _ = reload_tx.try_send(());
}
_ => {}
},
Err(e) => {
eprintln!("{} Watch failed: {:?}", emoji::ERROR, e);
}
})?;
watcher.watch(&project_path.clone(), RecursiveMode::Recursive)?;
// First iteration: a failure is reported as a warning and the loop below still starts.
if let Err(e) = devloop::run(&mut run_loop_state).await {
eprintln!(
"{} Failed to run first dev loop iteration, will retry: {e}",
emoji::WARN
);
}
// Unpause the watcher and discard a reload signal that may already be queued.
pause_watch.store(false, Ordering::SeqCst);
let _ = reload_rx.try_recv();
eprintln!(
"{} Watching for file changes (press Ctrl+c to stop)...",
emoji::EYES
);
// Main loop: handle whichever arrives first, a reload request or a stop request.
loop {
select! {
_ = reload_rx.recv() => {
// Pause the watcher so that changes made during this iteration do not queue a reload.
pause_watch.store(true, Ordering::SeqCst);
// NOTE(review): unlike the first iteration above, an error here leaves the function
// through `?` without calling `stop_dev_session` -- confirm that skipping the
// session/host cleanup on this path is intended.
devloop::run(&mut run_loop_state)
.await
.context("failed to run dev loop iteration")?;
eprintln!("\n{} Watching for file changes (press Ctrl+c to stop)...", emoji::EYES);
// Wait briefly, drop a reload that may have been queued meanwhile, then unpause.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _ = reload_rx.try_recv();
pause_watch.store(false, Ordering::SeqCst);
},
_ = stop_rx.recv() => {
// Stop reacting to file events, tear the session down and report success.
pause_watch.store(true, Ordering::SeqCst);
eprintln!("\n{} Received Ctrl + c, stopping devloop...", emoji::STOP);
stop_dev_session(run_loop_state, &ctl_client, wasmcloud_child, wadm_child, nats_child, cmd.leave_host_running).await?;
break Ok(CommandOutput::from_key_and_text(
"result",
format!(
"{} Dev session [{session_id}] exited successfully.",
emoji::GREEN_CHECK,
),
));
},
}
}
}
async fn stop_dev_session(
run_loop_state: devloop::RunLoopState<'_>,
ctl_client: &wasmcloud_control_interface::Client,
wasmcloud_child: Option<tokio::process::Child>,
wadm_child: Option<tokio::process::Child>,
nats_child: Option<tokio::process::Child>,
leave_host_running: bool,
) -> Result<()> {
run_loop_state.dev_session.in_use = false;
SessionMetadata::persist_session(run_loop_state.dev_session).await?;
if let Some(dependencies) = run_loop_state.previous_deps {
eprintln!(
"{} Cleaning up deployed wasmCloud application(s)...",
emoji::BROOM
);
dependencies
.delete_manifests(&ctl_client.nats_client(), ctl_client.lattice())
.await?;
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
if !leave_host_running {
eprintln!(
"{} Stopping wasmCloud instance...",
emoji::HOURGLASS_DRAINING
);
if let Some((ref host_id, _log_file)) = run_loop_state.dev_session.host_data.as_ref() {
let receiver = ctl_client
.events_receiver(vec!["host_stopped".to_string()])
.await;
if let Err(e) = ctl_client.stop_host(host_id, Some(2000)).await {
eprintln!(
"{} Failed to stop host through control interface: {e}",
emoji::WARN
);
}
if let Ok(mut receiver) = receiver {
if tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv())
.await
.is_err()
{
eprintln!(
"{} Did not receive host_stopped event, host may have exited early",
emoji::WARN
);
}
}
}
if let Some(mut host) = wasmcloud_child {
if tokio::time::timeout(std::time::Duration::from_secs(5), host.wait())
.await
.context("failed to wait for wasmcloud process to stop, forcefully terminating")
.is_err()
{
eprintln!(
"{} Terminating host forcefully, this may leave provider processes running",
emoji::WARN
);
host.kill()
.await
.context("failed to stop wasmcloud process")?;
}
}
if let Some(mut wadm) = wadm_child {
eprintln!("{} Stopping wadm...", emoji::HOURGLASS_DRAINING);
wadm.kill()
.await
.context("failed to stop wadm child process")?;
remove_wadm_pidfile(run_loop_state.dev_session.base_dir().await?)
.await
.context("failed to remove wadm pidfile")?;
}
if let Some(mut nats) = nats_child {
eprintln!("{} Stopping NATS...", emoji::HOURGLASS_DRAINING);
nats.kill().await?;
}
}
Ok(())
}