use colorful::Colorful;
use minicbor::{CborLen, Decode, Encode};
use nix::errno::Errno;
use nix::sys::signal;
use ockam::identity::utils::now;
use ockam::identity::Identifier;
use ockam::tcp::TcpListener;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::Error;
use ockam_multiaddr::proto::{DnsAddr, Node, Tcp};
use ockam_multiaddr::MultiAddr;
use serde::Serialize;
use std::fmt::{Display, Formatter};
use std::path::PathBuf;
use std::process;
use std::time::Duration;
use sysinfo::{Pid, ProcessStatus, ProcessesToUpdate, System};
use crate::cli_state::{random_name, NamedVault, Result};
use crate::cli_state::{CliState, CliStateError};
use crate::colors::color_primary;
use crate::config::lookup::InternetAddress;
use crate::{fmt_warn, ConnectionStatus};
impl CliState {
    /// Create (or refresh) the node `node_name` on behalf of the current process.
    ///
    /// The node uses the identity named `identity_name`, or the default identity when no name
    /// is given. If the stored node has no PID, the current process id is persisted. When a
    /// `tcp_listener` is provided, its socket address is persisted as the node's TCP listener
    /// address and is also set on the returned [`NodeInfo`].
    #[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone()))]
    pub async fn start_node_with_optional_values(
        &self,
        node_name: &str,
        identity_name: &Option<String>,
        tcp_listener: Option<&TcpListener>,
    ) -> Result<NodeInfo> {
        let mut node = self
            .create_node_with_optional_identity(node_name, identity_name)
            .await?;
        if node.pid.is_none() {
            let pid = process::id();
            self.set_node_pid(node_name, pid).await?;
            node = node.set_pid(pid);
        }
        if let Some(tcp_listener) = tcp_listener {
            let address = (*tcp_listener.socket_address()).into();
            self.set_tcp_listener_address(&node.name(), &address)
                .await?;
            node = node.set_tcp_listener_address(address);
        }
        Ok(node)
    }

    /// Create the node `node_name` with the identity named `identity_name`, or with the
    /// default identity (fetched or created) when `identity_name` is `None`.
    #[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone()))]
    pub async fn create_node_with_optional_identity(
        &self,
        node_name: &str,
        identity_name: &Option<String>,
    ) -> Result<NodeInfo> {
        let identity = match identity_name {
            Some(name) => self.get_named_identity(name).await?,
            None => self.get_or_create_default_named_identity().await?,
        };
        let node = self
            .create_node_with_identifier(node_name, &identity.identifier())
            .await?;
        Ok(node)
    }

    /// Create the node `node_name` with a newly created identity that has a random name.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn create_node(&self, node_name: &str) -> Result<NodeInfo> {
        let identity = self.create_identity_with_name(&random_name()).await?;
        self.create_node_with_identifier(node_name, &identity.identifier())
            .await
    }

    /// Copy the files of the node directory into a new backup directory.
    ///
    /// The backup directory is `<backup default dir>/<now>-<node_name>`, where `<now>` is the
    /// value returned by `now()`. Entries that are not files (e.g. sub-directories) are
    /// skipped: `std::fs::copy` cannot copy them and would otherwise abort the whole backup.
    pub fn backup_logs(&self, node_name: &str) -> Result<()> {
        let node_dir = self.node_dir(node_name)?;
        let now = now()?;
        let backup_dir = Self::backup_default_dir()?.join(format!("{}-{node_name}", now.0));
        std::fs::create_dir_all(&backup_dir)?;
        info!("Backing up logs for {node_name} from {node_dir:?} to {backup_dir:?}");
        for entry in std::fs::read_dir(node_dir)? {
            let entry = entry?;
            let from = entry.path();
            if !from.is_file() {
                continue;
            }
            let to = backup_dir.join(entry.file_name());
            std::fs::copy(from, to)?;
        }
        info!("Logs for {node_name} were backed up to {backup_dir:?}");
        Ok(())
    }

    /// Stop the node's process (see [`CliState::stop_node`]) and then remove the node's
    /// stored data and directory (see [`CliState::remove_node`]).
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn delete_node(&self, node_name: &str) -> Result<()> {
        self.stop_node(node_name).await?;
        self.remove_node(node_name).await?;
        Ok(())
    }

    /// Delete every stored node.
    ///
    /// A failure to delete one node is reported as a warning and does not stop the loop;
    /// only a failure to list the nodes is returned as an error.
    #[instrument(skip_all)]
    pub async fn delete_all_nodes(&self) -> Result<()> {
        let nodes = self.nodes_repository().get_nodes().await?;
        for node in nodes {
            if let Err(err) = self.delete_node(&node.name()).await {
                self.notify_message(fmt_warn!(
                    "Failed to delete the node {}: {err}",
                    color_primary(node.name())
                ));
            }
        }
        Ok(())
    }

    /// Remove the node `node_name` from the repository and delete its directory.
    ///
    /// If the removed node was the default node, the first remaining node (if any) becomes
    /// the new default. Removing a node that is not stored only attempts the directory
    /// deletion; a failure to delete the directory is ignored.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn remove_node(&self, node_name: &str) -> Result<()> {
        let repository = self.nodes_repository();
        // The repository returns `Ok(None)` for an unknown node, so `is_ok()` alone is not an
        // existence check. A failing lookup is still treated as "node not found".
        if let Ok(Some(removed)) = repository.get_node(node_name).await {
            repository.delete_node(node_name).await?;
            // Only promote another node when the default one was removed; removing an
            // unrelated node must not silently change the user's default node.
            if removed.is_default() {
                let other_nodes = repository.get_nodes().await?;
                if let Some(other_node) = other_nodes.first() {
                    repository.set_default_node(&other_node.name()).await?;
                }
            }
        }
        let _ = std::fs::remove_dir_all(self.node_dir(node_name)?);
        debug!(name=%node_name, "node deleted");
        Ok(())
    }

    /// Stop the process of the node `node_name` and clear its recorded PID.
    ///
    /// No signal is sent when no PID is recorded, or when the PID is the current process.
    /// Otherwise SIGTERM is tried first and SIGKILL is used if SIGTERM fails; if SIGKILL also
    /// fails, the error is returned and the PID is left in place.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn stop_node(&self, node_name: &str) -> Result<()> {
        debug!(name=%node_name, "stopping node...");
        let node = self.get_node(node_name).await?;
        if let Some(pid) = node.pid() {
            if pid == process::id() {
                debug!(name=%node_name, "node is the current process, skipping sending kill signal");
                self.nodes_repository().set_no_node_pid(node_name).await?;
                return Ok(());
            }
            if let Err(e) = self
                .kill_node_process(&node, pid, signal::Signal::SIGTERM)
                .await
            {
                warn!(name=%node_name, %pid, %e, "failed to stop node process with SIGTERM");
                if let Err(e) = self
                    .kill_node_process(&node, pid, signal::Signal::SIGKILL)
                    .await
                {
                    error!(name=%node_name, %pid, %e, "failed to stop node process with SIGKILL");
                    return Err(e);
                } else {
                    self.notify_progress_finish(format!(
                        "The node {} has been stopped",
                        color_primary(node_name),
                    ));
                }
            }
        }
        self.nodes_repository().set_no_node_pid(node_name).await?;
        debug!(name=%node_name, "node stopped");
        Ok(())
    }

    /// Send `signal` to process `pid` and poll every 100ms until the process is gone.
    ///
    /// While `node.status()` reports the process as running, the signal is re-sent;
    /// `send_kill_signal` returning `Ok` (no such process) ends the wait. After about 2s a
    /// progress message is shown, and after more than ~5s of attempts the last error is
    /// returned.
    ///
    /// A PID of 0, or one that does not fit in a positive `i32`, is never signalled:
    /// `kill(2)` would interpret it as a process group (or as "every process we may signal"
    /// for -1). Such a PID cannot identify a running node, so it is treated like a process
    /// that no longer exists.
    async fn kill_node_process(
        &self,
        node: &NodeInfo,
        pid: u32,
        signal: signal::Signal,
    ) -> Result<()> {
        debug!(%pid, %signal, "sending kill signals to node's process");
        let node_name = &node.name;
        let pid = match i32::try_from(pid) {
            Ok(raw_pid) if raw_pid > 0 => nix::unistd::Pid::from_raw(raw_pid),
            _ => {
                warn!(name = %node_name, %pid, "invalid node PID, no signal sent");
                return Ok(());
            }
        };
        let _ = self.send_kill_signal(pid, signal);
        let timeout = Duration::from_millis(100);
        tokio::time::sleep(timeout).await;
        let max_attempts = Duration::from_secs(5).as_millis() / timeout.as_millis();
        let show_message_at_attempt = Duration::from_secs(2).as_millis() / timeout.as_millis();
        let mut attempts = 0;
        while let NodeProcessStatus::Running(_) = node.status() {
            match self.send_kill_signal(pid, signal) {
                Ok(()) => break,
                Err(err) => {
                    if attempts > max_attempts {
                        warn!(name = %node_name, %pid, %signal, "node process did not exit");
                        self.notify_progress_finish_and_clear();
                        return Err(err);
                    }
                    if attempts == show_message_at_attempt {
                        self.notify_progress(format!(
                            "Waiting for node's {} process {} to stop",
                            color_primary(node_name),
                            color_primary(pid)
                        ));
                    }
                    attempts += 1;
                    tokio::time::sleep(timeout).await;
                }
            }
        }
        self.notify_progress_finish_and_clear();
        Ok(())
    }

    /// Send `signal` to `pid` and report whether the process is already gone.
    ///
    /// Returns `Ok(())` only when `kill` fails with `ESRCH` (no such process). A signal that
    /// was delivered yields an error, because the process may still be alive. Any other
    /// `kill` failure is returned as an I/O error.
    fn send_kill_signal(&self, pid: nix::unistd::Pid, signal: signal::Signal) -> Result<()> {
        match signal::kill(pid, signal) {
            Ok(()) => Err(CliStateError::Other(
                "kill signal sent, process might still be alive".into(),
            )),
            Err(Errno::ESRCH) => Ok(()),
            Err(err) => Err(CliStateError::Io(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("failed to stop PID {pid} with error {err}"),
            ))),
        }
    }

    /// Mark the node `node_name` as the default node.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn set_default_node(&self, node_name: &str) -> Result<()> {
        Ok(self.nodes_repository().set_default_node(node_name).await?)
    }

    /// Persist `address` as the TCP listener address of the node `node_name`.
    #[instrument(skip_all, fields(node_name = node_name, address = %address))]
    pub async fn set_tcp_listener_address(
        &self,
        node_name: &str,
        address: &InternetAddress,
    ) -> Result<()> {
        self.nodes_repository()
            .set_tcp_listener_address(node_name, address)
            .await?;
        Ok(())
    }

    /// Persist `address` as the status endpoint address of the node `node_name`.
    #[instrument(skip_all, fields(node_name = node_name, address = %address))]
    pub async fn set_node_http_server_addr(
        &self,
        node_name: &str,
        address: &InternetAddress,
    ) -> Result<()> {
        Ok(self
            .nodes_repository()
            .set_status_endpoint_address(node_name, address)
            .await?)
    }

    /// Mark the node `node_name` as an authority node.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn set_as_authority_node(&self, node_name: &str) -> Result<()> {
        Ok(self
            .nodes_repository()
            .set_as_authority_node(node_name)
            .await?)
    }

    /// Persist `pid` as the process id of the node `node_name`.
    #[instrument(skip_all, fields(node_name = node_name, pid = %pid))]
    pub async fn set_node_pid(&self, node_name: &str, pid: u32) -> Result<()> {
        Ok(self.nodes_repository().set_node_pid(node_name, pid).await?)
    }
}
impl CliState {
    /// Return the node named `node_name`.
    ///
    /// # Errors
    ///
    /// Returns `CliStateError::ResourceNotFound` when no node with that name is stored, or
    /// the repository error when the lookup itself fails.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub async fn get_node(&self, node_name: &str) -> Result<NodeInfo> {
        self.nodes_repository()
            .get_node(node_name)
            .await?
            .ok_or_else(|| CliStateError::ResourceNotFound {
                resource: "node".to_string(),
                name: node_name.to_string(),
            })
    }

    /// Return all stored nodes.
    #[instrument(skip_all)]
    pub async fn get_nodes(&self) -> Result<Vec<NodeInfo>> {
        Ok(self.nodes_repository().get_nodes().await?)
    }

    /// Return the default node, or a `NotFound` error when there is none.
    #[instrument(skip_all)]
    pub async fn get_default_node(&self) -> Result<NodeInfo> {
        Ok(self
            .nodes_repository()
            .get_default_node()
            .await?
            .ok_or_else(|| Error::new(Origin::Api, Kind::NotFound, "There is no default node"))?)
    }

    /// Return the node named `node_name` when given, otherwise the default node.
    #[instrument(skip_all, fields(node_name = node_name.clone()))]
    pub async fn get_node_or_default(&self, node_name: &Option<String>) -> Result<NodeInfo> {
        match node_name {
            Some(name) => self.get_node(name).await,
            None => self.get_default_node().await,
        }
    }

    /// Return the path of the most recently modified stdout log file of the node.
    ///
    /// The node directory is created if it does not exist. Candidates are regular files whose
    /// UTF-8 name contains `stdout`. Entries whose metadata cannot be read, for example because
    /// they were rotated away concurrently, are ignored instead of causing a panic. A file
    /// whose modification time is unavailable ranks below every file with a known one.
    ///
    /// # Errors
    ///
    /// Returns a `NotFound` error when no stdout log file exists, or an I/O error when the
    /// node directory cannot be created or read.
    #[instrument(skip_all, fields(node_name = node_name))]
    pub fn stdout_logs(&self, node_name: &str) -> Result<PathBuf> {
        let node_dir = self.create_node_dir(node_name)?;
        let current_log_file = std::fs::read_dir(node_dir)?
            .flatten()
            .filter_map(|entry| {
                let metadata = entry.metadata().ok()?;
                let file_name = entry.file_name();
                if !(file_name.to_str()?.contains("stdout") && metadata.is_file()) {
                    return None;
                }
                // `Option<SystemTime>` orders `None` first, so unknown mtimes lose ties.
                Some((metadata.modified().ok(), entry.path()))
            })
            .max_by_key(|(modified, _)| *modified)
            .map(|(_, path)| path)
            .ok_or_else(|| {
                Error::new(
                    Origin::Api,
                    Kind::NotFound,
                    format!("there is no log file for the node {node_name}"),
                )
            })?;
        Ok(current_log_file)
    }
}
impl CliState {
#[instrument(skip_all, fields(node_name = node_name, identifier = %identifier))]
pub async fn create_node_with_identifier(
&self,
node_name: &str,
identifier: &Identifier,
) -> Result<NodeInfo> {
let repository = self.nodes_repository();
let mut is_default = repository.is_default_node(node_name).await?
|| repository.get_nodes().await?.is_empty();
if let Some(node) = repository.get_default_node().await? {
if node.pid.is_none() {
is_default = true;
}
}
let tcp_listener_address = repository.get_tcp_listener_address(node_name).await?;
let status_endpoint_address = repository.get_status_endpoint_address(node_name).await?;
let node_info = NodeInfo::new(
node_name.to_string(),
identifier.clone(),
0,
is_default,
false,
tcp_listener_address,
Some(process::id()),
status_endpoint_address,
);
repository.store_node(&node_info).await?;
Ok(node_info)
}
#[instrument(skip_all, fields(identity_name = identity_name))]
pub(super) async fn get_nodes_by_identity_name(
&self,
identity_name: &str,
) -> Result<Vec<NodeInfo>> {
let identifier = self.get_identifier_by_name(identity_name).await?;
Ok(self
.nodes_repository()
.get_nodes_by_identifier(&identifier)
.await?)
}
#[instrument(skip_all, fields(node_name = node_name))]
pub(super) async fn get_node_vault(&self, node_name: &str) -> Result<NamedVault> {
let identifier = self.get_node(node_name).await?.identifier();
let identity = self.get_named_identity_by_identifier(&identifier).await?;
self.get_named_vault(&identity.vault_name()).await
}
fn create_node_dir(&self, node_name: &str) -> Result<PathBuf> {
let path = self.node_dir(node_name)?;
std::fs::create_dir_all(&path)?;
Ok(path)
}
pub fn default_node_dir(node_name: &str) -> Result<PathBuf> {
Ok(Self::make_node_dir_path(
&CliState::default_dir()?,
node_name,
))
}
pub fn node_dir(&self, node_name: &str) -> Result<PathBuf> {
Ok(Self::make_node_dir_path(self.dir()?, node_name))
}
pub fn command_log_path(command_name: &str) -> Result<PathBuf> {
Ok(Self::make_command_log_path(
&CliState::default_dir()?,
command_name,
))
}
}
/// State of a node's OS process, as computed by [`NodeInfo::status`].
///
/// With serde, the lowercased variant name is written under the `status` key and the PID,
/// when present, under the `pid` key. The `#[n(..)]` indices define the minicbor encoding.
/// NOTE(review): presumably already-encoded values depend on these indices, so do not
/// renumber them without checking consumers.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Encode, Decode, CborLen)]
#[serde(rename_all = "lowercase", tag = "status", content = "pid")]
pub enum NodeProcessStatus {
    /// A process with this PID exists and is not reported as dead or zombie.
    #[n(0)]
    Running(#[n(0)] u32),
    /// A process with this PID exists but is reported as dead or zombie.
    #[n(1)]
    Zombie(#[n(0)] u32),
    /// No PID is recorded, or no process with the recorded PID was found.
    #[n(2)]
    Stopped,
}
impl NodeProcessStatus {
pub fn is_running(&self) -> bool {
matches!(self, NodeProcessStatus::Running(_))
}
}
impl Display for NodeProcessStatus {
    /// Render the status as `The node is <Up|Down>`, followed by `, with PID <pid>` when a
    /// PID is known (running or zombie processes).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let (connection, known_pid) = match self {
            NodeProcessStatus::Running(pid) => (ConnectionStatus::Up, Some(pid)),
            NodeProcessStatus::Zombie(pid) => (ConnectionStatus::Down, Some(pid)),
            NodeProcessStatus::Stopped => (ConnectionStatus::Down, None),
        };
        write!(f, "The node is {connection}")?;
        match known_pid {
            Some(pid) => write!(f, ", with PID {pid}"),
            None => Ok(()),
        }
    }
}
/// Information stored for a node: its name, identity, flags, addresses and process id.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct NodeInfo {
    /// Name of the node.
    name: String,
    /// Identifier of the identity the node was created with.
    identifier: Identifier,
    /// Verbosity level; `CliState::create_node_with_identifier` always stores 0.
    verbosity: u8,
    /// Whether this node is the default node.
    is_default: bool,
    /// Whether this node was marked as an authority node.
    is_authority: bool,
    /// Socket address of the node's TCP listener, if one was recorded.
    tcp_listener_address: Option<InternetAddress>,
    /// OS process id of the process running the node, if recorded.
    pid: Option<u32>,
    /// Address of the node's status endpoint (set via `CliState::set_node_http_server_addr`).
    status_endpoint_address: Option<InternetAddress>,
}
impl NodeInfo {
#[allow(clippy::too_many_arguments)]
pub fn new(
name: String,
identifier: Identifier,
verbosity: u8,
is_default: bool,
is_authority: bool,
tcp_listener_address: Option<InternetAddress>,
pid: Option<u32>,
status_endpoint_address: Option<InternetAddress>,
) -> Self {
Self {
name,
identifier,
verbosity,
is_default,
is_authority,
tcp_listener_address,
pid,
status_endpoint_address,
}
}
pub fn name(&self) -> String {
self.name.clone()
}
pub fn identifier(&self) -> Identifier {
self.identifier.clone()
}
pub fn verbosity(&self) -> u8 {
self.verbosity
}
pub fn is_default(&self) -> bool {
self.is_default
}
pub fn set_as_default(&self) -> Self {
let mut result = self.clone();
result.is_default = true;
result
}
pub fn is_authority_node(&self) -> bool {
self.is_authority
}
pub fn tcp_listener_port(&self) -> Option<u16> {
self.tcp_listener_address.as_ref().map(|t| t.port())
}
pub fn tcp_listener_address(&self) -> Option<InternetAddress> {
self.tcp_listener_address.clone()
}
pub fn tcp_connect_address(&self) -> Option<InternetAddress> {
if let Some(tcp_listener_address) = &self.tcp_listener_address {
match tcp_listener_address {
InternetAddress::Dns(_, _) => self.tcp_listener_address.clone(),
InternetAddress::V4(address) => {
if address.ip().is_unspecified() {
Some(InternetAddress::V4(std::net::SocketAddrV4::new(
std::net::Ipv4Addr::LOCALHOST,
address.port(),
)))
} else {
self.tcp_listener_address.clone()
}
}
InternetAddress::V6(address) => {
if address.ip().is_unspecified() {
Some(InternetAddress::V6(std::net::SocketAddrV6::new(
std::net::Ipv6Addr::LOCALHOST,
address.port(),
address.flowinfo(),
address.scope_id(),
)))
} else {
self.tcp_listener_address.clone()
}
}
}
} else {
None
}
}
pub fn tcp_listener_multi_address(&self) -> Result<MultiAddr> {
Ok(self
.tcp_listener_address
.as_ref()
.ok_or_else(|| {
ockam::Error::new(
Origin::Api,
Kind::Internal,
"no transport has been set on the node".to_string(),
)
})
.and_then(|t| t.multi_addr())?)
}
pub fn status_endpoint_address(&self) -> Option<InternetAddress> {
self.status_endpoint_address.clone()
}
pub fn pid(&self) -> Option<u32> {
self.pid
}
pub fn set_pid(&self, pid: u32) -> NodeInfo {
let mut result = self.clone();
result.pid = Some(pid);
result
}
pub fn set_tcp_listener_address(&self, address: InternetAddress) -> NodeInfo {
let mut result = self.clone();
result.tcp_listener_address = Some(address);
result
}
pub fn is_running(&self) -> bool {
matches!(self.status(), NodeProcessStatus::Running(_))
}
pub fn status(&self) -> NodeProcessStatus {
if let Some(pid) = self.pid() {
let mut sys = System::new();
sys.refresh_processes(ProcessesToUpdate::Some(&[Pid::from_u32(pid)]), false);
if let Some(p) = sys.process(Pid::from_u32(pid)) {
if matches!(p.status(), ProcessStatus::Dead | ProcessStatus::Zombie) {
NodeProcessStatus::Zombie(pid)
} else {
NodeProcessStatus::Running(pid)
}
} else {
NodeProcessStatus::Stopped
}
} else {
NodeProcessStatus::Stopped
}
}
pub fn route(&self) -> Result<MultiAddr> {
let mut m = MultiAddr::default();
m.push_back(Node::new(&self.name))?;
Ok(m)
}
pub fn verbose_route(&self) -> Result<Option<MultiAddr>> {
if let Some(port) = self.tcp_listener_port() {
let mut m = MultiAddr::default();
m.push_back(DnsAddr::new("localhost"))?;
m.push_back(Tcp::new(port))?;
Ok(Some(m))
} else {
Ok(None)
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::lookup::InternetAddress;
    use std::net::SocketAddr;
    use std::str::FromStr;

    /// The first created node becomes the default node and uses the default identity.
    #[tokio::test]
    async fn test_create_node() -> Result<()> {
        let cli = CliState::test().await?;
        let node_name = "node-1";

        let created = cli.create_node(node_name).await?;
        assert_eq!(created.name(), node_name.to_string());

        let default_node_name = cli.get_default_node().await?.name();
        assert_eq!(default_node_name, node_name.to_string());

        assert!(cli.get_or_create_default_named_vault().await.is_ok());
        let default_identity = cli.get_or_create_default_named_identity().await.ok();
        assert!(default_identity.is_some());

        let expected_identifier = default_identity.unwrap().identifier();
        let node_identifier = cli.get_node(node_name).await?.identifier();
        assert_eq!(node_identifier, expected_identifier);
        Ok(())
    }

    /// Re-creating a node keeps the TCP listener address stored for it.
    #[tokio::test]
    async fn test_update_node() -> Result<()> {
        let cli = CliState::test().await?;
        let node_name = "node-1";
        cli.create_node(node_name).await?;

        let listener = SocketAddr::from_str("127.0.0.1:0").unwrap();
        cli.set_tcp_listener_address(node_name, &listener.into())
            .await?;
        cli.create_node(node_name).await?;

        let default_node = cli.get_default_node().await?;
        assert_eq!(default_node.name(), node_name.to_string());
        assert!(default_node.is_default());
        assert_eq!(
            default_node.tcp_listener_address(),
            InternetAddress::new("127.0.0.1:0")
        );
        Ok(())
    }

    /// Removing the default node deletes its data and directory and promotes another node.
    #[tokio::test]
    async fn test_remove_node() -> Result<()> {
        let cli = CliState::test().await?;

        let first = cli.create_node("node-1").await?;
        assert_eq!(cli.get_default_node().await?, first);

        let second = cli.create_node("node-2").await?;
        cli.remove_node("node-1").await?;

        assert_eq!(
            cli.get_node("node-1").await.ok(),
            None,
            "the node information is not available anymore"
        );
        assert!(
            !cli.node_dir("node-1").unwrap().exists(),
            "the node directory must be deleted"
        );
        assert_eq!(cli.get_default_node().await?, second.set_as_default());
        Ok(())
    }

    /// A node can be created with the default identity or with an explicitly named one.
    #[tokio::test]
    async fn test_create_node_with_optional_identity() -> Result<()> {
        let cli = CliState::test().await?;

        let with_default_identity = cli
            .create_node_with_optional_identity("node-1", &None)
            .await?;
        let stored = cli.get_node(&with_default_identity.name()).await?;
        assert_eq!(stored.name(), with_default_identity.name());

        let identity = cli.create_identity_with_name("name").await?;
        let with_named_identity = cli
            .create_node_with_optional_identity("node-2", &Some(identity.name()))
            .await?;
        let stored = cli.get_node(&with_named_identity.name()).await?;
        assert_eq!(stored.identifier(), identity.identifier());
        Ok(())
    }
}