use crate::node::util::initialize_default_node;
use crate::shared_args::OptionalTimeoutArg;
use crate::tcp::inlet::create::{tcp_inlet_default_from_addr, tcp_inlet_default_to_addr};
use crate::tcp::util::alias_parser;
use crate::util::parsers::duration_parser;
use crate::util::parsers::hostname_parser;
use crate::util::{port_is_free_guard, print_warning_for_deprecated_flag_replaced};
use crate::{Command, CommandGlobalOpts};
use async_trait::async_trait;
use clap::builder::FalseyValueParser;
use clap::Args;
use colorful::Colorful;
use miette::{miette, IntoDiagnostic};
use ockam::identity::Identifier;
use ockam::transport::SchemeHostnamePort;
use ockam::Context;
use ockam_abac::PolicyExpression;
use ockam_api::address::extract_address_value;
use ockam_api::cli_state::random_name;
use ockam_api::colors::color_primary;
use ockam_api::influxdb::{InfluxDBPortals, LeaseUsage};
use ockam_api::nodes::models::portal::InletStatus;
use ockam_api::nodes::BackgroundNodeClient;
use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, CliState, ConnectionStatus};
use ockam_core::api::{Reply, Status};
use ockam_multiaddr::{proto, MultiAddr, Protocol};
use ockam_node::compat::asynchronous::resolve_peer;
use std::str::FromStr;
use std::time::Duration;
use tracing::trace;
#[derive(Clone, Debug, Args)]
pub struct CreateCommand {
#[arg(id = "NAME", value_parser = alias_parser)]
pub name: Option<String>,
#[arg(long, display_order = 900, id = "NODE_NAME", value_parser = extract_address_value)]
pub at: Option<String>,
#[arg(long, display_order = 900, id = "SOCKET_ADDRESS", hide_default_value = true, default_value_t = tcp_inlet_default_from_addr(), value_parser = hostname_parser)]
pub from: SchemeHostnamePort,
#[arg(long, display_order = 900, id = "ROUTE", default_value_t = tcp_inlet_default_to_addr())]
pub to: String,
#[arg(long, display_order = 900, id = "RELAY_NAME")]
pub via: Option<String>,
#[arg(long, value_name = "IDENTITY_NAME", display_order = 900)]
pub identity: Option<String>,
#[arg(long, name = "AUTHORIZED", display_order = 900)]
pub authorized: Option<Identifier>,
#[arg(long, display_order = 900, id = "ALIAS", value_parser = alias_parser)]
pub alias: Option<String>,
#[arg(
long,
visible_alias = "expression",
display_order = 900,
id = "POLICY_EXPRESSION"
)]
pub allow: Option<PolicyExpression>,
#[arg(long, display_order = 900, id = "WAIT", default_value = "5s", value_parser = duration_parser)]
pub connection_wait: Duration,
#[arg(long, display_order = 900, id = "RETRY", default_value = "20s", value_parser = duration_parser)]
pub retry_wait: Duration,
#[command(flatten)]
pub timeout: OptionalTimeoutArg,
#[arg(long, default_value = "false")]
pub no_connection_wait: bool,
#[arg(
long,
visible_alias = "enable-udp-puncture",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub udp: bool,
#[arg(
long,
visible_alias = "disable-tcp-fallback",
value_name = "BOOL",
default_value_t = false,
hide = true
)]
pub no_tcp_fallback: bool,
#[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)]
pub privileged: bool,
#[arg(long, value_name = "BOOL", default_value_t = false, hide = true)]
pub tls: bool,
#[arg(long, value_name = "ROUTE", hide = true)]
pub tls_certificate_provider: Option<MultiAddr>,
#[arg(long, default_value = "per-client")]
pub leased_token_strategy: LeaseUsage,
#[arg(long, value_name = "ROUTE")]
pub lease_manager_route: Option<MultiAddr>,
}
#[async_trait]
impl Command for CreateCommand {
const NAME: &'static str = "influxdb-inlet create";
async fn run(mut self, ctx: &Context, opts: CommandGlobalOpts) -> crate::Result<()> {
initialize_default_node(ctx, &opts).await?;
let cmd = self.parse_args(&opts).await?;
let mut node = BackgroundNodeClient::create(ctx, &opts.state, &cmd.at).await?;
cmd.timeout.timeout.map(|t| node.set_timeout_mut(t));
let inlet_status = {
let pb = opts.terminal.spinner();
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Creating a InfluxDB Inlet at {}...\n",
color_primary(&cmd.from)
));
}
loop {
let result: Reply<InletStatus> = node
.create_influxdb_inlet(
ctx,
cmd.from.hostname_port(),
&cmd.to(),
cmd.name.as_ref().expect("The `name` argument should be set to its default value if not provided"),
&cmd.authorized,
&cmd.allow,
cmd.connection_wait,
!cmd.no_connection_wait,
&cmd
.secure_channel_identifier(&opts.state)
.await?,
cmd.udp || cmd.from.is_udp(),
cmd.no_tcp_fallback,
&cmd.tls_certificate_provider,
cmd.leased_token_strategy.clone(),
cmd.lease_manager_route.clone(),
)
.await?;
match result {
Reply::Successful(inlet_status) => {
break inlet_status;
}
Reply::Failed(_, s) => {
if let Some(status) = s {
if status == Status::BadRequest {
Err(miette!("Bad request when creating an inlet"))?
}
};
trace!("the inlet creation returned a non-OK status: {s:?}");
if cmd.retry_wait.as_millis() == 0 {
return Err(miette!("Failed to create InfluxDB inlet"))?;
}
if let Some(pb) = pb.as_ref() {
pb.set_message(format!(
"Waiting for InfluxDB Inlet {} to be available... Retrying momentarily\n",
color_primary(&cmd.to)
));
}
tokio::time::sleep(cmd.retry_wait).await
}
}
}
};
let node_name = node.node_name();
let created_message = format!(
"Created a new InfluxDB Inlet in the Node {} bound to {}",
color_primary(node_name),
color_primary(&inlet_status.bind_addr),
);
let plain = if cmd.no_connection_wait {
fmt_ok!("{created_message}\n")
+ &fmt_log!("It will automatically connect to the InfluxDB Outlet at {} as soon as it is available\n",
color_primary(&cmd.to)
)
} else if inlet_status.status == ConnectionStatus::Up {
fmt_ok!("{created_message}\n")
+ &fmt_log!(
"sending traffic to the TCP Outlet at {}\n",
color_primary(&cmd.to)
)
} else {
fmt_warn!("{created_message}\n")
+ &fmt_log!(
"but failed to connect to the TCP Outlet at {}\n",
color_primary(&cmd.to)
) + &fmt_info!("It will automatically connect to the InfluxDB Outlet as soon as it is available\n")
};
opts.terminal
.to_stdout()
.plain(plain)
.machine(inlet_status.bind_addr.to_string())
.json_obj(&inlet_status)?
.write_line()?;
Ok(())
}
}
impl CreateCommand {
async fn parse_args(mut self, opts: &CommandGlobalOpts) -> miette::Result<Self> {
if let Some(alias) = self.alias.as_ref() {
print_warning_for_deprecated_flag_replaced(
opts,
"alias",
"the <NAME> positional argument",
)?;
if self.name.is_some() {
opts.terminal.write_line(
fmt_warn!("The <NAME> argument is being overridden by the --alias flag")
+ &fmt_log!("Consider removing the --alias flag"),
)?;
}
self.name = Some(alias.clone());
} else {
self.name = self.name.or_else(|| Some(random_name()));
}
let from = resolve_peer(self.from.hostname_port())
.await
.into_diagnostic()?;
port_is_free_guard(&from)?;
self.to = crate::tcp::inlet::create::CreateCommand::parse_arg_to(
&opts.state,
self.to,
self.via.as_ref(),
)
.await?;
if self.to().matches(0, &[proto::Project::CODE.into()]) && self.authorized.is_some() {
return Err(miette!(
"--authorized can not be used with project addresses"
))?;
}
self.tls_certificate_provider =
if let Some(tls_certificate_provider) = &self.tls_certificate_provider {
Some(tls_certificate_provider.clone())
} else if self.tls || self.from.is_tls() {
Some(MultiAddr::from_str(
"/project/default/service/tls_certificate_provider",
)?)
} else {
None
};
if self
.lease_manager_route
.as_ref()
.is_some_and(|_| self.leased_token_strategy == LeaseUsage::Shared)
{
Err(miette!(
"lease-manager-route argument requires leased-token-strategy=per-client"
))?
};
Ok(self)
}
pub fn to(&self) -> MultiAddr {
MultiAddr::from_str(&self.to).unwrap()
}
pub async fn secure_channel_identifier(
&self,
state: &CliState,
) -> miette::Result<Option<Identifier>> {
if let Some(identity_name) = self.identity.as_ref() {
Ok(Some(state.get_identifier_by_name(identity_name).await?))
} else {
Ok(None)
}
}
}