use crate::node::default_node_name;
use crate::util::{api, node_rpc, Rpc, RpcBuilder};
use crate::{help, node::HELP_DETAIL, CommandGlobalOpts};
use clap::Args;
use colorful::Colorful;
use core::time::Duration;
use ockam::TcpTransport;
use ockam_api::nodes::models::identity::ShortIdentityResponse;
use ockam_api::nodes::models::portal::{InletList, OutletList};
use ockam_api::nodes::models::services::ServiceList;
use ockam_api::nodes::models::transport::TransportList;
use ockam_api::{addr_to_multiaddr, cli_state, route_to_multiaddr};
use ockam_core::Route;
use ockam_multiaddr::proto::{DnsAddr, Node, Tcp};
use ockam_multiaddr::MultiAddr;
use tokio_retry::strategy::FibonacciBackoff;
use tracing::debug;
const IS_NODE_UP_MAX_ATTEMPTS: usize = 50;
const IS_NODE_UP_MAX_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Clone, Debug, Args)]
#[command(arg_required_else_help = true, after_long_help = help::template(HELP_DETAIL))]
pub struct ShowCommand {
#[arg(default_value_t = default_node_name())]
node_name: String,
}
impl ShowCommand {
pub fn run(self, options: CommandGlobalOpts) {
node_rpc(run_impl, (options, self))
}
}
async fn run_impl(
ctx: ockam::Context,
(opts, cmd): (CommandGlobalOpts, ShowCommand),
) -> crate::Result<()> {
let node_name = &cmd.node_name;
let tcp = TcpTransport::create(&ctx).await?;
let mut rpc = RpcBuilder::new(&ctx, &opts, node_name).tcp(&tcp)?.build();
print_query_status(&mut rpc, node_name, false).await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn print_node_info(
node_port: u16,
node_name: &str,
status_is_up: bool,
default_id: Option<&str>,
services: Option<&ServiceList>,
tcp_listeners: Option<&TransportList>,
secure_channel_listeners: Option<&Vec<String>>,
inlets_outlets: Option<(&InletList, &OutletList)>,
) {
println!();
println!("Node:");
println!(" Name: {}", node_name);
println!(
" Status: {}",
match status_is_up {
true => "UP".light_green(),
false => "DOWN".light_red(),
}
);
println!(" Route To Node:");
let mut m = MultiAddr::default();
if m.push_back(Node::new(node_name)).is_ok() {
println!(" Short: {}", m);
}
let mut m = MultiAddr::default();
if m.push_back(DnsAddr::new("localhost")).is_ok() && m.push_back(Tcp::new(node_port)).is_ok() {
println!(" Verbose: {}", m);
}
if let Some(id) = default_id {
println!(" Identity: {}", id);
}
if let Some(list) = tcp_listeners {
println!(" Transports:");
for e in &list.list {
println!(" Transport:");
println!(" Type: {}", e.tt);
println!(" Mode: {}", e.tm);
println!(" Address: {}", e.payload);
}
}
if let Some(list) = secure_channel_listeners {
println!(" Secure Channel Listeners:");
for e in list {
println!(" Listener:");
if let Some(ma) = addr_to_multiaddr(e) {
println!(" Address: {}", ma);
}
}
}
if let Some((inlets, outlets)) = inlets_outlets {
println!(" Inlets:");
for e in &inlets.list {
println!(" Inlet:");
println!(" Listen Address: {}", e.bind_addr);
if let Some(r) = Route::parse(e.outlet_route.as_ref()) {
if let Some(ma) = route_to_multiaddr(&r) {
println!(" Route To Outlet: {}", ma);
}
}
}
println!(" Outlets:");
for e in &outlets.list {
println!(" Outlet:");
println!(" Forward Address: {}", e.tcp_addr);
if let Some(ma) = addr_to_multiaddr(e.worker_addr.as_ref()) {
println!(" Address: {}", ma);
}
}
}
if let Some(list) = services {
println!(" Services:");
for e in &list.list {
println!(" Service:");
println!(" Type: {}", e.service_type);
if let Some(ma) = addr_to_multiaddr(e.addr.as_ref()) {
println!(" Address: {}", ma);
}
}
}
}
pub async fn print_query_status(
rpc: &mut Rpc<'_>,
node_name: &str,
wait_until_ready: bool,
) -> anyhow::Result<()> {
let cli_state = cli_state::CliState::new()?;
if !is_node_up(rpc, wait_until_ready).await? {
let node_state = cli_state.nodes.get(node_name)?;
let node_port = node_state.setup()?.default_tcp_listener()?.addr.port();
print_node_info(node_port, node_name, false, None, None, None, None, None);
} else {
rpc.request(api::short_identity()).await?;
let default_id = match rpc.parse_response::<ShortIdentityResponse>() {
Ok(resp) => String::from(resp.identity_id),
Err(_) => String::from("None"),
};
let mut rpc = rpc.clone();
rpc.request(api::list_services()).await?;
let services = rpc.parse_response::<ServiceList>()?;
let mut rpc = rpc.clone();
rpc.request(api::list_tcp_listeners()).await?;
let tcp_listeners = rpc.parse_response::<TransportList>()?;
let mut rpc = rpc.clone();
rpc.request(api::list_secure_channel_listener()).await?;
let secure_channel_listeners = rpc.parse_response::<Vec<String>>()?;
let mut rpc = rpc.clone();
rpc.request(api::list_inlets()).await?;
let inlets = rpc.parse_response::<InletList>()?;
let mut rpc = rpc.clone();
rpc.request(api::list_outlets()).await?;
let outlets = rpc.parse_response::<OutletList>()?;
let node_state = cli_state.nodes.get(node_name)?;
let node_port = node_state.setup()?.default_tcp_listener()?.addr.port();
print_node_info(
node_port,
node_name,
true,
Some(&default_id),
Some(&services),
Some(&tcp_listeners),
Some(&secure_channel_listeners),
Some((&inlets, &outlets)),
);
}
Ok(())
}
async fn is_node_up(rpc: &mut Rpc<'_>, wait_until_ready: bool) -> anyhow::Result<bool> {
let attempts = match wait_until_ready {
true => IS_NODE_UP_MAX_ATTEMPTS,
false => 1,
};
let timeout = FibonacciBackoff::from_millis(250)
.max_delay(IS_NODE_UP_MAX_TIMEOUT)
.take(attempts);
let cli_state = cli_state::CliState::new()?;
let now = std::time::Instant::now();
for t in timeout {
if cli_state
.nodes
.get(rpc.node_name())?
.setup()?
.default_tcp_listener()
.is_err()
{
tokio::time::sleep(t).await;
continue;
}
if rpc
.request_with_timeout(api::query_status(), t)
.await
.is_ok()
&& rpc.is_ok().is_ok()
{
debug!("Node is up. Took {:?}", now.elapsed());
return Ok(true);
}
}
Ok(false)
}