use std::collections::HashMap;
use std::net::SocketAddr;
use clap::Args;
use colorful::Colorful;
use miette::IntoDiagnostic;
use opentelemetry::trace::FutureExt;
use tokio::sync::Mutex;
use tokio::try_join;
use ockam::Context;
use ockam_abac::Expr;
use ockam_api::address::extract_address_value;
use ockam_api::journeys::{JourneyEvent, NODE_NAME, TCP_OUTLET_AT, TCP_OUTLET_FROM, TCP_OUTLET_TO};
use ockam_api::nodes::service::portals::Outlets;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_core::Address;
use crate::node::util::initialize_default_node;
use crate::util::async_cmd;
use crate::util::parsers::socket_addr_parser;
use crate::{docs, fmt_info, fmt_ok, CommandGlobalOpts};
use crate::{fmt_log, terminal::color_primary};
// Text passed to `docs::after_help` for the command's `after_long_help`; embedded at compile
// time from a file located relative to this source file.
const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt");
// Text passed to `docs::about` for the command's `long_about`; embedded at compile time from a
// file located relative to this source file.
const LONG_ABOUT: &str = include_str!("./static/create/long_about.txt");
// Command-line arguments of the command that creates a TCP outlet on a node.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macros turn `///` doc
// comments on the struct and its fields into help text, which would change the CLI output.
#[derive(Clone, Debug, Args)]
#[command(
long_about = docs::about(LONG_ABOUT),
after_long_help = docs::after_help(AFTER_LONG_HELP)
)]
pub struct CreateCommand {
// `--to` (required): socket address, parsed by `socket_addr_parser`, that is handed to
// `create_outlet` as the outlet's target.
#[arg(long, display_order = 900, id = "SOCKET_ADDRESS", value_parser = socket_addr_parser)]
pub to: SocketAddr,
// `--from` (optional): value run through `extract_address_value`; when present it is converted
// into an `Address` and passed to `create_outlet`.
// NOTE(review): presumably the address the outlet worker is registered under — confirm.
#[arg(long, display_order = 902, id = "OUTLET_ADDRESS", value_parser = extract_address_value)]
pub from: Option<String>,
// `--at` (optional): value run through `extract_address_value` and passed to
// `BackgroundNodeClient::create` to select the node the outlet is created on.
// NOTE(review): presumably falls back to the default node when omitted — confirm.
#[arg(long, display_order = 903, id = "NODE_NAME", value_parser = extract_address_value)]
pub at: Option<String>,
// `--allow` (optional, hidden from help): policy expression forwarded to `create_outlet`.
#[arg(hide = true, long = "allow", display_order = 904, id = "EXPRESSION")]
pub policy_expression: Option<Expr>,
}
impl CreateCommand {
    /// Synchronous entry point of the command.
    ///
    /// Delegates to `async_cmd`, giving it the command name, a clone of the global options
    /// and a closure that drives [`CreateCommand::async_run`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `async_cmd` / [`CreateCommand::async_run`] report.
    pub fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
        async_cmd(&self.name(), opts.clone(), |ctx| async move {
            self.async_run(&ctx, opts).await
        })
    }

    /// Name under which this command is handed to `async_cmd`.
    pub fn name(&self) -> String {
        "create tcp outlet".into()
    }

    /// Creates the TCP outlet on the selected node and reports the result.
    ///
    /// Steps, in order:
    /// 1. calls `initialize_default_node` (presumably makes sure a default node exists —
    ///    confirm in `crate::node::util`);
    /// 2. builds a `BackgroundNodeClient` for the node given by `--at`;
    /// 3. sends the `create_outlet` request while progress messages are shown;
    /// 4. records a `JourneyEvent::TcpOutletCreated` event with the outlet attributes;
    /// 5. writes the plain, machine and JSON representations of the result to the terminal.
    ///
    /// # Errors
    ///
    /// Fails if node initialisation, client creation, the outlet request, the progress
    /// output, reading the worker address, JSON serialisation, recording the journey event
    /// or writing to the terminal fails.
    pub async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
        initialize_default_node(ctx, &opts).await?;
        let node = BackgroundNodeClient::create(ctx, &opts.state, &self.at).await?;
        let node_name = node.node_name();

        // Rendered once and reused by the progress message, the journey attributes and the
        // final summary instead of formatting the socket address three times.
        let socket_addr = self.to.to_string();

        // Shared flag read by the progress output to learn that the request has completed.
        let is_finished: Mutex<bool> = Mutex::new(false);

        let send_req = async {
            let from = self.from.map(Address::from);
            let res = node
                .create_outlet(ctx, &self.to, from.as_ref(), self.policy_expression)
                .await?;
            // Only reached on success; on failure the `?` above returns the error and
            // `try_join!` below returns it as soon as it is observed.
            *is_finished.lock().await = true;
            Ok(res)
        }
        // Attach the current OpenTelemetry context to the request future.
        .with_current_context();

        let output_messages = vec![
            format!(
                "Attempting to create TCP Outlet to {}...",
                color_primary(&socket_addr)
            ),
            format!(
                "Creating outlet service on node {}...",
                color_primary(&node_name)
            ),
            "Setting up TCP outlet worker...".to_string(),
        ];
        let progress_output = opts
            .terminal
            .progress_output(&output_messages, &is_finished);

        // Drive the request and the progress display concurrently; the first error wins.
        let (outlet_status, _) = try_join!(send_req, progress_output)?;

        let worker_addr = outlet_status.worker_address().into_diagnostic()?;
        let json = serde_json::to_string_pretty(&outlet_status).into_diagnostic()?;

        let mut attributes = HashMap::new();
        attributes.insert(TCP_OUTLET_AT, node_name.clone());
        // `to_string()` already yields a fresh `String`, so no extra clone is needed.
        attributes.insert(TCP_OUTLET_FROM, worker_addr.to_string());
        attributes.insert(TCP_OUTLET_TO, socket_addr.clone());
        attributes.insert(NODE_NAME, node_name.clone());
        opts.state
            .add_journey_event(JourneyEvent::TcpOutletCreated, attributes)
            .await?;

        opts.terminal
            .stdout()
            .plain(
                fmt_ok!("Created a new TCP Outlet\n")
                    + &fmt_log!("  Node: {}\n", color_primary(&node_name))
                    + &fmt_log!(
                        "  Outlet Address: {}\n",
                        color_primary(outlet_status.worker_addr.address())
                    )
                    + &fmt_log!("  Socket Address: {}\n", color_primary(&socket_addr))
                    + &fmt_info!(
                        "You may want to take a look at the {}, {}, {} commands next",
                        color_primary("ockam relay"),
                        color_primary("ockam tcp-inlet"),
                        color_primary("ockam policy")
                    ),
            )
            .machine(worker_addr)
            .json(json)
            .write_line()?;
        Ok(())
    }
}