use std::ops::Deref;
use std::time::Duration;
use futures::executor;
use miette::IntoDiagnostic;
use ockam::identity::SecureChannels;
use ockam::{Context, Result, TcpTransport};
use ockam_core::compat::{string::String, sync::Arc};
use ockam_core::errcode::Kind;
use ockam_transport_tcp::TcpListenerOptions;
use crate::cli_state::random_name;
use crate::cli_state::CliState;
use crate::cli_state::NamedTrustContext;
use crate::cloud::Controller;
use crate::nodes::service::default_address::DefaultAddress;
use crate::nodes::service::{
NodeManagerGeneralOptions, NodeManagerTransportOptions, NodeManagerTrustOptions,
};
use crate::nodes::{NodeManager, NODEMANAGER_ADDR};
use crate::session::sessions::Session;
/// Owning handle around a shared [`NodeManager`].
///
/// The handle dereferences to the wrapped `Arc<NodeManager>`, and its `Drop`
/// implementation deletes the node unless `persistent` is set.
pub struct InMemoryNode {
/// Shared node manager; this is the value returned by the `Deref` implementation.
pub(crate) node_manager: Arc<NodeManager>,
/// When `false`, dropping this value triggers `delete_node()` on the node manager.
persistent: bool,
/// Optional timeout passed to `create_controller_client` by `create_controller`;
/// `None` until `with_timeout` is called.
timeout: Option<Duration>,
}
/// Exposes the wrapped `Arc<NodeManager>` so that fields and methods of the
/// node manager can be reached directly through an `InMemoryNode`.
impl Deref for InMemoryNode {
type Target = Arc<NodeManager>;
/// Returns a reference to the inner `node_manager` handle.
fn deref(&self) -> &Self::Target {
&self.node_manager
}
}
impl Drop for InMemoryNode {
    /// Deletes the node when the handle is dropped, unless the node is persistent.
    ///
    /// `delete_node()` is asynchronous, so it is driven to completion on the
    /// current thread with `executor::block_on`.
    ///
    /// # Panics
    ///
    /// Panics with `cannot delete the node <name>: <error>` if the deletion
    /// fails, except when the thread is already unwinding from another panic:
    /// in that case the failure is only written to stderr.
    fn drop(&mut self) {
        if self.persistent {
            return;
        }

        let deletion = executor::block_on(async { self.node_manager.delete_node().await });

        if let Err(e) = deletion {
            if std::thread::panicking() {
                // A panic raised from a destructor while the thread is already
                // unwinding aborts the whole process; report the failure
                // without escalating so the original panic can keep unwinding.
                eprintln!("cannot delete the node {}: {e:?}", self.node_name);
            } else {
                panic!("cannot delete the node {}: {e:?}", self.node_name);
            }
        }
    }
}
impl InMemoryNode {
pub async fn start(ctx: &Context, cli_state: &CliState) -> miette::Result<Self> {
Self::start_with_trust_context(ctx, cli_state, None, None).await
}
pub async fn start_with_trust_context(
ctx: &Context,
cli_state: &CliState,
project_name: Option<String>,
trust_context: Option<NamedTrustContext>,
) -> miette::Result<Self> {
let default_identity_name = cli_state.get_default_named_identity().await?.name();
Self::start_node(
ctx,
cli_state,
&default_identity_name,
project_name,
trust_context,
)
.await
}
pub async fn start_node(
ctx: &Context,
cli_state: &CliState,
identity_name: &str,
project_name: Option<String>,
trust_context: Option<NamedTrustContext>,
) -> miette::Result<InMemoryNode> {
let defaults = NodeManagerDefaults::default();
let node = cli_state
.create_node_with_optional_values(
&defaults.node_name,
&Some(identity_name.to_string()),
&project_name,
)
.await?;
let tcp = TcpTransport::create(ctx).await.into_diagnostic()?;
let bind = defaults.tcp_listener_address;
let options = TcpListenerOptions::new();
let listener = tcp.listen(&bind, options).await.into_diagnostic()?;
cli_state
.set_tcp_listener_address(&node.name(), listener.socket_address().to_string())
.await?;
let node_manager = Self::new(
ctx,
NodeManagerGeneralOptions::new(cli_state.clone(), node.name(), None, false, false),
NodeManagerTransportOptions::new(listener.flow_control_id().clone(), tcp),
NodeManagerTrustOptions::new(trust_context),
)
.await
.into_diagnostic()?;
ctx.flow_controls()
.add_consumer(NODEMANAGER_ADDR, listener.flow_control_id());
Ok(node_manager)
}
pub async fn create_controller(&self) -> miette::Result<Controller> {
self.create_controller_client(self.timeout)
.await
.into_diagnostic()
}
pub fn add_session(&self, session: Session) {
self.medic_handle.add_session(session);
}
pub fn remove_session(&self, key: &str) {
self.medic_handle.remove_session(key);
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub async fn stop(&self, ctx: &Context) -> Result<()> {
self.medic_handle.stop_medic(ctx).await?;
for addr in DefaultAddress::iter() {
let result = ctx.stop_worker(addr).await;
if let Err(err) = result {
if err.code().kind == Kind::NotFound {
continue;
} else {
return Err(err);
}
}
}
Ok(())
}
pub async fn new(
ctx: &Context,
general_options: NodeManagerGeneralOptions,
transport_options: NodeManagerTransportOptions,
trust_options: NodeManagerTrustOptions,
) -> Result<Self> {
let persistent = general_options.persistent;
let node_manager =
NodeManager::create(ctx, general_options, transport_options, trust_options).await?;
debug!("start the Medic");
Ok(Self {
node_manager: Arc::new(node_manager),
persistent,
timeout: None,
})
}
pub fn secure_channels(&self) -> Arc<SecureChannels> {
self.secure_channels.clone()
}
}
/// Default values used by `InMemoryNode::start_node` when creating a node.
pub struct NodeManagerDefaults {
/// Name under which the node is created in the CLI state.
pub node_name: String,
/// Address the TCP listener is asked to bind to.
pub tcp_listener_address: String,
}
impl Default for NodeManagerDefaults {
    /// Builds the defaults: a node name obtained from `random_name()` and the
    /// listener address `127.0.0.1:0`.
    fn default() -> Self {
        let node_name = random_name();
        let tcp_listener_address = String::from("127.0.0.1:0");
        Self {
            node_name,
            tcp_listener_address,
        }
    }
}