use std::ops::Deref;
use std::time::Duration;
use futures::executor;
use miette::IntoDiagnostic;
use ockam::identity::models::ChangeHistory;
use ockam::identity::{Identifier, SecureChannels};
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam::{Context, Result};
use ockam_core::compat::{string::String, sync::Arc};
use ockam_core::errcode::Kind;
use ockam_multiaddr::MultiAddr;
use crate::cli_state::random_name;
use crate::cli_state::CliState;
use crate::nodes::models::transport::BindAddress;
use crate::nodes::service::default_address::DefaultAddress;
use crate::nodes::service::{
NodeManagerGeneralOptions, NodeManagerTransportOptions, NodeManagerTrustOptions,
};
use crate::nodes::{NodeManager, NODEMANAGER_ADDR};
use crate::orchestrator::project::Project;
use crate::orchestrator::{
AuthorityNodeClient, ControllerClient, CredentialsEnabled, ProjectNodeClient,
};
pub struct InMemoryNode {
pub(crate) node_manager: Arc<NodeManager>,
persistent: bool,
timeout: Option<Duration>,
}
impl Deref for InMemoryNode {
type Target = Arc<NodeManager>;
fn deref(&self) -> &Self::Target {
&self.node_manager
}
}
impl Drop for InMemoryNode {
fn drop(&mut self) {
if !self.persistent {
executor::block_on(async {
let result = self.cli_state.remove_node(&self.node_name).await;
if let Err(err) = result {
if !err.to_string().contains("code: 1032") {
error!("Cannot delete the node {}: {err:?}", self.node_name);
}
}
});
}
}
}
impl InMemoryNode {
pub fn inner_clone(&self) -> Arc<NodeManager> {
self.node_manager.clone()
}
pub async fn start(ctx: &Context, cli_state: &CliState) -> miette::Result<Self> {
Self::start_with_project_name(ctx, cli_state, None).await
}
pub async fn start_with_project_name(
ctx: &Context,
cli_state: &CliState,
project_name: Option<String>,
) -> miette::Result<Self> {
let default_identity_name = cli_state
.get_or_create_default_named_identity()
.await?
.name();
Self::start_node(
ctx,
cli_state,
&default_identity_name,
None,
project_name,
None,
None,
)
.await
}
pub async fn start_with_identity_and_project_name(
ctx: &Context,
cli_state: &CliState,
identity: Option<String>,
project_name: Option<String>,
) -> miette::Result<Self> {
let identity = cli_state.get_identity_name_or_default(&identity).await?;
Self::start_node(ctx, cli_state, &identity, None, project_name, None, None).await
}
pub async fn start_with_identity(
ctx: &Context,
cli_state: &CliState,
identity: Option<String>,
) -> miette::Result<InMemoryNode> {
let identity = cli_state.get_identity_name_or_default(&identity).await?;
Self::start_node(ctx, cli_state, &identity, None, None, None, None).await
}
#[instrument(name = "start in-memory node", skip_all)]
pub async fn start_node(
ctx: &Context,
cli_state: &CliState,
identity_name: &str,
status_endpoint: Option<BindAddress>,
project_name: Option<String>,
authority_identity: Option<ChangeHistory>,
authority_route: Option<MultiAddr>,
) -> miette::Result<InMemoryNode> {
let defaults = NodeManagerDefaults::default();
let tcp = TcpTransport::get_or_create(ctx).into_diagnostic()?;
let tcp_listener = tcp
.listen(
defaults.tcp_listener_address.as_str(),
TcpListenerOptions::new(),
)
.await
.into_diagnostic()?;
let node = cli_state
.start_node_with_optional_values(
&defaults.node_name,
&Some(identity_name.to_string()),
Some(&tcp_listener),
)
.await
.into_diagnostic()?;
let trust_options = cli_state
.retrieve_trust_options(&project_name, &authority_identity, &authority_route, &None)
.await
.into_diagnostic()?;
let node_manager = Self::new(
ctx,
NodeManagerGeneralOptions::new(
cli_state.clone(),
node.name(),
false,
status_endpoint,
false,
),
NodeManagerTransportOptions::new_tcp(tcp_listener.flow_control_id().clone(), tcp),
trust_options,
)
.await
.into_diagnostic()?;
ctx.flow_controls()
.add_consumer(&NODEMANAGER_ADDR.into(), tcp_listener.flow_control_id());
Ok(node_manager)
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub async fn stop(&self, ctx: &Context) -> Result<()> {
for session in self.registry.inlets.values() {
session.session.lock().await.stop().await;
}
for session in self.registry.relays.values() {
session.session.lock().await.stop().await;
}
for addr in DefaultAddress::iter() {
let result = ctx.stop_address(&addr.into());
if let Err(err) = result {
if err.code().kind == Kind::NotFound {
continue;
} else {
return Err(err);
}
}
}
Ok(())
}
#[instrument(name = "new in-memory node", skip_all, fields(node_name = general_options.node_name))]
pub async fn new(
ctx: &Context,
general_options: NodeManagerGeneralOptions,
transport_options: NodeManagerTransportOptions,
trust_options: NodeManagerTrustOptions,
) -> Result<Self> {
let persistent = general_options.persistent;
let node_manager =
NodeManager::create(ctx, general_options, transport_options, trust_options).await?;
Ok(Self {
node_manager,
persistent,
timeout: None,
})
}
pub fn secure_channels(&self) -> Arc<SecureChannels> {
self.secure_channels.clone()
}
pub async fn create_controller(&self) -> miette::Result<ControllerClient> {
let client = self.node_manager.create_controller().await?;
if let Some(timeout) = self.timeout {
Ok(client
.with_request_timeout(&timeout)
.with_secure_channel_timeout(&timeout))
} else {
Ok(client)
}
}
pub async fn create_authority_client_with_project(
&self,
ctx: &Context,
project: &Project,
caller_identity_name: Option<String>,
skip_controller_call: bool,
) -> miette::Result<AuthorityNodeClient> {
let client = self
.node_manager
.create_authority_client_with_project(
ctx,
project,
caller_identity_name,
skip_controller_call,
)
.await?;
if let Some(timeout) = self.timeout {
Ok(client
.with_request_timeout(&timeout)
.with_secure_channel_timeout(&timeout))
} else {
Ok(client)
}
}
pub async fn create_authority_client_with_authority(
&self,
ctx: &Context,
authority_identifier: &Identifier,
authority_route: &MultiAddr,
caller_identity_name: Option<String>,
) -> miette::Result<AuthorityNodeClient> {
let client = self
.node_manager
.create_authority_client_with_authority(
ctx,
authority_identifier,
authority_route,
caller_identity_name,
)
.await?;
if let Some(timeout) = self.timeout {
Ok(client
.with_request_timeout(&timeout)
.with_secure_channel_timeout(&timeout))
} else {
Ok(client)
}
}
pub async fn create_project_client(
&self,
project_identifier: &Identifier,
project_multiaddr: &MultiAddr,
caller_identity_name: Option<String>,
credentials_enabled: CredentialsEnabled,
) -> miette::Result<ProjectNodeClient> {
let client = self
.node_manager
.create_project_client(
project_identifier,
project_multiaddr,
caller_identity_name,
credentials_enabled,
)
.await?;
if let Some(timeout) = self.timeout {
Ok(client
.with_request_timeout(&timeout)
.with_secure_channel_timeout(&timeout))
} else {
Ok(client)
}
}
}
pub struct NodeManagerDefaults {
pub node_name: String,
pub tcp_listener_address: String,
}
impl Default for NodeManagerDefaults {
fn default() -> Self {
Self {
node_name: random_name(),
tcp_listener_address: "127.0.0.1:0".to_string(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[ockam::test]
async fn test_start_twice(ctx: &mut Context) -> Result<()> {
let cli = CliState::test().await?;
let node_manager1 = InMemoryNode::start(ctx, &cli).await;
assert!(node_manager1.is_ok());
let node_manager2 = InMemoryNode::start(ctx, &cli).await;
if let Err(e) = node_manager2 {
panic!("cannot start the node manager a second time: {e:?}");
}
Ok(())
}
}