1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::time::Duration;
4
5use anyhow::{bail, Context};
6use async_nats::RequestErrorKind;
7use clap::{Args, Subcommand};
8use serde_json::json;
9use wadm_client::Result;
10use wadm_types::api::ModelSummary;
11use wadm_types::validation::{ValidationFailure, ValidationOutput};
12use wash_lib::app::{load_app_manifest, validate_manifest_file, AppManifest};
13use wash_lib::cli::get::parse_watch_interval;
14use wash_lib::cli::{CliConnectionOpts, CommandOutput, OutputKind};
15use wash_lib::config::WashConnectionOptions;
16
17use crate::appearance::spinner::Spinner;
18use crossterm::{
19 cursor, execute,
20 terminal::{Clear, ClearType},
21};
22use std::io::Write;
23
24mod output;
25
// Subcommands of the application CLI. Each variant carries the parsed arguments for one
// subcommand and is dispatched by `handle_command`.
//
// NOTE: plain `//` comments are used deliberately. clap's derive macros turn `///` doc
// comments into user-visible help text, which would change the program's output.
#[derive(Debug, Clone, Subcommand)]
pub enum AppCliCommand {
    // `list`: handled by `get_application_list`.
    #[clap(name = "list")]
    List(ListCommand),
    // `get`: handled by `get_manifest` when a name is given, otherwise by `get_applications`.
    #[clap(name = "get")]
    Get(GetCommand),
    // `status`: handled by `get_model_status`.
    #[clap(name = "status")]
    Status(StatusCommand),
    // `history`: handled by `get_application_versions`.
    #[clap(name = "history")]
    History(HistoryCommand),
    // `delete` (alias `del`): handled by `delete_application_version`.
    #[clap(name = "delete", alias = "del")]
    Delete(DeleteCommand),
    // `put`: handled by `put_model`.
    #[clap(name = "put")]
    Put(PutCommand),
    // `deploy`: handled by `deploy_model`.
    #[clap(name = "deploy")]
    Deploy(DeployCommand),
    // `undeploy`: handled by `undeploy_model`.
    #[clap(name = "undeploy")]
    Undeploy(UndeployCommand),
    // `validate`: handled by `handle_validate`.
    #[clap(name = "validate")]
    Validate(ValidateCommand),
}
56
// Arguments for the `list` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct ListCommand {
    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,

    // Optional watch interval. When set, `get_application_list` keeps redrawing the
    // application table instead of printing it once. Passing the flag without a value
    // uses "1000", parsed by `parse_watch_interval`
    // (presumably milliseconds -- TODO confirm against `parse_watch_interval`).
    #[clap(long,short, num_args = 0..=1, default_missing_value = "1000", value_parser = parse_watch_interval)]
    pub watch: Option<std::time::Duration>,
}
66
// Arguments for the `undeploy` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct UndeployCommand {
    // Application to undeploy; required unless `--all` is passed. `undeploy_model` treats
    // the value as a manifest path if such a file exists, otherwise as an application name.
    #[clap(name = "name", required_unless_present("all"))]
    app_name: Option<String>,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,

    // When no name is given, undeploy every application returned by
    // `wash_lib::app::get_models`.
    #[clap(long = "all", default_value = "false")]
    all: bool,
}
80
// Arguments for the `deploy` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct DeployCommand {
    // Manifest source handed to `load_app_manifest`. `deploy_model` rejects a missing
    // value and, per its error message, "-" selects STDIN.
    #[clap(name = "application")]
    app_name: Option<String>,

    // Optional version, forwarded to `deploy_model_from_manifest`; it is only consulted
    // there when the source resolves to an application name rather than a manifest body.
    #[clap(name = "version")]
    version: Option<String>,

    // When set, `deploy_model` first tries to delete the manifest's name/version; a
    // failure of that delete is printed to stderr and does not abort the deploy.
    #[clap(long = "replace")]
    replace: bool,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,
}
98
// Arguments for the `delete` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct DeleteCommand {
    // Application to delete; required unless `--all-undeployed` is passed.
    // `delete_application_version` treats the value as a manifest path if such a file
    // exists (taking name and version from the manifest), otherwise as an application name.
    #[clap(name = "name", required_unless_present("all_undeployed"))]
    app_name: Option<String>,

    // Optional version to delete; only used when `app_name` is not an existing file.
    #[clap(name = "version")]
    version: Option<String>,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,

    // When no name is given, delete every application whose status type is `Undeployed`.
    #[clap(long = "all-undeployed", default_value = "false")]
    all_undeployed: bool,
}
116
// Arguments for the `put` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct PutCommand {
    // Manifest source handed to `load_app_manifest`; when absent, `put_model` uses "-".
    source: Option<String>,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,
}
125
// Arguments for the `get` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct GetCommand {
    // Application name. When present `handle_command` fetches that application's manifest
    // via `get_manifest`; when absent it lists applications via `get_applications`.
    #[clap(name = "name")]
    app_name: Option<String>,

    // Optional version, forwarded to `wash_lib::app::get_model_details` by `get_manifest`.
    #[clap(name = "version")]
    version: Option<String>,

    // Optional watch interval; only read by `get_applications` (the no-name path).
    // Passing the flag without a value uses "1s", parsed by `parse_watch_interval`.
    #[clap(long,short, num_args = 0..=1, default_missing_value = "1s", value_parser = parse_watch_interval)]
    pub watch: Option<std::time::Duration>,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,
}
148
// Arguments for the `status` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct StatusCommand {
    // Name of the application whose status is requested.
    #[clap(name = "name")]
    app_name: String,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,
}
158
// Arguments for the `history` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct HistoryCommand {
    // Name of the application whose version history is requested.
    #[clap(name = "name")]
    app_name: String,

    // Connection options, flattened into this command's own argument list.
    #[clap(flatten)]
    opts: CliConnectionOpts,
}
168
// Arguments for the `validate` subcommand.
// (`//` rather than `///`: clap would surface doc comments as help text.)
#[derive(Args, Debug, Clone)]
pub struct ValidateCommand {
    // Path of the manifest file passed to `validate_manifest_file`.
    #[clap(name = "application")]
    application: PathBuf,
    // Forwarded as the second argument of `validate_manifest_file`
    // (presumably enables checking of image references -- TODO confirm in wash_lib).
    #[clap(long)]
    check_image_refs: bool,
}
178
179pub async fn handle_command(
180 command: AppCliCommand,
181 output_kind: OutputKind,
182) -> anyhow::Result<CommandOutput> {
183 use AppCliCommand::*;
184 let sp: Spinner = Spinner::new(&output_kind)?;
185 let command_output: wadm_client::Result<CommandOutput> = match command {
186 List(cmd) => {
187 sp.update_spinner_message("Listing applications ...".to_string());
188 get_application_list(cmd, &sp).await
189 }
190 Get(cmd) => {
191 if let Some(app_name) = cmd.clone().app_name {
192 sp.update_spinner_message("Getting application... ".to_string());
193 get_manifest(cmd, &app_name).await
194 } else {
195 sp.update_spinner_message("Getting application manifests... ".to_string());
196 get_applications(cmd, &sp).await
197 }
198 }
199 Status(cmd) => {
200 sp.update_spinner_message("Getting application status ... ".to_string());
201 get_model_status(cmd).await
202 }
203 History(cmd) => {
204 sp.update_spinner_message("Getting application version history ... ".to_string());
205 get_application_versions(cmd).await
206 }
207 Delete(cmd) => {
208 sp.update_spinner_message("Deleting application version ... ".to_string());
209 delete_application_version(cmd).await
210 }
211 Put(cmd) => {
212 sp.update_spinner_message("Creating application version ... ".to_string());
213 put_model(cmd).await
214 }
215 Deploy(cmd) => {
216 sp.update_spinner_message("Deploying application ... ".to_string());
217 deploy_model(cmd).await
218 }
219 Undeploy(cmd) => {
220 sp.update_spinner_message("Undeploying application ... ".to_string());
221 undeploy_model(cmd).await
222 }
223 Validate(cmd) => {
224 sp.update_spinner_message("Validating application manifest ... ".to_string());
225 handle_validate(cmd).await
226 }
227 };
228
229 match command_output {
231 Err(wadm_client::error::ClientError::NatsError(e))
232 if e.kind() == RequestErrorKind::NoResponders =>
233 {
234 bail!("Connection succeeded to lattice but no wadm server was listening. Ensure wadm is running.")
235 }
236 _ => {}
237 }
238
239 sp.finish_and_clear();
240
241 Ok(command_output?)
242}
243async fn handle_validate(cmd: ValidateCommand) -> Result<CommandOutput> {
245 let (_manifest, validation_results) =
246 validate_manifest_file(&cmd.application, cmd.check_image_refs)
247 .await
248 .context("failed to validate Wadm manifest")?;
249 Ok(show_validate_manifest_results(validation_results))
250}
251
252async fn undeploy_model(cmd: UndeployCommand) -> Result<CommandOutput> {
253 let connection_opts =
254 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
255 let lattice = Some(connection_opts.get_lattice());
256 let client = connection_opts.into_nats_client().await?;
257
258 let models = match cmd.app_name {
261 Some(app_name) => {
263 let model_name = if tokio::fs::try_exists(&app_name)
266 .await
267 .is_ok_and(|exists| exists)
268 {
269 let manifest = load_app_manifest(app_name.parse()?)
270 .await
271 .with_context(|| format!("failed to load app manifest at [{app_name}]"))?;
272 manifest
273 .name()
274 .map(ToString::to_string)
275 .context("failed to find name of manifest")?
276 } else {
277 app_name
278 };
279
280 vec![model_name]
281 }
282 None if cmd.all => wash_lib::app::get_models(&client, lattice.clone())
284 .await?
285 .into_iter()
286 .map(|m| m.name)
287 .collect(),
288 _ => Vec::new(),
289 };
290
291 let mut undeployed = Vec::new();
292 let mut output_map = HashMap::new();
293
294 for model_name in models.iter() {
296 match wash_lib::app::undeploy_model(&client, lattice.clone(), model_name).await {
297 Ok(_) => undeployed.push(model_name),
298 Err(e) => eprintln!("failed to undeploy model [{model_name}]: {e}"),
299 }
300 }
301
302 let output_msg = match &models[..] {
303 [] => "No applications undeployed".into(),
304 [m] => format!("Undeployed application: {}", m),
305 _ => format!("Undeployed [{}] applications", undeployed.len()),
306 };
307 output_map.insert("results".to_string(), json!(output_msg));
308 output_map.insert(
309 "undeployed_application_names".to_string(),
310 json!(undeployed),
311 );
312 Ok(CommandOutput::new(output_msg, output_map))
313}
314
315async fn deploy_model(cmd: DeployCommand) -> Result<CommandOutput> {
316 let connection_opts =
317 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
318 let lattice = Some(connection_opts.get_lattice());
319
320 let client = connection_opts.into_nats_client().await?;
321
322 let app_manifest = match cmd.app_name {
323 Some(source) if source == "-" => load_app_manifest("-".parse()?).await?,
324 Some(source) => load_app_manifest(source.parse()?).await?,
325 None => {
326 return Err(wadm_client::error::ClientError::ManifestLoad(
327 anyhow::anyhow!(
328 "Missing manifest name/path. To load a manifest from STDIN, please pass '-'"
329 ),
330 ))
331 }
332 };
333
334 if cmd.replace {
336 if let (Some(name), version) = (
337 app_manifest.name(),
338 app_manifest.version().map(ToString::to_string),
339 ) {
340 if let Err(e) =
341 wash_lib::app::delete_model_version(&client, lattice.clone(), name, version).await
342 {
343 eprintln!("🟨 Failed to delete model during replace operation: {e}");
344 }
345 }
346 }
347
348 deploy_model_from_manifest(&client, lattice, app_manifest, cmd.version).await
349}
350
351pub(crate) async fn deploy_model_from_manifest(
352 client: &async_nats::Client,
353 lattice: Option<String>,
354 manifest: AppManifest,
355 version: Option<String>,
356) -> Result<CommandOutput> {
357 let (name, version) = match manifest {
358 AppManifest::SerializedModel(manifest) => wash_lib::app::put_and_deploy_model(
359 client,
360 lattice,
361 serde_yaml::to_string(&manifest)
362 .context("failed to convert manifest to string")?
363 .as_ref(),
364 )
365 .await
366 .map(|(name, version)| (name, Some(version))),
367 AppManifest::ModelName(model_name) => {
368 wash_lib::app::deploy_model(client, lattice, &model_name, version.clone()).await
369 }
370 }?;
371
372 let mut map = HashMap::new();
373 let version = version.unwrap_or_default();
374 map.insert("deployed".to_string(), json!(true));
375 map.insert("model_name".to_string(), json!(name));
376 map.insert("model_version".to_string(), json!(version));
377 Ok(CommandOutput::new(
378 format!("Deployed application \"{name}\", version \"{version}\""),
379 map,
380 ))
381}
382
383async fn put_model(cmd: PutCommand) -> Result<CommandOutput> {
384 let connection_opts =
385 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
386 let lattice = Some(connection_opts.get_lattice());
387
388 let client = connection_opts.into_nats_client().await?;
389
390 let app_manifest = match &cmd.source {
391 Some(source) => load_app_manifest(source.parse()?).await?,
392 None => load_app_manifest("-".parse()?).await?,
393 };
394
395 let (name, version) = match app_manifest {
396 AppManifest::SerializedModel(manifest) => wash_lib::app::put_model(
397 &client,
398 lattice,
399 serde_yaml::to_string(&manifest)
400 .context("failed to convert manifest to string")?
401 .as_ref(),
402 )
403 .await
404 .map_err(|e| anyhow::anyhow!(e)),
405 AppManifest::ModelName(name) => {
406 return Err(wadm_client::error::ClientError::ManifestLoad(anyhow::anyhow!("failed to retrieve manifest. Ensure `{name}` is a valid path to a Wadm application manifest.")));
407 }
408 }?;
409
410 let mut map = HashMap::new();
411 map.insert("deployed".to_string(), json!(true));
412 map.insert("model_name".to_string(), json!(name));
413 map.insert("model_version".to_string(), json!(version));
414 Ok(CommandOutput::new(
415 format!("Put application \"{name}\", version \"{version}\""),
416 map,
417 ))
418}
419
420async fn get_application_versions(cmd: HistoryCommand) -> Result<CommandOutput> {
421 let connection_opts =
422 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
423 let lattice = Some(connection_opts.get_lattice());
424
425 let client = connection_opts.into_nats_client().await?;
426
427 let versions = wash_lib::app::get_model_history(&client, lattice, &cmd.app_name).await?;
428 let mut map = HashMap::new();
429 map.insert("revisions".to_string(), json!(versions));
430 Ok(CommandOutput::new(
431 output::list_revisions_table(versions),
432 map,
433 ))
434}
435
436async fn get_model_status(cmd: StatusCommand) -> Result<CommandOutput> {
437 let connection_opts =
438 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
439 let lattice = Some(connection_opts.get_lattice());
440
441 let client = connection_opts.into_nats_client().await?;
442
443 let status = wash_lib::app::get_model_status(&client, lattice, &cmd.app_name).await?;
444
445 let mut map = HashMap::new();
446 map.insert("status".to_string(), json!(status));
447 Ok(CommandOutput::new(
448 output::status_table(cmd.app_name, status),
449 map,
450 ))
451}
452
453async fn get_manifest(cmd: GetCommand, app_name: &str) -> Result<CommandOutput> {
454 let connection_opts =
455 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
456 let lattice = Some(connection_opts.get_lattice());
457
458 let client = connection_opts.into_nats_client().await?;
459
460 let manifest =
461 wash_lib::app::get_model_details(&client, lattice, app_name, cmd.version).await?;
462
463 let mut map = HashMap::new();
464 map.insert("application".to_string(), json!(manifest));
465 let yaml = serde_yaml::to_string(&manifest).unwrap();
466 Ok(CommandOutput::new(yaml, map))
467}
468
469async fn delete_application_version(cmd: DeleteCommand) -> Result<CommandOutput> {
470 let connection_opts =
471 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
472 let lattice = Some(connection_opts.get_lattice());
473
474 let client = connection_opts.into_nats_client().await?;
475
476 let models = match cmd.app_name {
479 Some(app_name) => {
481 let (model_name, version): (String, Option<String>) =
484 if tokio::fs::try_exists(&app_name)
485 .await
486 .is_ok_and(|exists| exists)
487 {
488 let manifest = load_app_manifest(app_name.parse()?)
489 .await
490 .with_context(|| format!("failed to load app manifest at [{app_name}]"))?;
491 (
492 manifest
493 .name()
494 .map(ToString::to_string)
495 .context("failed to find name of manifest")?,
496 manifest.version().map(ToString::to_string),
497 )
498 } else {
499 (app_name, cmd.version)
500 };
501
502 vec![(model_name, version)]
503 }
504 None if cmd.all_undeployed => wash_lib::app::get_models(&client, lattice.clone())
506 .await?
507 .into_iter()
508 .filter_map(|m| match m.detailed_status.info.status_type {
509 wadm_types::api::StatusType::Undeployed => Some((m.name, Some(m.version))),
510 _ => None,
511 })
512 .collect(),
513 _ => Vec::new(),
514 };
515
516 let mut deleted_models = Vec::new();
517
518 #[derive(serde::Serialize)]
519 struct ModelNameAndVersion<'a> {
520 model_name: &'a String,
521 version: &'a Option<String>,
522 }
523
524 for (model_name, version) in models.iter() {
526 match wash_lib::app::delete_model_version(
527 &client,
528 lattice.clone(),
529 model_name,
530 version.clone(),
531 )
532 .await
533 {
534 Ok(true) => deleted_models.push(ModelNameAndVersion {
535 model_name,
536 version,
537 }),
538 Ok(false) => {}
540 Err(e) => {
541 eprintln!("failed to delete model [{model_name}]: {e}");
542 }
543 }
544 }
545
546 let mut output_map = HashMap::new();
547 let output_msg = match models[..] {
548 [] => "No applications deleted".into(),
549 [(ref model_name, _)] => {
550 output_map.insert("deleted".to_string(), json!(true));
551 if deleted_models.len() == 1 {
552 format!("Deleted application: {model_name}")
553 } else {
554 format!("Already deleted application: {model_name}")
555 }
556 }
557 _ => {
558 output_map.insert("deleted_applications".into(), json!(deleted_models));
559 format!("Deleted [{}] applications", deleted_models.len())
560 }
561 };
562
563 Ok(CommandOutput::new(output_msg, output_map))
564}
565
566async fn get_application_list(cmd: ListCommand, sp: &Spinner) -> Result<CommandOutput> {
567 let connection_opts =
568 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
569 let lattice = Some(connection_opts.get_lattice());
570
571 let client = connection_opts.into_nats_client().await?;
572
573 if cmd.watch.is_some() {
574 sp.finish_and_clear();
575 watch_applications(&client, lattice, cmd.watch).await?;
576 Ok(CommandOutput::new(
577 "Completed Watching Applications".to_string(),
578 HashMap::new(),
579 ))
580 } else {
581 let models = wash_lib::app::get_models(&client, lattice).await?;
582 let mut map = HashMap::new();
583 map.insert("applications".to_string(), json!(models));
584 Ok(CommandOutput::new(output::list_models_table(models), map))
585 }
586}
587
588async fn get_applications(cmd: GetCommand, sp: &Spinner) -> Result<CommandOutput> {
589 let connection_opts =
590 <CliConnectionOpts as TryInto<WashConnectionOptions>>::try_into(cmd.opts)?;
591 let lattice = Some(connection_opts.get_lattice());
592
593 let client = connection_opts.into_nats_client().await?;
594
595 if cmd.watch.is_some() {
596 sp.finish_and_clear();
597 watch_applications(&client, lattice, cmd.watch).await?;
598 Ok(CommandOutput::new(
599 "Completed Watching Applications".to_string(),
600 HashMap::new(),
601 ))
602 } else {
603 let models = wash_lib::app::get_models(&client, lattice).await?;
604 let mut map = HashMap::new();
605 map.insert("applications".to_string(), json!(models));
606 Ok(CommandOutput::new(output::list_models_table(models), map))
607 }
608}
609
610async fn watch_applications(
611 client: &async_nats::Client,
612 lattice: Option<String>,
613 watch: Option<Duration>,
614) -> Result<()> {
615 let mut stdout = std::io::stdout();
616
617 execute!(stdout, Clear(ClearType::FromCursorUp), cursor::MoveTo(0, 0))
618 .map_err(|e| anyhow::anyhow!("Failed to clear terminal: {}", e))?;
619
620 let mut ctrlc = std::pin::pin!(tokio::signal::ctrl_c());
621 let watch_interval = watch.unwrap_or(Duration::from_millis(1000));
622
623 loop {
624 let models = tokio::select! {
625 res = wash_lib::app::get_models(client, lattice.clone()) => res?,
626 _res = &mut ctrlc => {
627 execute!(stdout, Clear(ClearType::Purge), Clear(ClearType::FromCursorUp), cursor::MoveTo(0, 0), cursor::Show)
628 .map_err(|e| anyhow::anyhow!("Failed to execute terminal commands: {}", e))?;
629 stdout.flush()
630 .map_err(|e| anyhow::anyhow!("Failed to flush stdout: {}", e))?;
631 return Ok(());
632 }
633 };
634
635 let table = output::list_models_table(models);
636
637 execute!(stdout, Clear(ClearType::Purge), cursor::MoveTo(0, 0))
638 .map_err(|e| anyhow::anyhow!("Failed to execute terminal commands: {}", e))?;
639
640 stdout
641 .write_all(table.as_bytes())
642 .map_err(|e| anyhow::anyhow!("Failed to write table to stdout: {}", e))?;
643
644 stdout
645 .flush()
646 .map_err(|e| anyhow::anyhow!("Failed to flush stdout: {}", e))?;
647
648 execute!(
649 stdout,
650 Clear(ClearType::CurrentLine),
651 Clear(ClearType::FromCursorDown),
652 )
653 .map_err(|e| anyhow::anyhow!("Failed to clear terminal: {}", e))?;
654
655 tokio::select! {
656 _ = tokio::time::sleep(watch_interval) => continue,
657 _res = &mut ctrlc => {
658 execute!(stdout, Clear(ClearType::Purge), Clear(ClearType::FromCursorUp), cursor::MoveTo(0, 0), cursor::Show)
659 .map_err(|e| anyhow::anyhow!("Failed to execute terminal commands: {}", e))?;
660 stdout.flush()
661 .map_err(|e| anyhow::anyhow!("Failed to flush stdout: {}", e))?;
662 return Ok(());
663 }
664 }
665 }
666}
667
668fn show_validate_manifest_results(messages: impl AsRef<[ValidationFailure]>) -> CommandOutput {
669 let messages = messages.as_ref();
670 let valid = messages.valid();
671 let warnings = messages
672 .warnings()
673 .into_iter()
674 .cloned()
675 .collect::<Vec<ValidationFailure>>();
676 let errors = messages
677 .errors()
678 .into_iter()
679 .cloned()
680 .collect::<Vec<ValidationFailure>>();
681 let message = if valid {
682 "manifest is valid".into()
683 } else {
684 format!(
685 r#"invalid manifest:
686warnings: {warnings:#?}
687errors: {errors:#?}
688"#
689 )
690 };
691 let json_output = HashMap::<String, serde_json::Value>::from([
692 ("valid".into(), messages.valid().into()),
693 ("warnings".into(), json!(warnings)),
694 ("errors".into(), json!(errors)),
695 ]);
696 CommandOutput::new(message, json_output)
697}