use crate::error::NetworkError;
use crate::kernel::kernel_communicator::{
CallbackKey, KernelAsyncCallbackHandler, KernelStreamSubscription,
};
use crate::prelude::NodeRequest;
use crate::proto::node::CitadelNodeRemoteInner;
use crate::proto::outbound_sender::BoundedSender;
use bytemuck::NoUninit;
use citadel_crypt::ratchets::Ratchet;
use citadel_io::tokio::sync::mpsc::error::TrySendError;
use citadel_user::account_manager::AccountManager;
use citadel_wire::hypernode_type::NodeType;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
use uuid::Uuid;
/// Cloneable handle used to submit [`NodeRequest`]s to the node.
///
/// Every request travels over a bounded channel together with the [`Ticket`] that
/// identifies it. State that is not the channel itself lives behind an [`Arc`], so
/// clones of the handle share it.
#[derive(Clone)]
pub struct NodeRemote<R: Ratchet> {
/// Sending half of the bounded channel; each item is a `(request, ticket)` pair.
outbound_send_request_tx: BoundedSender<(NodeRequest, Ticket)>,
/// Shared state built in `new`: the callback handler, the account manager and the
/// local node type.
inner: Arc<CitadelNodeRemoteInner<R>>,
}
/// Common interface for handles that can submit [`NodeRequest`]s.
///
/// `auto_impl` additionally generates forwarding implementations for `Box<T>`,
/// `&mut T`, `&T` and `Arc<T>` where `T: Remote<R>`.
#[async_trait::async_trait]
#[auto_impl::auto_impl(Box, &mut, &, Arc)]
pub trait Remote<R: Ratchet>: Clone + Send {
    /// Sends `request` under a ticket obtained from [`Remote::get_next_ticket`].
    ///
    /// # Errors
    /// Returns whatever error [`Remote::send_with_custom_ticket`] reports.
    ///
    /// On success the ticket that was attached to the request is returned.
    async fn send(&self, request: NodeRequest) -> Result<Ticket, NetworkError> {
        let ticket = self.get_next_ticket();
        self.send_with_custom_ticket(ticket, request).await?;
        Ok(ticket)
    }

    /// Sends `request` tagged with the caller-supplied `ticket`.
    async fn send_with_custom_ticket(
        &self,
        ticket: Ticket,
        request: NodeRequest,
    ) -> Result<(), NetworkError>;

    /// Sends `request` and, on success, returns a [`KernelStreamSubscription`]
    /// associated with it.
    async fn send_callback_subscription(
        &self,
        request: NodeRequest,
    ) -> Result<KernelStreamSubscription<R>, NetworkError>;

    /// Borrows the account manager backing this remote.
    fn account_manager(&self) -> &AccountManager<R, R>;

    /// Produces the ticket to attach to the next request.
    fn get_next_ticket(&self) -> Ticket;
}
/// [`Remote`] implementation for [`NodeRemote`].
///
/// Each trait method forwards to the inherent method of the same name. The calls
/// are written as explicit type paths so they resolve to the inherent methods
/// rather than back to this trait implementation.
#[async_trait::async_trait]
impl<R: Ratchet> Remote<R> for NodeRemote<R> {
    /// Forwards to the inherent `NodeRemote::send`.
    async fn send(&self, request: NodeRequest) -> Result<Ticket, NetworkError> {
        NodeRemote::<R>::send(self, request).await
    }

    /// Forwards to the inherent `NodeRemote::send_with_custom_ticket`.
    async fn send_with_custom_ticket(
        &self,
        ticket: Ticket,
        request: NodeRequest,
    ) -> Result<(), NetworkError> {
        NodeRemote::<R>::send_with_custom_ticket(self, ticket, request).await
    }

    /// Forwards to the inherent `NodeRemote::send_callback_subscription`.
    async fn send_callback_subscription(
        &self,
        request: NodeRequest,
    ) -> Result<KernelStreamSubscription<R>, NetworkError> {
        NodeRemote::<R>::send_callback_subscription(self, request).await
    }

    /// Forwards to the inherent `NodeRemote::account_manager`.
    fn account_manager(&self) -> &AccountManager<R, R> {
        NodeRemote::<R>::account_manager(self)
    }

    /// Forwards to the inherent `NodeRemote::get_next_ticket`.
    fn get_next_ticket(&self) -> Ticket {
        NodeRemote::<R>::get_next_ticket(self)
    }
}
impl<R: Ratchet> Debug for NodeRemote<R> {
    /// Writes a fixed label; none of the remote's fields are printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("CitadelNodeRemote")
    }
}
impl<R: Ratchet> NodeRemote<R> {
    /// Assembles a remote from the outbound request channel and the state that all
    /// clones of the handle share.
    pub(crate) fn new(
        outbound_send_request_tx: BoundedSender<(NodeRequest, Ticket)>,
        callback_handler: KernelAsyncCallbackHandler<R>,
        account_manager: AccountManager<R, R>,
        node_type: NodeType,
    ) -> Self {
        let shared_state = CitadelNodeRemoteInner {
            callback_handler,
            account_manager,
            node_type,
        };

        Self {
            outbound_send_request_tx,
            inner: Arc::new(shared_state),
        }
    }

    /// Queues `request` on the outbound channel, tagged with `ticket`.
    ///
    /// # Errors
    /// If the channel rejects the item, returns
    /// [`NetworkError::NodeRemoteSendError`] carrying the channel error's message and
    /// the request that could not be sent.
    pub async fn send_with_custom_ticket(
        &self,
        ticket: Ticket,
        request: NodeRequest,
    ) -> Result<(), NetworkError> {
        let outcome = self.outbound_send_request_tx.send((request, ticket)).await;

        // The message is rendered before the payload is moved out of the error.
        outcome.map_err(|err| NetworkError::NodeRemoteSendError {
            reason: err.to_string(),
            request: Box::new(err.0 .0),
        })
    }

    /// Sends `request` under a freshly generated ticket and returns that ticket.
    ///
    /// # Errors
    /// Propagates any error from [`Self::send_with_custom_ticket`].
    pub async fn send(&self, request: NodeRequest) -> Result<Ticket, NetworkError> {
        let ticket = self.get_next_ticket();
        self.send_with_custom_ticket(ticket, request).await?;
        Ok(ticket)
    }

    /// Registers a stream for (`ticket`, the request's session CID) and then sends
    /// `request` under that ticket.
    ///
    /// The stream is registered *before* the request is sent. If sending fails, the
    /// failure is logged and the listener is removed again before the error is
    /// returned.
    ///
    /// # Errors
    /// Returns the registration error if the stream cannot be registered, or the
    /// send error if the request cannot be queued.
    pub(crate) async fn send_callback_subscription_custom_ticket(
        &self,
        request: NodeRequest,
        ticket: Ticket,
    ) -> Result<KernelStreamSubscription<R>, NetworkError> {
        let callback_key = CallbackKey {
            ticket,
            session_cid: request.session_cid(),
        };
        let subscription = self.inner.callback_handler.register_stream(callback_key)?;

        if let Err(err) = self.send_with_custom_ticket(ticket, request).await {
            log::error!(target: "citadel", "****** Error sending callback subscription: {err:?}");
            // Undo the registration made above so it does not outlive the failed send.
            self.inner.callback_handler.remove_listener(callback_key);
            return Err(err);
        }

        Ok(subscription)
    }

    /// Same as [`Self::send_callback_subscription_custom_ticket`], using a freshly
    /// generated ticket.
    pub async fn send_callback_subscription(
        &self,
        request: NodeRequest,
    ) -> Result<KernelStreamSubscription<R>, NetworkError> {
        let ticket = self.get_next_ticket();
        let pending = self.send_callback_subscription_custom_ticket(request, ticket);
        pending.await
    }

    /// Sends [`NodeRequest::Shutdown`]; the ticket of that request is discarded.
    ///
    /// # Errors
    /// Propagates any error from [`Self::send`].
    pub async fn shutdown(&self) -> Result<(), NetworkError> {
        self.send(NodeRequest::Shutdown).await.map(|_ticket| ())
    }

    /// Generates a new ticket from the 128 bits of a random (v4) UUID.
    pub fn get_next_ticket(&self) -> Ticket {
        Ticket::from(Uuid::new_v4().as_u128())
    }

    /// Non-async variant of [`Self::send_with_custom_ticket`] built on the channel's
    /// `try_send`.
    ///
    /// # Errors
    /// Returns the channel's [`TrySendError`] unchanged.
    // The error type is dictated by the channel and carries the `(request, ticket)`
    // payload, hence the lint exemption.
    #[allow(clippy::result_large_err)]
    pub fn try_send_with_custom_ticket(
        &self,
        ticket: Ticket,
        request: NodeRequest,
    ) -> Result<(), TrySendError<(NodeRequest, Ticket)>> {
        let payload = (request, ticket);
        self.outbound_send_request_tx.try_send(payload)
    }

    /// Like [`Self::try_send_with_custom_ticket`], using a freshly generated ticket.
    ///
    /// Note that the generated ticket is not returned to the caller on success.
    // Same error type as `try_send_with_custom_ticket`, hence the same exemption.
    #[allow(clippy::result_large_err)]
    pub fn try_send(
        &self,
        request: NodeRequest,
    ) -> Result<(), TrySendError<(NodeRequest, Ticket)>> {
        self.try_send_with_custom_ticket(self.get_next_ticket(), request)
    }

    /// Borrows the node type this remote was constructed with.
    pub fn local_node_type(&self) -> &NodeType {
        let shared = &self.inner;
        &shared.node_type
    }

    /// Borrows the account manager this remote was constructed with.
    pub fn account_manager(&self) -> &AccountManager<R, R> {
        let shared = &self.inner;
        &shared.account_manager
    }
}
// Explicitly marks `NodeRemote<R>` as `Unpin` for every ratchet type `R`, rather than
// relying on the auto-trait being derived from its fields.
impl<R: Ratchet> Unpin for NodeRemote<R> {}
/// Identifier attached to a request: a newtype over `u128`.
///
/// Tickets travel alongside each [`NodeRequest`] on the outbound channel and form
/// part of the [`CallbackKey`] used when registering a callback stream.
///
/// The type is `Copy`, totally ordered, hashable and serde-serializable. It is
/// `#[repr(C)]` and derives bytemuck's `NoUninit`.
#[derive(
Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, NoUninit,
)]
#[repr(C)]
pub struct Ticket(pub u128);
impl From<u128> for Ticket {
fn from(val: u128) -> Self {
Ticket(val)
}
}
impl From<usize> for Ticket {
fn from(val: usize) -> Self {
(val as u128).into()
}
}
impl Default for Ticket {
fn default() -> Self {
Self::from(Uuid::new_v4().as_u128())
}
}
impl Display for Ticket {
    /// Formats the ticket as its underlying `u128` in decimal.
    ///
    /// Formatting is delegated to the integer's own `Display` implementation so that
    /// formatter flags supplied by the caller (width, fill, alignment, zero padding)
    /// are honored, e.g. `format!("{:>8}", ticket)`. Routing the value through
    /// `write!(f, "{}", ..)` would start a fresh format spec and silently drop them.
    /// Output for a plain `{}` is unchanged.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Display::fmt(&self.0, f)
    }
}