use crate::node::util::initialize_default_node;
use crate::shared_args::OptionalTimeoutArg;
use crate::tcp::util::alias_parser;
use crate::util::parsers::duration_parser;
use crate::util::parsers::hostname_parser;
use crate::util::parsers::http_header_parser;
use crate::util::{
port_is_free_guard, print_warning_for_deprecated_flag_replaced, process_nodes_multiaddr,
};
use crate::{docs, Command, CommandGlobalOpts, Error};
use async_trait::async_trait;
use clap::builder::FalseyValueParser;
use clap::Args;
use colorful::Colorful;
use miette::{miette, IntoDiagnostic};
use ockam::identity::Identifier;
use ockam::transport::SchemeHostnamePort;
use ockam::Context;
use ockam_abac::PolicyExpression;
use ockam_api::address::extract_address_value;
use ockam_api::cli_state::journeys::{
JourneyEvent, NODE_NAME, TCP_INLET_ALIAS, TCP_INLET_AT, TCP_INLET_CONNECTION_STATUS,
TCP_INLET_FROM, TCP_INLET_TO,
};
use ockam_api::cli_state::{random_name, CliState};
use ockam_api::colors::{color_primary, color_primary_alt};
use ockam_api::nodes::models::portal::InletStatus;
use ockam_api::nodes::service::tcp_inlets::Inlets;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus};
use ockam_core::api::{Reply, Status};
use ockam_core::{route, Address};
use ockam_multiaddr::proto;
use ockam_multiaddr::{MultiAddr, Protocol as _};
use ockam_node::compat::asynchronous::resolve_peer;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use tracing::trace;
const AFTER_LONG_HELP: &str = include_str!("./static/create/after_long_help.txt");
#[derive(Clone, Debug, Args)]
#[command(after_long_help = docs::after_help(AFTER_LONG_HELP))]
pub struct CreateCommand {
#[arg(id = "NAME", value_parser = alias_parser)]
pub name: Option<String>,
#[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)]
pub at: Option<String>,
#[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)]
pub from: SchemeHostnamePort,
#[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())]
pub to: String,
#[arg(long, display_order = 900, id = "RELAY_NAME")]
pub via: Option<String>,
#[arg(long, value_name = "IDENTITY_NAME", display_order = 900)]
pub identity: Option<String>,
#[arg(long, name = "AUTHORIZED", display_order = 900)]
pub authorized: Option<Identifier>,
#[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)]
pub alias: Option<String>,
#[arg(help = docs::about("\
Policy expression that will be used for access control to the TCP Inlet. \
If you don't provide it, the policy set for the \"tcp-inlet\" resource type will be used. \
\n\nYou can check the fallback policy with `ockam policy show --resource-type tcp-inlet`."))]
#[arg(
long,
visible_alias = "expression",
display_order = 900,
id = "POLICY_EXPRESSION"
)]
pub allow: Option<PolicyExpression>,
#[arg(long, display_order = 900, id = "WAIT", default_value = "5s", value_parser = duration_parser)]
pub connection_wait: Duration,
#[arg(long, display_order = 900, id = "RETRY", default_value = "20s", value_parser = duration_parser)]
pub retry_wait: Duration,
#[command(flatten)]
pub timeout: OptionalTimeoutArg,
#[arg(long, default_value = "false")]
pub no_connection_wait: bool,
#[arg(
long,
visible_alias = "enable-udp-puncture",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub udp: bool,
#[arg(
long,
visible_alias = "disable-tcp-fallback",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub no_tcp_fallback: bool,
#[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)]
pub privileged: bool,
#[arg(long, value_name = "BOOL", default_value_t = false, hide = true)]
pub tls: bool,
#[arg(long, value_name = "ROUTE", hide = true)]
pub tls_certificate_provider: Option<MultiAddr>,
#[arg(long, env = "OCKAM_TCP_PORTAL_SKIP_HANDSHAKE", value_parser = FalseyValueParser::default())]
pub skip_handshake: bool,
#[arg(long, env = "OCKAM_TCP_PORTAL_ENABLE_NAGLE", value_parser = FalseyValueParser::default())]
pub enable_nagle: bool,
#[arg(long, value_name = "HTTP_HEADER", value_parser = http_header_parser)]
pub http_header: Vec<(String, String)>,
}
pub(crate) fn tcp_inlet_default_from_addr() -> SchemeHostnamePort {
SchemeHostnamePort::from_str("127.0.0.1:0").unwrap()
}
pub(crate) fn tcp_inlet_default_to_addr() -> String {
"/project/<default_project_name>/service/forward_to_<default_relay_name>/secure/api/service/<default_service_name>".to_string()
}
#[async_trait]
impl Command for CreateCommand {
const NAME: &'static str = "tcp-inlet create";
async fn run(self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
initialize_default_node(ctx, &opts).await?;
let cmd = self.parse_args(&opts).await?;
let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?;
cmd.timeout.timeout.map(|t| node.set_timeout_mut(t));
let inlet_status = {
let pb = opts.terminal.spinner();
let prefix_route = if !cmd.http_header.is_empty() {
let overwrite_http_header_address = Address::random_tagged("http_interceptor");
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Creating HTTP Interceptor Service at {}...\n",
color_primary(&overwrite_http_header_address)
));
}
let result = node
.create_http_header_overwrite_service(
ctx,
&overwrite_http_header_address,
cmd.http_header.clone(),
)
.await;
match result {
Ok(_) => {
if let Some(pb) = pb.as_ref() {
let created_message = format!(
"Created a new HTTP Interceptor Service bound to {}\n",
color_primary(overwrite_http_header_address.to_string()),
);
pb.set_message(fmt_ok!("{}", created_message));
}
}
Err(_) => Err(miette!("Failed to create interceptor"))?,
}
route![overwrite_http_header_address]
} else {
route![]
};
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Creating TCP Inlet at {}...\n",
color_primary(cmd.from.to_string())
));
}
loop {
let result: Reply<InletStatus> = node
.create_inlet(
ctx,
cmd.from.hostname_port(),
&cmd.to(),
cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"),
&cmd.authorized,
&cmd.allow,
cmd.connection_wait,
!cmd.no_connection_wait,
&cmd.secure_channel_identifier(&opts.state).await?,
cmd.udp || cmd.from.is_udp(),
cmd.no_tcp_fallback,
cmd.privileged,
&cmd.tls_certificate_provider,
cmd.skip_handshake,
cmd.enable_nagle,
prefix_route.clone(),
)
.await?;
match result {
Reply::Successful(inlet_status) => {
break inlet_status;
}
Reply::Failed(_, s) => {
if let Some(status) = s {
if status == Status::BadRequest {
Err(miette!("Bad request when creating an inlet"))?
}
};
trace!("the inlet creation returned a non-OK status: {s:?}");
if cmd.retry_wait.as_millis() == 0 {
return Err(miette!("Failed to create TCP inlet"))?;
}
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Waiting for TCP Inlet {} to be available... Retrying momentarily\n",
color_primary(&cmd.to)
));
}
tokio::time::sleep(cmd.retry_wait).await
}
}
}
};
let node_name = node.node_name();
cmd.add_inlet_created_event(&opts, node_name, &inlet_status)
.await?;
let created_message = format!(
"Created a new TCP Inlet in the Node {} bound to {}",
color_primary(node_name),
color_primary(inlet_status.bind_addr.to_string()),
);
let mut plain = if cmd.no_connection_wait {
fmt_ok!("{created_message}\n")
+ &fmt_info!(
"It will automatically connect to the TCP Outlet at {} as soon as it is available\n",
color_primary(&cmd.to)
)
} else if inlet_status.status == ConnectionStatus::Up {
fmt_ok!("{created_message}\n")
+ &fmt_log!(
"sending traffic to the TCP Outlet at {}\n",
color_primary(&cmd.to)
)
} else {
fmt_warn!("{created_message}\n")
+ &fmt_log!(
"but it failed to connect to the TCP Outlet at {}\n",
color_primary(&cmd.to)
)
+ &fmt_info!(
"It will automatically connect to the TCP Outlet as soon as it is available\n",
)
};
if cmd.privileged {
plain += &fmt_info!(
"This TCP Inlet is operating in {} mode\n",
color_primary_alt("privileged".to_string())
);
}
opts.terminal
.to_stdout()
.plain(plain)
.machine(inlet_status.bind_addr.to_string())
.json(serde_json::json!(&inlet_status))
.write_line()?;
Ok(())
}
}
impl CreateCommand {
pub fn to(&self) -> MultiAddr {
MultiAddr::from_str(&self.to).unwrap()
}
pub async fn secure_channel_identifier(
&self,
state: &CliState,
) -> miette::Result<Option<Identifier>> {
if let Some(identity_name) = self.identity.as_ref() {
Ok(Some(state.get_identifier_by_name(identity_name).await?))
} else {
Ok(None)
}
}
pub async fn add_inlet_created_event(
&self,
opts: &CommandGlobalOpts,
node_name: &str,
inlet: &InletStatus,
) -> miette::Result<()> {
let mut attributes = HashMap::new();
attributes.insert(TCP_INLET_AT, node_name.to_string());
attributes.insert(TCP_INLET_FROM, self.from.to_string());
attributes.insert(TCP_INLET_TO, self.to.clone());
attributes.insert(TCP_INLET_ALIAS, inlet.alias.clone());
attributes.insert(TCP_INLET_CONNECTION_STATUS, inlet.status.to_string());
attributes.insert(NODE_NAME, node_name.to_string());
Ok(opts
.state
.add_journey_event(JourneyEvent::TcpInletCreated, attributes)
.await?)
}
pub async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result<Self> {
if let Some(alias) = self.alias.as_ref() {
print_warning_for_deprecated_flag_replaced(
opts,
"alias",
"the <NAME> positional argument",
)?;
if self.name.is_some() {
opts.terminal.write_line(
fmt_warn!("The <NAME> argument is being overridden by the --alias flag")
+ &fmt_log!("Consider removing the --alias flag"),
)?;
}
self.name = Some(alias.clone());
} else {
self.name = self.name.or_else(|| Some(random_name()));
}
let from = resolve_peer(self.from.hostname_port())
.await
.into_diagnostic()?;
port_is_free_guard(&from)?;
self.to = Self::parse_arg_to(&opts.state, self.to, self.via.as_ref()).await?;
if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() {
return Err(miette!(
"--authorized can not be used with project addresses"
))?;
}
self.tls_certificate_provider =
if let Some(tls_certificate_provider) = &self.tls_certificate_provider {
Some(tls_certificate_provider.clone())
} else if self.tls || self.from.is_tls() {
Some(MultiAddr::from_str(
"/project/default/service/tls_certificate_provider",
)?)
} else {
None
};
Ok(self)
}
pub(crate) async fn parse_arg_to(
state: &CliState,
to: impl Into<String>,
via: Option<&String>,
) -> miette::Result<String> {
let mut to = to.into();
let to_is_default = to == tcp_inlet_default_to_addr();
let mut service_name = "outlet".to_string();
let relay_name = via.cloned().unwrap_or("default".to_string());
match MultiAddr::from_str(&to) {
Ok(to) => {
if let Some(proto) = to.first() {
if proto.code() == proto::Service::CODE && to.len() == 1 {
service_name = proto
.cast::<proto::Service>()
.ok_or_else(|| Error::arg_validation("to", via, None))?
.to_string();
}
else {
if !to_is_default && via.is_some() {
return Err(Error::arg_validation(
"to",
via,
Some("'via' can't be passed if 'to' is a route"),
))?;
}
}
}
}
Err(_) => {
service_name = to.to_string();
to = tcp_inlet_default_to_addr();
}
}
if to.contains("<default_project_name>") {
let project_name = state
.projects()
.get_default_project()
.await
.map(|p| p.name().to_string())
.ok()
.ok_or(Error::arg_validation("to", via, Some("No projects found")))?;
to = to.replace("<default_project_name>", &project_name);
}
to = to.replace("<default_relay_name>", &relay_name);
to = to.replace("<default_service_name>", &service_name);
let to = MultiAddr::from_str(&to).into_diagnostic()?;
Ok(process_nodes_multiaddr(&to, state).await?.to_string())
}
}
#[cfg(test)]
mod tests {
use ockam_api::nodes::InMemoryNode;
use ockam_api::orchestrator::project::models::ProjectModel;
use ockam_api::orchestrator::project::Project;
use crate::run::parser::resource::utils::parse_cmd_from_args;
use super::*;
#[test]
fn command_can_be_parsed_from_name() {
let cmd = parse_cmd_from_args(CreateCommand::NAME, &[]);
assert!(cmd.is_ok());
}
#[ockam_macros::test]
async fn parse_arg_to(ctx: &mut Context) -> ockam_core::Result<()> {
let state = CliState::test().await.unwrap();
let node = InMemoryNode::start(ctx, &state).await.unwrap();
let node_name = node.node_name();
let node_port = state
.get_node(&node_name)
.await
.unwrap()
.tcp_listener_port()
.unwrap();
let project = Project::import(ProjectModel {
identity: Some(
Identifier::from_str(
"Ie92f183eb4c324804ef4d62962dea94cf095a265a1b2c3d4e5f6a6b5c4d3e2f1",
)
.unwrap(),
),
name: "p1".to_string(),
..Default::default()
})
.await
.unwrap();
state.projects().store_project(project).await.unwrap();
let cases = ["/alice/service", "alice/relay"];
for to in cases {
CreateCommand::parse_arg_to(&state, to, None)
.await
.expect_err("Invalid multiaddr");
}
let res = CreateCommand::parse_arg_to(&state, tcp_inlet_default_to_addr(), None)
.await
.unwrap();
assert_eq!(
res,
"/project/p1/service/forward_to_default/secure/api/service/outlet".to_string()
);
let cases = [
("/project/p2/service/forward_to_n1/secure/api/service/myoutlet", None),
("/worker/603b62d245c9119d584ba3d874eb8108/service/forward_to_n3/service/hop/service/outlet", None),
(&format!("/node/{node_name}/service/myoutlet"), Some(format!("/ip4/127.0.0.1/tcp/{node_port}/service/myoutlet"))),
];
for (to, expected) in cases {
let res = CreateCommand::parse_arg_to(&state, to, None).await.unwrap();
let expected = expected.unwrap_or(to.to_string());
assert_eq!(res, expected);
}
let res = CreateCommand::parse_arg_to(&state, "myoutlet", None)
.await
.unwrap();
assert_eq!(
res,
"/project/p1/service/forward_to_default/secure/api/service/myoutlet".to_string()
);
let cases = [
(
tcp_inlet_default_to_addr(),
"myrelay",
"/project/p1/service/forward_to_myrelay/secure/api/service/outlet",
),
(
"myoutlet".to_string(),
"myrelay",
"/project/p1/service/forward_to_myrelay/secure/api/service/myoutlet",
),
];
for (to, via, expected) in cases {
let res = CreateCommand::parse_arg_to(&state, &to, Some(&via.to_string()))
.await
.unwrap();
assert_eq!(res, expected.to_string());
}
let to = "/project/p1/service/forward_to_n1/secure/api/service/outlet";
CreateCommand::parse_arg_to(&state, to, Some(&"myrelay".to_string()))
.await
.expect_err("'via' can't be passed if 'to' is a full route");
Ok(())
}
}