use crate::node::CreateCommand;
use crate::run::parser::building_blocks::ArgValue;
use crate::run::parser::config::ConfigParser;
use crate::run::parser::resource::*;
use crate::run::parser::Version;
use crate::value_parsers::{async_parse_path_or_url, parse_enrollment_ticket, parse_key_val};
use crate::CommandGlobalOpts;
use clap::Args;
use miette::miette;
use ockam_api::cli_state::journeys::APPLICATION_EVENT_COMMAND_CONFIGURATION_FILE;
use ockam_api::cli_state::{random_name, EnrollmentTicket};
use ockam_node::Context;
use serde::{Deserialize, Serialize};
use tracing::{instrument, Span};
#[derive(Clone, Debug, Args, Default)]
pub struct ConfigArgs {
#[arg(long, value_name = "YAML")]
pub node_config: Option<String>,
#[arg(long, value_name = "ENROLLMENT TICKET", value_parser = parse_enrollment_ticket)]
pub enrollment_ticket: Option<EnrollmentTicket>,
#[arg(long = "variable", value_name = "VARIABLE", value_parser = parse_key_val::<String, String>)]
pub variables: Vec<(String, String)>,
}
impl CreateCommand {
#[instrument(skip_all)]
pub async fn run_config(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
let mut node_config = self.get_node_config().await?;
node_config.merge(&self)?;
node_config.run(ctx, &opts).await?;
if self.foreground_args.foreground {
self.wait_for_exit_signal(ctx, opts).await?;
}
Ok(())
}
#[instrument(skip_all, fields(app.event.command.configuration_file))]
pub async fn get_node_config(&self) -> miette::Result<NodeConfig> {
let contents = match self.config_args.node_config.clone() {
Some(contents) => contents,
None => async_parse_path_or_url(&self.name).await?,
};
for (key, value) in &self.config_args.variables {
if value.is_empty() {
return Err(miette!("Empty value for variable '{key}'"));
}
std::env::set_var(key, value);
}
let node_config = NodeConfig::new(&contents)?;
Span::current().record(
APPLICATION_EVENT_COMMAND_CONFIGURATION_FILE.as_str(),
&contents.to_string(),
);
Ok(node_config)
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
#[serde(flatten)]
pub version: Version,
#[serde(flatten)]
pub project_enroll: ProjectEnroll,
#[serde(flatten)]
pub node: Node,
#[serde(flatten)]
pub policies: Policies,
#[serde(flatten)]
pub tcp_outlets: TcpOutlets,
#[serde(flatten)]
pub tcp_inlets: TcpInlets,
#[serde(flatten)]
pub kafka_inlet: KafkaInlet,
#[serde(flatten)]
pub kafka_outlet: KafkaOutlet,
#[serde(flatten)]
pub relays: Relays,
}
impl ConfigParser<'_> for NodeConfig {}
impl NodeConfig {
fn new(contents: &str) -> miette::Result<Self> {
Self::parse(&Self::resolve(contents)?)
}
fn merge(&mut self, cli_args: &CreateCommand) -> miette::Result<()> {
for (key, value) in &cli_args.config_args.variables {
if value.is_empty() {
return Err(miette!("Empty value for variable '{key}'"));
}
std::env::set_var(key, value);
}
if self.node.name.is_none() {
self.node.name = Some(ArgValue::String(random_name()));
}
if let Some(ticket) = &cli_args.config_args.enrollment_ticket {
self.project_enroll.ticket = Some(ticket.hex_encoded()?);
}
if self.node.skip_is_running_check.is_none() {
self.node.skip_is_running_check = Some(ArgValue::Bool(cli_args.skip_is_running_check));
}
if self.node.exit_on_eof.is_none() {
self.node.exit_on_eof = Some(ArgValue::Bool(cli_args.foreground_args.exit_on_eof));
}
if self.node.tcp_listener_address.is_none() {
self.node.tcp_listener_address =
Some(ArgValue::String(cli_args.tcp_listener_address.clone()));
}
if self.node.identity.is_none() {
self.node.identity = cli_args.identity.clone().map(ArgValue::String);
}
if self.node.project.is_none() {
self.node.project = cli_args
.trust_opts
.project_name
.clone()
.map(ArgValue::String);
}
Ok(())
}
pub async fn run(self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> {
for section in self.parse_commands()? {
section.run(ctx, opts).await?
}
Ok(())
}
pub fn parse_commands(self) -> miette::Result<Vec<ParsedCommands>> {
let node_name = self.node.name();
Ok(vec![
self.project_enroll.parse_commands()?.into(),
self.node.parse_commands()?.into(),
self.relays.parse_commands(&node_name)?.into(),
self.policies.parse_commands()?.into(),
self.tcp_outlets.parse_commands(&node_name)?.into(),
self.tcp_inlets.parse_commands(&node_name)?.into(),
self.kafka_inlet.parse_commands(&node_name)?.into(),
self.kafka_outlet.parse_commands(&node_name)?.into(),
])
}
}
#[cfg(test)]
mod tests {
use ockam_api::authenticator::one_time_code::OneTimeCode;
use ockam_api::cli_state::EnrollmentTicket;
use super::*;
#[test]
fn parse_demo_config_files() {
let demo_files_dir = std::env::current_dir()
.unwrap()
.join("src")
.join("node")
.join("create")
.join("demo_config_files");
let files = std::fs::read_dir(demo_files_dir).unwrap();
for file in files {
let file = file.unwrap();
let path = file.path();
let contents = std::fs::read_to_string(&path).unwrap();
let res = NodeConfig::parse(&contents);
res.unwrap();
}
}
#[test]
fn merge_config_with_cli() {
let enrollment_ticket = EnrollmentTicket::new(OneTimeCode::new(), None);
let enrollment_ticket_hex = enrollment_ticket.hex_encoded().unwrap();
std::env::set_var("ENROLLMENT_TICKET", &enrollment_ticket_hex);
let cli_args = CreateCommand {
tcp_listener_address: "127.0.0.1:1234".to_string(),
config_args: ConfigArgs {
enrollment_ticket: Some(enrollment_ticket.clone()),
..Default::default()
},
..Default::default()
};
let mut config = NodeConfig::parse("").unwrap();
config.merge(&cli_args).unwrap();
let node = config.node.parse_commands().unwrap().pop().unwrap();
assert_eq!(node.tcp_listener_address, "127.0.0.1:1234");
assert_eq!(
config.project_enroll.ticket,
Some(enrollment_ticket_hex.clone())
);
let mut config = NodeConfig::parse(
r#"
ticket: $ENROLLMENT_TICKET
name: n1
tcp-listener-address: 127.0.0.1:5555
"#,
)
.unwrap();
config.merge(&cli_args).unwrap();
let node = config.node.parse_commands().unwrap().pop().unwrap();
assert_eq!(node.name, "n1");
assert_eq!(node.tcp_listener_address, "127.0.0.1:5555".to_string());
assert_eq!(
config.project_enroll.ticket,
Some(enrollment_ticket_hex.clone())
);
}
}