// wash_cli/cmd/dev/mod.rs

use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context as _, Result};
use clap::Parser;
use notify::event::ModifyKind;
use notify::{event::EventKind, Event as NotifyEvent, RecursiveMode, Watcher};
use semver::Version;
use session::{SessionMetadata, WashDevSession};
use tokio::{select, sync::mpsc};

use tracing::trace;
use wash_lib::cli::{CommandOutput, CommonPackageArgs};
use wash_lib::generate::emoji;
use wash_lib::id::ServerId;
use wash_lib::parser::load_config;

use crate::cmd::up::{
    nats_client_from_wasmcloud_opts, remove_wadm_pidfile, NatsOpts, WadmOpts, WasmcloudOpts,
};
23
24mod deps;
25mod devloop;
26mod manifest;
27mod session;
28mod wit;
29
// NOTE(review): apart from `WASH_SESSIONS_FILE_NAME`, none of the constants below are used in
// this file; presumably they are consumed by the submodules — confirm before changing them.

/// Default image reference for the keyvalue (NATS) provider
const DEFAULT_KEYVALUE_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/keyvalue-nats:0.3.1";
/// Default image reference for the HTTP client provider
const DEFAULT_HTTP_CLIENT_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/http-client:0.12.1";
/// Default image reference for the HTTP server provider
const DEFAULT_HTTP_SERVER_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/http-server:0.24.0";
/// Default image reference for the filesystem blobstore provider
const DEFAULT_BLOBSTORE_FS_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/blobstore-fs:0.10.1";
/// Default image reference for the NATS messaging provider
const DEFAULT_MESSAGING_NATS_PROVIDER_IMAGE: &str = "ghcr.io/wasmcloud/messaging-nats:0.23.1";

/// Default address for the incoming handler
const DEFAULT_INCOMING_HANDLER_ADDRESS: &str = "127.0.0.1:8000";
/// Default subscription for the messaging handler
const DEFAULT_MESSAGING_HANDLER_SUBSCRIPTION: &str = "wasmcloud.dev";
/// Default root directory for the blobstore
const DEFAULT_BLOBSTORE_ROOT_DIR: &str = "/tmp";
/// Default keyvalue bucket name
const DEFAULT_KEYVALUE_BUCKET: &str = "wasmcloud";

/// Name of the sessions file, which is stored inside the wash dev directory
const WASH_SESSIONS_FILE_NAME: &str = "wash-dev-sessions.json";
42
43const SESSIONS_FILE_VERSION: Version = Version::new(0, 1, 0);
44const SESSION_ID_LEN: usize = 6;
45
46const DEFAULT_PROVIDER_STOP_TIMEOUT_MS: u64 = 3000;
47
48/// The path to the dev directory for wash
49async fn dev_dir() -> Result<PathBuf> {
50    let dir = wash_lib::config::dev_dir().context("failed to resolve config dir")?;
51    if !tokio::fs::try_exists(&dir)
52        .await
53        .context("failed to check if dev dir exists")?
54    {
55        tokio::fs::create_dir(&dir)
56            .await
57            .with_context(|| format!("failed to create dir [{}]", dir.display()))?
58    }
59    Ok(dir)
60}
61
62/// Retrieve the path to the file that stores
63async fn sessions_file_path() -> Result<PathBuf> {
64    dev_dir()
65        .await
66        .map(|p| p.join(WASH_SESSIONS_FILE_NAME))
67        .context("failed to get dev dir")
68}
69
70#[derive(Debug, Clone, Parser)]
71pub struct DevCommand {
72    #[clap(flatten)]
73    pub nats_opts: NatsOpts,
74
75    #[clap(flatten)]
76    pub wasmcloud_opts: WasmcloudOpts,
77
78    #[clap(flatten)]
79    pub wadm_opts: WadmOpts,
80
81    #[clap(flatten)]
82    pub package_args: CommonPackageArgs,
83
84    /// ID of the host to use for `wash dev`
85    /// if one is not selected, `wash dev` will attempt to use the single host in the lattice
86    #[clap(long = "host-id", name = "host-id", value_parser)]
87    pub host_id: Option<ServerId>,
88
89    /// Path to code directory
90    #[clap(
91        name = "code-dir",
92        short = 'd',
93        long = "work-dir",
94        env = "WASH_DEV_CODE_DIR"
95    )]
96    pub code_dir: Option<PathBuf>,
97
98    /// Directories to ignore when watching for changes. This should be set
99    /// to directories where generated files are placed, such as `target/` or `dist/`.
100    /// Can be specified multiple times.
101    #[clap(name = "ignore-dir", short = 'i', long = "ignore-dir")]
102    pub ignore_dirs: Vec<PathBuf>,
103
104    /// Whether to leave the host running after dev
105    #[clap(
106        name = "leave-host-running",
107        long = "leave-host-running",
108        env = "WASH_DEV_LEAVE_HOST_RUNNING",
109        default_value = "false",
110        help = "Leave the wasmCloud host running after stopping the devloop"
111    )]
112    pub leave_host_running: bool,
113
114    /// Write generated WADM manifest(s) to a given folder (every time they are generated)
115    #[clap(long = "manifest-output-dir", env = "WASH_DEV_MANIFEST_OUTPUT_DIR")]
116    pub manifest_output_dir: Option<PathBuf>,
117
118    /// Skip wit dependency fetching and use only what is currently present in the wit directory
119    /// (useful for airgapped or disconnected environments)
120    #[clap(long = "skip-fetch")]
121    pub skip_wit_fetch: bool,
122}
123
124/// Handle `wash dev`
125pub async fn handle_command(
126    cmd: DevCommand,
127    output_kind: wash_lib::cli::OutputKind,
128) -> Result<CommandOutput> {
129    let current_dir =
130        std::env::current_dir().context("failed to get current directory for wash dev")?;
131    let project_path = cmd.code_dir.unwrap_or(current_dir);
132    let mut project_cfg = load_config(Some(project_path.clone()), Some(true)).await?;
133
134    let mut wash_dev_session = WashDevSession::from_sessions_file(&project_path)
135        .await
136        .context("failed to build wash dev session")?;
137    let session_id = wash_dev_session.id.clone();
138    eprintln!(
139        "{} Resolved wash session ID [{session_id}]",
140        emoji::INFO_SQUARE
141    );
142
143    // Create NATS and control interface client to use to connect
144    let ctl_client = cmd.wasmcloud_opts.clone().into_ctl_client(None).await;
145    let host_id = match ctl_client {
146        Ok(ref ctl_client) => match ctl_client.get_hosts().await.as_ref().map(|r| r.as_slice()) {
147            // Failing to get hosts is acceptable if none are supposed to be running, or if NATS is not running
148            Ok([]) | Err(_) if cmd.host_id.is_none() => {
149                eprintln!(
150                    "{} No running hosts found, will start one...",
151                    emoji::INFO_SQUARE
152                );
153                None
154            }
155            Ok([]) | Err(_) => {
156                bail!("host ID specified but no running hosts found");
157            }
158            Ok([host]) if host.data().is_some() => {
159                // SAFETY: We know that the host exists and has data
160                Some(
161                    ServerId::from_str(host.data().unwrap().id())
162                        .map_err(|e| anyhow::anyhow!("failed to parse host ID: {e}"))?,
163                )
164            }
165            Ok(hosts) if cmd.host_id.is_some() => {
166                // SAFETY: We know that the host ID is Some as checked above
167                let host_id = cmd.host_id.unwrap();
168                if let Some(_host) = hosts
169                    .iter()
170                    .find(|h| h.data().map(|d| d.id()).is_some_and(|id| *id == *host_id))
171                {
172                    Some(host_id)
173                } else {
174                    bail!("specified host ID '{host_id}' not found in running hosts");
175                }
176            }
177            Ok(hosts) => {
178                bail!(
179                    "found multiple running hosts, please specify a host ID with --host-id. Eligible hosts: [{:?}]",
180                    hosts
181                        .iter()
182                        .filter_map(|h| h.data().map(|d| d.id()))
183                        .collect::<Vec<&str>>()
184                        .join(", ")
185                );
186            }
187        },
188        Err(_) if cmd.host_id.is_some() => bail!("host ID specified but could not connect to control interface, ensure host and NATS is running or omit host ID"),
189        Err(_) => None,
190    };
191
192    let (mut nats_child, mut wadm_child, mut wasmcloud_child) = (None, None, None);
193
194    // If there is not a running host for this session, then we can start one
195    if wash_dev_session.host_data.is_none() {
196        (nats_child, wadm_child, wasmcloud_child) = wash_dev_session
197            .start_host(
198                cmd.wasmcloud_opts.clone(),
199                cmd.nats_opts.clone(),
200                cmd.wadm_opts.clone(),
201                host_id,
202            )
203            .await
204            .with_context(|| format!("failed to start host for session [{session_id}]"))?;
205    }
206    let host_id = wash_dev_session
207        .host_data
208        .clone()
209        .context("missing host_id, after ensuring host has started")?
210        .0;
211
212    let nats_client = nats_client_from_wasmcloud_opts(&cmd.wasmcloud_opts).await?;
213    // If we failed to connect to the control interface earlier, we can retry now that NATS is up
214    let ctl_client = if let Ok(ctl_client) = ctl_client {
215        ctl_client
216    } else {
217        cmd.wasmcloud_opts
218            .clone()
219            .into_ctl_client(None)
220            .await
221            .context("failed to create control interface client")?
222    };
223    let lattice = ctl_client.lattice();
224
225    // Build state for the run loop
226    let mut run_loop_state = devloop::RunLoopState {
227        dev_session: &mut wash_dev_session,
228        nats_client: &nats_client,
229        ctl_client: &ctl_client,
230        project_cfg: &mut project_cfg,
231        lattice,
232        session_id: &session_id,
233        manifest_output_dir: cmd.manifest_output_dir.as_ref(),
234        previous_deps: None,
235        artifact_path: None,
236        component_id: None,
237        component_ref: None,
238        package_args: &cmd.package_args,
239        skip_fetch: cmd.skip_wit_fetch,
240        output_kind,
241    };
242
243    // See if the host is running by retrieving an inventory
244    if let Err(_e) = ctl_client.get_host_inventory(&host_id).await {
245        eprintln!(
246            "{} Failed to retrieve inventory from host [{host_id}]... Exiting developer loop",
247            emoji::ERROR
248        );
249        eprintln!(
250            "{} Try running `wash down --all` to stop all running wasmCloud instances, then run `wash dev` again",
251            emoji::ERROR
252        );
253        if let Err(e) = stop_dev_session(
254            run_loop_state,
255            &ctl_client,
256            wasmcloud_child,
257            wadm_child,
258            nats_child,
259            cmd.leave_host_running,
260        )
261        .await
262        {
263            eprintln!(
264                "{} Failed to cleanup incomplete dev session: {e}",
265                emoji::ERROR
266            );
267        }
268
269        bail!("failed to initialize dev session, host did not start.");
270    }
271
272    // Set up a oneshot channel to perform graceful shutdown, handle Ctrl + c w/ tokio
273    let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
274    let (reload_tx, mut reload_rx) = mpsc::channel::<()>(1);
275    tokio::spawn(async move {
276        tokio::signal::ctrl_c()
277            .await
278            .context("failed to wait for ctrl_c signal")?;
279        stop_tx
280            .send(())
281            .await
282            .context("failed to send stop signal after receiving Ctrl + c")?;
283        Result::<_, anyhow::Error>::Ok(())
284    });
285
286    // Enable/disable watching to prevent having the output artifact trigger a rebuild
287    // This starts as true to prevent a rebuild on the first run
288    let pause_watch = Arc::new(AtomicBool::new(true));
289    let watcher_paused = pause_watch.clone();
290
291    // Spawn a file watcher to listen for changes and send on reload_tx
292    let project_path_notify = project_path.clone();
293    let mut watcher = notify::recommended_watcher(move |res: _| match res {
294        Ok(event) => match event {
295            NotifyEvent {
296                kind: EventKind::Create(_),
297                paths,
298                ..
299            }
300            | NotifyEvent {
301                kind: EventKind::Modify(ModifyKind::Data(_)),
302                paths,
303                ..
304            }
305            | NotifyEvent {
306                kind: EventKind::Remove(_),
307                paths,
308                ..
309            } => {
310                // Ensure that paths that take place in ignored directories don't trigger a reload
311                // This is primarily here to avoid recursively triggering reloads for files that are
312                // generated by the build process.
313                if paths.iter().any(|p| {
314                    p.strip_prefix(project_path_notify.as_path())
315                        .is_ok_and(|p| cmd.ignore_dirs.iter().any(|ignore| p.starts_with(ignore)))
316                }) {
317                    return;
318                }
319                // If watch has been paused for any reason, skip notifications
320                if watcher_paused.load(Ordering::SeqCst) {
321                    return;
322                }
323                trace!("file event triggered dev loop: {paths:?}");
324
325                // NOTE(brooksmtownsend): `try_send` here is used intentionally to prevent
326                // multiple file reloads from queuing up a backlog of reloads.
327                let _ = reload_tx.try_send(());
328            }
329            _ => {}
330        },
331        Err(e) => {
332            eprintln!("{} Watch failed: {:?}", emoji::ERROR, e);
333        }
334    })?;
335    watcher.watch(&project_path.clone(), RecursiveMode::Recursive)?;
336
337    // NOTE(brooksmtownsend): Yes, it would make more sense to return here. For some reason unknown to me
338    // trying to return any error here will just cause the dev loop to hang infinitely and require a force quit.
339    // Even a panic will display a tokio error and then hang. Thankfully, the error will just probably happen
340    // again when the dev loop runs and in that case it'll successfully exit out.
341    if let Err(e) = devloop::run(&mut run_loop_state).await {
342        eprintln!(
343            "{} Failed to run first dev loop iteration, will retry: {e}",
344            emoji::WARN
345        );
346    }
347    // Enable file watching
348    pause_watch.store(false, Ordering::SeqCst);
349    // Make sure the reload channel is empty before starting the loop
350    let _ = reload_rx.try_recv();
351
352    // Watch FS for changes and listen for Ctrl + C in tandem
353    eprintln!(
354        "{} Watching for file changes (press Ctrl+c to stop)...",
355        emoji::EYES
356    );
357    loop {
358        select! {
359            // Process a file change/reload
360            _ = reload_rx.recv() => {
361                pause_watch.store(true, Ordering::SeqCst);
362                devloop::run(&mut run_loop_state)
363                    .await
364                    .context("failed to run dev loop iteration")?;
365                eprintln!("\n{} Watching for file changes (press Ctrl+c to stop)...", emoji::EYES);
366                // Avoid jitter with reloads by pausing the watcher for a short time
367                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
368                // Make sure that the reload channel is empty before unpausing the watcher
369                let _ = reload_rx.try_recv();
370                pause_watch.store(false, Ordering::SeqCst);
371            },
372
373            // Process a stop
374            _ = stop_rx.recv() => {
375                pause_watch.store(true, Ordering::SeqCst);
376                eprintln!("\n{} Received Ctrl + c, stopping devloop...", emoji::STOP);
377
378                stop_dev_session(run_loop_state, &ctl_client, wasmcloud_child, wadm_child, nats_child, cmd.leave_host_running).await?;
379
380                break Ok(CommandOutput::from_key_and_text(
381                    "result",
382                    format!(
383                        "{} Dev session [{session_id}] exited successfully.",
384                        emoji::GREEN_CHECK,
385                    ),
386                ));
387            },
388        }
389    }
390}
391
392async fn stop_dev_session(
393    run_loop_state: devloop::RunLoopState<'_>,
394    ctl_client: &wasmcloud_control_interface::Client,
395    wasmcloud_child: Option<tokio::process::Child>,
396    wadm_child: Option<tokio::process::Child>,
397    nats_child: Option<tokio::process::Child>,
398    leave_host_running: bool,
399) -> Result<()> {
400    // Update the sessions file with the fact that this session stopped
401    run_loop_state.dev_session.in_use = false;
402    SessionMetadata::persist_session(run_loop_state.dev_session).await?;
403
404    // Delete manifests related to the application
405    if let Some(dependencies) = run_loop_state.previous_deps {
406        eprintln!(
407            "{} Cleaning up deployed wasmCloud application(s)...",
408            emoji::BROOM
409        );
410        dependencies
411            .delete_manifests(&ctl_client.nats_client(), ctl_client.lattice())
412            .await?;
413    }
414
415    // NOTE(brooksmtownsend): Wait here for a second or so to ensure that all links and config are cleaned up.
416    // There's not a really easy way to ensure everything is cleaned up after deleting the old manifest, so we
417    // can do our best to give wadm a second to tell the host what to delete.
418    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
419
420    // Stop the host, unless explicitly instructed to leave host running
421    if !leave_host_running {
422        eprintln!(
423            "{} Stopping wasmCloud instance...",
424            emoji::HOURGLASS_DRAINING
425        );
426
427        // Stop host via the control interface
428        if let Some((ref host_id, _log_file)) = run_loop_state.dev_session.host_data.as_ref() {
429            let receiver = ctl_client
430                .events_receiver(vec!["host_stopped".to_string()])
431                .await;
432            if let Err(e) = ctl_client.stop_host(host_id, Some(2000)).await {
433                eprintln!(
434                    "{} Failed to stop host through control interface: {e}",
435                    emoji::WARN
436                );
437            }
438
439            // Wait for the host_stopped event to be received
440            if let Ok(mut receiver) = receiver {
441                // If we don't receive the host_stopped event within 2 seconds, log a warning
442                if tokio::time::timeout(std::time::Duration::from_secs(2), receiver.recv())
443                    .await
444                    .is_err()
445                {
446                    eprintln!(
447                        "{} Did not receive host_stopped event, host may have exited early",
448                        emoji::WARN
449                    );
450                }
451            }
452        }
453
454        // Ensure that the host exited, if not, kill the process forcefully
455        if let Some(mut host) = wasmcloud_child {
456            if tokio::time::timeout(std::time::Duration::from_secs(5), host.wait())
457                .await
458                .context("failed to wait for wasmcloud process to stop, forcefully terminating")
459                .is_err()
460            {
461                eprintln!(
462                    "{} Terminating host forcefully, this may leave provider processes running",
463                    emoji::WARN
464                );
465                host.kill()
466                    .await
467                    .context("failed to stop wasmcloud process")?;
468            }
469        }
470
471        // Stop WADM
472        if let Some(mut wadm) = wadm_child {
473            eprintln!("{} Stopping wadm...", emoji::HOURGLASS_DRAINING);
474            wadm.kill()
475                .await
476                .context("failed to stop wadm child process")?;
477            remove_wadm_pidfile(run_loop_state.dev_session.base_dir().await?)
478                .await
479                .context("failed to remove wadm pidfile")?;
480        }
481
482        // Stop NATS
483        if let Some(mut nats) = nats_child {
484            eprintln!("{} Stopping NATS...", emoji::HOURGLASS_DRAINING);
485            nats.kill().await?;
486        }
487    }
488
489    Ok(())
490}