use crate::node::create::DEFAULT_NODE_NAME;
use crate::node::CreateCommand;
use crate::service::config::ControlApiNodeResolution;
use crate::util::foreground_args::wait_for_exit_signal;
use crate::CommandGlobalOpts;
use miette::miette;
use miette::IntoDiagnostic;
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam::udp::{UdpBindArguments, UdpBindOptions, UdpTransport};
use ockam::{Address, Context};
use ockam_api::cli_state::random_name;
use ockam_api::colors::color_primary;
use ockam_api::control_api::frontend::NodeResolution;
use ockam_api::fmt_log;
use ockam_api::nodes::service::{NodeManagerTransport, SecureChannelType};
use ockam_api::nodes::InMemoryNode;
use ockam_api::nodes::{
service::{NodeManagerGeneralOptions, NodeManagerTransportOptions},
NodeManager, NodeManagerWorker, NODEMANAGER_ADDR,
};
use ockam_api::terminal::notification::NotificationHandler;
use ockam_core::LOCAL;
use ockam_multiaddr::MultiAddr;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use tracing::{debug, error, info, instrument};
impl CreateCommand {
#[instrument(skip_all, fields(node_name = self.name))]
pub(super) async fn foreground_mode(
&mut self,
ctx: &Context,
opts: CommandGlobalOpts,
) -> miette::Result<()> {
self.check_foreground_args(&opts).await?;
let node_name = self.name.clone();
debug!("creating node in foreground mode");
let trust_options = opts
.state
.retrieve_trust_options(
&self.trust_opts.project_name,
&self.trust_opts.authority_identity,
&self.trust_opts.authority_route,
&self.trust_opts.credential_scope,
)
.await
.into_diagnostic()?;
let tcp = TcpTransport::get_or_create(ctx).into_diagnostic()?;
let tcp_listener = tcp
.listen(&self.tcp_listener_address, TcpListenerOptions::new())
.await
.into_diagnostic()?;
info!("TCP listener at {}", tcp_listener.socket_address());
self.get_or_create_identity(&opts, &self.identity).await?;
let _notification_handler = if self.foreground_args.child_process {
None
} else {
Some(NotificationHandler::start(
&opts.state,
opts.terminal.clone(),
))
};
let node_info = opts
.state
.start_node_with_optional_values(&node_name, &self.identity, Some(&tcp_listener))
.await?;
debug!("node info persisted {node_info:?}");
let udp_options = if self.udp {
let udp = UdpTransport::get_or_create(ctx).into_diagnostic()?;
let options = UdpBindOptions::new();
let flow_control_id = options.flow_control_id();
udp.bind(
UdpBindArguments::new().with_bind_address(&self.udp_listener_address)?,
options,
)
.await?;
Some(NodeManagerTransport::new(flow_control_id, udp))
} else {
None
};
let in_memory_node = InMemoryNode::new(
ctx,
NodeManagerGeneralOptions::new(
opts.state.clone(),
node_name.clone(),
true,
self.status_endpoint()?,
true,
),
NodeManagerTransportOptions::new(
NodeManagerTransport::new(tcp_listener.flow_control_id().clone(), tcp),
udp_options,
),
trust_options,
)
.await
.into_diagnostic()?;
debug!("in-memory node created");
let in_memory_node = Arc::new(in_memory_node);
let node_manager_worker = NodeManagerWorker::new(in_memory_node.clone());
ctx.flow_controls()
.add_consumer(&NODEMANAGER_ADDR.into(), tcp_listener.flow_control_id());
ctx.start_worker(NODEMANAGER_ADDR, node_manager_worker)
.into_diagnostic()?;
debug!("node manager worker started");
if self
.start_services(ctx, in_memory_node.inner_clone(), &opts)
.await
.is_err()
{
sleep(Duration::from_secs(10)).await;
ctx.shutdown_node().await.into_diagnostic()?;
return Err(miette!("Failed to start services"));
}
let node_resources = in_memory_node.get_node_resources().await?;
opts.terminal
.clone()
.to_stdout()
.plain(self.plain_output(&opts, &node_name).await?)
.machine(&node_name)
.json_obj(&node_resources)?
.write_line()?;
wait_for_exit_signal(
&self.foreground_args,
&opts,
self.tcp_callback_port,
"To exit and stop the Node, please press Ctrl+C\n",
)
.await?;
let _ = opts.state.stop_node(&node_name).await;
Ok(())
}
async fn check_foreground_args(&mut self, opts: &CommandGlobalOpts) -> miette::Result<()> {
if !self.skip_is_running_check
&& opts
.state
.get_node(&self.name)
.await
.ok()
.map(|n| n.is_running())
.unwrap_or(false)
{
return Err(miette!(
"Node {} is already running",
color_primary(&self.name)
));
}
if !self.foreground_args.foreground && opts.state.is_using_in_memory_database()? {
return Err(miette!("Only foreground nodes can be created in-memory",));
}
if self.name == DEFAULT_NODE_NAME {
self.name = random_name();
}
Ok(())
}
async fn start_services(
&self,
ctx: &Context,
node_manager: Arc<NodeManager>,
opts: &CommandGlobalOpts,
) -> miette::Result<()> {
if let Some(config) = &self.services {
if let Some(startup_services) = &config.services {
if let Some(cfg) = startup_services.secure_channel_listener.as_ref() {
if !cfg.disabled {
opts.terminal
.write_line(fmt_log!("Starting secure-channel listener ..."))?;
node_manager
.create_secure_channel_listener(
Address::from((LOCAL, cfg.address.clone())),
cfg.authorized_identifiers.clone(),
cfg.identity.clone(),
ctx,
SecureChannelType::KeyExchangeAndMessages,
)
.await?;
}
}
if let Some(config) = &startup_services.control_api {
if config.frontend {
let authentication_token = match &config.authentication_token {
Some(token) => token.clone(),
None => std::env::var("OCKAM_CONTROL_API_AUTHENTICATION_TOKEN")
.map_err(|_| {
error!("Frontend Control API needs either `authentication_token` configuration or `OCKAM_CONTROL_API_AUTHENTICATION_TOKEN` environment variable to be set");
miette!("OCKAM_CONTROL_API_AUTHENTICATION_TOKEN not set")
})?,
};
opts.terminal
.write_line(fmt_log!("Starting control API Frontend..."))?;
let node_resolution = match &config.node_resolution {
ControlApiNodeResolution::Relay => {
let relay_node = if let Some(node_resolution_relay_node) =
&config.node_resolution_relay_node
{
node_resolution_relay_node.parse()?
} else if let Ok(default_project_name) =
node_manager.default_project_name().await
{
format!("/project/{default_project_name}").parse()?
} else {
MultiAddr::default()
};
NodeResolution::Relay { relay_node }
}
ControlApiNodeResolution::DirectConnection => {
NodeResolution::DirectConnection {
pattern: config.node_resolution_pattern.clone(),
port: config.connection_node_port,
}
}
};
node_manager
.create_control_api_frontend(
ctx,
config.http_bind_address,
node_resolution,
authentication_token,
Some(config.backend_policy.clone()),
)
.await?;
}
if config.backend {
opts.terminal
.write_line(fmt_log!("Starting control API Backend..."))?;
node_manager.create_control_api_backend(
ctx,
Some(config.frontend_policy.clone()),
)?;
}
}
}
}
Ok(())
}
}