use std::sync::Arc;
use std::time::Duration;
use miette::{miette, IntoDiagnostic};
use ockam::identity::get_default_timeout;
use ockam::tcp::{TcpConnection, TcpConnectionOptions, TcpTransport};
use ockam_core::api::{Reply, Request};
use ockam_core::{Message, Route};
use ockam_node::api::Client;
use ockam_node::Context;
use crate::cli_state::CliState;
use crate::nodes::NODEMANAGER_ADDR;
/// Client that sends API requests to a named node over a TCP connection.
///
/// Each request opens a TCP connection to the node's TCP address (looked up
/// through `cli_state`), sends the request along a route ending at `to`, and
/// then stops the connection.
#[derive(Clone)]
pub struct BackgroundNodeClient {
/// CLI state used to look up node information and to delete the node.
cli_state: CliState,
/// Name of the node the requests are sent to.
node_name: String,
/// Route appended after the TCP sender address; initialised from
/// `NODEMANAGER_ADDR` by `new`.
to: Route,
/// Timeout passed both to the TCP connection options and to the request
/// client; `new` initialises it with `get_default_timeout()`.
timeout: Option<Duration>,
/// TCP transport used to open the connection for every request.
tcp_transport: Arc<TcpTransport>,
}
impl BackgroundNodeClient {
    /// Create a client for the node called `node_name`.
    ///
    /// When `node_name` is `None`, the name of the default node returned by
    /// `cli_state.get_default_node()` is used instead.
    ///
    /// # Errors
    /// Fails if the default node cannot be retrieved or if the TCP transport
    /// cannot be obtained from `ctx`.
    pub async fn create(
        ctx: &Context,
        cli_state: &CliState,
        node_name: &Option<String>,
    ) -> miette::Result<BackgroundNodeClient> {
        // Match on the reference so that only the name itself is cloned.
        let node_name = match node_name {
            Some(name) => name.clone(),
            None => cli_state.get_default_node().await?.name(),
        };
        Self::create_to_node(ctx, cli_state, &node_name)
    }

    /// Create a client for the node called `node_name`, using the TCP
    /// transport obtained from `ctx` via `TcpTransport::get_or_create`.
    ///
    /// # Errors
    /// Fails if the TCP transport cannot be obtained.
    pub fn create_to_node(
        ctx: &Context,
        cli_state: &CliState,
        node_name: &str,
    ) -> miette::Result<BackgroundNodeClient> {
        let tcp_transport = TcpTransport::get_or_create(ctx).into_diagnostic()?;
        BackgroundNodeClient::new(&tcp_transport, cli_state, node_name)
    }

    /// Create a client for the node called `node_name` on top of an existing
    /// TCP transport.
    ///
    /// This is an `async` wrapper around [`BackgroundNodeClient::new`]; it does
    /// not await anything itself.
    pub async fn create_to_node_with_tcp(
        tcp: &TcpTransport,
        cli_state: &CliState,
        node_name: &str,
    ) -> miette::Result<BackgroundNodeClient> {
        BackgroundNodeClient::new(tcp, cli_state, node_name)
    }

    /// Build a client from its parts.
    ///
    /// The transport and the CLI state are cloned, requests are routed to
    /// `NODEMANAGER_ADDR`, and the timeout is set to `get_default_timeout()`.
    /// The current implementation always returns `Ok`.
    pub fn new(
        tcp_transport: &TcpTransport,
        cli_state: &CliState,
        node_name: &str,
    ) -> miette::Result<BackgroundNodeClient> {
        Ok(BackgroundNodeClient {
            cli_state: cli_state.clone(),
            node_name: node_name.to_string(),
            to: NODEMANAGER_ADDR.into(),
            timeout: Some(get_default_timeout()),
            tcp_transport: Arc::new(tcp_transport.clone()),
        })
    }

    /// Delete the node this client points to, through the CLI state.
    pub async fn delete(&self) -> miette::Result<()> {
        Ok(self.cli_state.delete_node(self.node_name()).await?)
    }

    /// Change the name of the node that subsequent requests are sent to.
    pub fn set_node_name(&mut self, node_name: &str) -> &Self {
        self.node_name = node_name.to_string();
        self
    }

    /// Name of the node that requests are sent to.
    pub fn node_name(&self) -> &str {
        &self.node_name
    }

    /// Set the timeout in place; see [`BackgroundNodeClient::set_timeout`] for
    /// the consuming variant.
    pub fn set_timeout_mut(&mut self, timeout: Duration) -> &Self {
        self.timeout = Some(timeout);
        self
    }

    /// Return this client with its timeout replaced by `timeout`
    /// (`None` stores no timeout).
    pub fn set_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    /// CLI state used by this client.
    pub fn cli_state(&self) -> &CliState {
        &self.cli_state
    }

    /// Send `req` and return the body of a successful reply.
    ///
    /// # Errors
    /// Fails if the connection cannot be established, if the request fails,
    /// or if `Reply::success` reports an error for the reply.
    pub async fn ask<T, R>(&self, ctx: &Context, req: Request<T>) -> miette::Result<R>
    where
        T: Message,
        R: Message,
    {
        self.ask_and_get_reply(ctx, req)
            .await?
            .success()
            .into_diagnostic()
    }

    /// Same as [`BackgroundNodeClient::ask`], but uses `timeout` instead of
    /// the timeout configured on the client.
    ///
    /// # Errors
    /// Fails if the connection cannot be established, if the request fails,
    /// or if `Reply::success` reports an error for the reply.
    pub async fn ask_with_timeout<T, R>(
        &self,
        ctx: &Context,
        req: Request<T>,
        timeout: Duration,
    ) -> miette::Result<R>
    where
        T: Message,
        R: Message,
    {
        let (tcp_connection, client) = self.make_client_with_timeout(Some(timeout)).await?;
        let reply = client.ask(ctx, req).await.into_diagnostic();
        // Stop the connection before inspecting the reply, so that it is also
        // released when the request failed (an early `?` would skip this).
        // NOTE(review): assumes `TcpConnection::stop` is synchronous and that a
        // connection is not cleaned up on drop — confirm against its docs.
        let _ = tcp_connection.stop(ctx);
        reply?.success().into_diagnostic()
    }

    /// Send `req` and return the raw reply, without checking whether it is a
    /// success. The TCP connection is stopped whether or not the request
    /// succeeded.
    ///
    /// # Errors
    /// Fails if the connection cannot be established or if the request fails.
    pub async fn ask_and_get_reply<T, R>(
        &self,
        ctx: &Context,
        req: Request<T>,
    ) -> miette::Result<Reply<R>>
    where
        T: Message,
        R: Message,
    {
        let (tcp_connection, client) = self.make_client().await?;
        let res = client.ask(ctx, req).await.into_diagnostic();
        // Best effort: a failure to stop the connection is deliberately ignored.
        let _ = tcp_connection.stop(ctx);
        res
    }

    /// Send `req` when no reply body is expected, and check that the reply is
    /// a success.
    ///
    /// # Errors
    /// Fails if the connection cannot be established, if the request fails,
    /// or if `Reply::success` reports an error for the reply.
    pub async fn tell<T>(&self, ctx: &Context, req: Request<T>) -> miette::Result<()>
    where
        T: Message,
    {
        // Delegating guarantees the connection is stopped on the error path
        // too, and mirrors how `ask` builds on `ask_and_get_reply`.
        self.tell_and_get_reply(ctx, req)
            .await?
            .success()
            .into_diagnostic()
    }

    /// Send `req` when no reply body is expected and return the raw reply,
    /// without checking whether it is a success. The TCP connection is stopped
    /// whether or not the request succeeded.
    ///
    /// # Errors
    /// Fails if the connection cannot be established or if the request fails.
    pub async fn tell_and_get_reply<T>(
        &self,
        ctx: &Context,
        req: Request<T>,
    ) -> miette::Result<Reply<()>>
    where
        T: Message,
    {
        let (tcp_connection, client) = self.make_client().await?;
        let res = client.tell(ctx, req).await.into_diagnostic();
        // Best effort: a failure to stop the connection is deliberately ignored.
        let _ = tcp_connection.stop(ctx);
        res
    }

    /// Open a TCP connection to the node and build the route used to reach
    /// `self.to` through it (the connection's sender address followed by
    /// `self.to`).
    async fn create_route(
        &self,
        timeout: Option<Duration>,
    ) -> miette::Result<(TcpConnection, Route)> {
        let tcp_connection = self.create_tcp_connection(timeout).await?;
        let route = tcp_connection.sender_address().clone() + self.to.clone();
        debug!("Sending requests to {route}");
        Ok((tcp_connection, route))
    }

    /// Look up the node's TCP address in the CLI state and connect to it.
    ///
    /// # Errors
    /// Fails if the node cannot be retrieved, if it has no TCP connect
    /// address, or if the connection cannot be established.
    async fn create_tcp_connection(
        &self,
        timeout: Option<Duration>,
    ) -> miette::Result<TcpConnection> {
        let node_info = self.cli_state.get_node(&self.node_name).await?;
        let tcp_listener_address = node_info
            .tcp_connect_address()
            // Build the error lazily: formatting `node_info` is wasted work
            // whenever an address is available.
            .ok_or_else(|| {
                miette!(
                    "an api transport should have been started for node {:?}",
                    &node_info
                )
            })?
            .to_string();
        self.tcp_transport
            .connect(
                &tcp_listener_address,
                TcpConnectionOptions::new().set_timeout(timeout),
            )
            .await
            .map_err(|e| {
                // Keep the underlying cause available in the logs; the
                // user-facing message is unchanged.
                debug!(
                    "Failed to connect to node {} at {}: {e}",
                    &self.node_name, &tcp_listener_address
                );
                miette!(
                    "Failed to connect to node {} at {}",
                    &self.node_name,
                    &tcp_listener_address
                )
            })
    }

    /// Open a connection and create a request client using the timeout
    /// configured on this client.
    ///
    /// The caller is responsible for stopping the returned connection.
    pub(crate) async fn make_client(&self) -> miette::Result<(TcpConnection, Client)> {
        self.make_client_with_timeout(self.timeout).await
    }

    /// Open a connection and create a request client; `timeout` is applied
    /// both to the TCP connection and to the client.
    ///
    /// The caller is responsible for stopping the returned connection.
    pub(crate) async fn make_client_with_timeout(
        &self,
        timeout: Option<Duration>,
    ) -> miette::Result<(TcpConnection, Client)> {
        let (tcp_connection, route) = self.create_route(timeout).await?;
        Ok((tcp_connection, Client::new(&route, timeout)))
    }
}