use clap::Args;
use console::Term;
use miette::IntoDiagnostic;
use ockam_api::nodes::models::secure_channel::ListSecureChannelListenerResponse;
use tokio_retry::strategy::FixedInterval;
use tracing::{info, trace, warn};
use ockam_api::nodes::models::base::NodeStatus;
use ockam_api::nodes::models::portal::{InletList, OutletList};
use ockam_api::nodes::models::services::ServiceList;
use ockam_api::nodes::models::transport::TransportList;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_core::AsyncTryClone;
use ockam_node::Context;
use crate::terminal::tui::ShowCommandTui;
use crate::terminal::PluralTerm;
use crate::util::{api, async_cmd};
use crate::{docs, CommandGlobalOpts, Result, Terminal, TerminalStream};
use super::models::portal::{ShowInletStatus, ShowOutletStatus};
use super::models::secure_channel::ShowSecureChannelListener;
use super::models::services::ShowServiceStatus;
use super::models::show::ShowNodeResponse;
use super::models::transport::ShowTransportStatus;
const LONG_ABOUT: &str = include_str!("./static/show/long_about.txt");
const PREVIEW_TAG: &str = include_str!("../static/preview_tag.txt");
const AFTER_LONG_HELP: &str = include_str!("./static/show/after_long_help.txt");
const IS_NODE_UP_TIME_BETWEEN_CHECKS_MS: usize = 50;
const IS_NODE_UP_MAX_ATTEMPTS: usize = 60;
#[derive(Clone, Debug, Args)]
#[command(
long_about = docs::about(LONG_ABOUT),
before_help = docs::before_help(PREVIEW_TAG),
after_long_help = docs::after_help(AFTER_LONG_HELP)
)]
pub struct ShowCommand {
node_name: Option<String>,
}
impl ShowCommand {
pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
async_cmd(&self.name(), opts.clone(), |ctx| async move {
self.async_run(&ctx, opts).await
})
}
pub fn name(&self) -> String {
"show node".into()
}
async fn async_run(&self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
ShowTui::run(ctx, opts, self.node_name.clone()).await
}
}
pub struct ShowTui {
ctx: Context,
opts: CommandGlobalOpts,
node_name: Option<String>,
}
impl ShowTui {
pub async fn run(
ctx: &Context,
opts: CommandGlobalOpts,
node_name: Option<String>,
) -> miette::Result<()> {
let tui = Self {
ctx: ctx.async_try_clone().await.into_diagnostic()?,
opts,
node_name,
};
tui.show().await
}
}
#[ockam_core::async_trait]
impl ShowCommandTui for ShowTui {
const ITEM_NAME: PluralTerm = PluralTerm::Node;
fn cmd_arg_item_name(&self) -> Option<String> {
self.node_name.clone()
}
fn terminal(&self) -> Terminal<TerminalStream<Term>> {
self.opts.terminal.clone()
}
async fn get_arg_item_name_or_default(&self) -> miette::Result<String> {
Ok(self
.opts
.state
.get_node_or_default(&self.node_name)
.await?
.name())
}
async fn list_items_names(&self) -> miette::Result<Vec<String>> {
Ok(self
.opts
.state
.get_nodes()
.await?
.iter()
.map(|n| n.name())
.collect())
}
async fn show_single(&self, item_name: &str) -> miette::Result<()> {
let mut node =
BackgroundNodeClient::create(&self.ctx, &self.opts.state, &Some(item_name.to_string()))
.await?;
print_query_status(&self.opts, &self.ctx, &mut node, false).await?;
Ok(())
}
}
pub async fn print_query_status(
opts: &CommandGlobalOpts,
ctx: &Context,
node: &mut BackgroundNodeClient,
wait_until_ready: bool,
) -> miette::Result<()> {
let cli_state = opts.state.clone();
let node_name = node.node_name();
let node_info = cli_state.get_node(&node_name).await?;
let show_node = if !is_node_up(ctx, node, wait_until_ready).await? {
let is_authority_node = cli_state
.get_node(&node_name)
.await
.ok()
.map(|n| n.is_authority_node())
.unwrap_or(false);
ShowNodeResponse::new(
node_info.is_default(),
&node_name,
is_authority_node,
node_info.tcp_listener_port(),
node_info.pid(),
)
} else {
let mut show_node = ShowNodeResponse::new(
node_info.is_default(),
&node_name,
true,
node_info.tcp_listener_port(),
node_info.pid(),
);
let services: ServiceList = node.ask(ctx, api::list_services()).await?;
show_node.services = services
.list
.into_iter()
.map(ShowServiceStatus::from)
.collect();
let transports: TransportList = node.ask(ctx, api::list_tcp_listeners()).await?;
show_node.transports = transports
.list
.into_iter()
.map(ShowTransportStatus::from)
.collect();
let listeners: ListSecureChannelListenerResponse =
node.ask(ctx, api::list_secure_channel_listener()).await?;
show_node.secure_channel_listeners = listeners
.list
.into_iter()
.map(ShowSecureChannelListener::from)
.collect();
let inlets: InletList = node.ask(ctx, api::list_inlets()).await?;
show_node.inlets = inlets.list.into_iter().map(ShowInletStatus::from).collect();
let outlets: OutletList = node.ask(ctx, api::list_outlets()).await?;
show_node.outlets = outlets
.list
.into_iter()
.map(ShowOutletStatus::from)
.collect();
show_node
};
opts.terminal
.clone()
.stdout()
.plain(&show_node)
.json(serde_json::to_string_pretty(&show_node).into_diagnostic()?)
.write_line()?;
Ok(())
}
pub async fn is_node_up(
ctx: &Context,
node_client: &mut BackgroundNodeClient,
wait_until_ready: bool,
) -> Result<bool> {
let attempts = match wait_until_ready {
true => IS_NODE_UP_MAX_ATTEMPTS,
false => 1,
};
let retries =
FixedInterval::from_millis(IS_NODE_UP_TIME_BETWEEN_CHECKS_MS as u64).take(attempts);
let now = std::time::Instant::now();
let node_name = node_client.node_name();
for timeout_duration in retries {
let node = node_client.cli_state().get_node(&node_name).await.ok();
let node_tcp_listener_address = node.as_ref().and_then(|n| n.tcp_listener_address());
if node.is_none() || node_tcp_listener_address.is_none() {
trace!(%node_name, "node has not been initialized");
tokio::time::sleep(timeout_duration).await;
continue;
}
let result = node_client
.set_timeout_mut(timeout_duration)
.ask::<(), NodeStatus>(ctx, api::query_status())
.await;
if let Ok(node_status) = result {
let elapsed = now.elapsed();
info!(%node_name, ?elapsed, "node is up {:?}", node_status);
return Ok(true);
} else {
trace!(%node_name, "node is initializing");
tokio::time::sleep(timeout_duration).await;
}
}
warn!(%node_name, "node didn't respond in time");
Ok(false)
}