use crate::core::connection::request_connection::RequestConnection;
use crate::core::connection::RequestConnectionEnum;
use crate::prelude::*;
use linear_type::Linear;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing::warn;
/// Trait for types that can be built from an actor name and an unbounded
/// sender of `M` messages.
///
/// Implementors must also be `Send + Sync + 'static` and implement
/// `HasActivate`.
pub trait IsOutRequestHub<M: IsInboundMessage>: Send + Sync + 'static + HasActivate {
/// Creates the hub from the given actor name and message sender.
///
/// NOTE(review): presumably `sender` feeds the owning actor's inbound
/// queue so that replies can be routed back to it — confirm against callers.
fn from_parent_and_sender(
actor_name: &str,
sender: &tokio::sync::mpsc::UnboundedSender<M>,
) -> Self;
}
/// A request bundled with the one-shot sender on which its reply is sent.
#[derive(Debug)]
pub struct RequestWithReplyChannel<Request, Reply> {
/// The request payload.
pub request: Request,
/// One-shot sender for the `ReplyMessage<Reply>`, wrapped in `Linear`;
/// it is unwrapped with `into_inner` when the reply is sent.
pub(crate) reply_channel: Linear<tokio::sync::oneshot::Sender<ReplyMessage<Reply>>>,
}
/// Names the request and reply types carried by a request-with-reply value.
///
/// Implementors must be `Send + Sync + 'static + Debug`.
pub trait IsRequestWithReplyChannel: Send + Sync + 'static + Debug {
/// Type of the request payload.
type Request;
/// Type of the reply payload.
type Reply;
}
/// Exposes the payload types of a `RequestWithReplyChannel` through
/// `IsRequestWithReplyChannel`.
impl<
        Request: Send + Sync + 'static + Clone + Debug,
        Reply: Send + Sync + 'static + Clone + Debug,
    > IsRequestWithReplyChannel for RequestWithReplyChannel<Request, Reply>
{
    // Must name the request payload type, not the reply type.
    type Request = Request;
    type Reply = Reply;
}
impl<Request, Reply: Debug> RequestWithReplyChannel<Request, Reply> {
    /// Consumes the request, computes the reply with `func`, and sends it on
    /// the reply channel.
    ///
    /// # Panics
    ///
    /// Panics if sending on the reply channel returns an error.
    pub fn reply_from_request<F>(self, func: F)
    where
        F: FnOnce(Request) -> Reply,
    {
        let Self {
            request,
            reply_channel,
        } = self;
        let message = ReplyMessage {
            reply: func(request),
        };
        reply_channel.into_inner().send(message).unwrap();
    }

    /// Sends `reply` on the reply channel, discarding the stored request.
    ///
    /// # Panics
    ///
    /// Panics if sending on the reply channel returns an error.
    pub fn reply(self, reply: Reply) {
        let message = ReplyMessage { reply };
        self.reply_channel.into_inner().send(message).unwrap();
    }
}
/// Wrapper around a reply value; this is the type sent over the one-shot
/// reply channel.
#[derive(Debug, Clone, Default)]
pub struct ReplyMessage<Reply> {
/// The reply payload.
pub reply: Reply,
}
/// Outbound request channel: sends `Request`s through its registered
/// connections and forwards each received `Reply` as an `M` message.
pub struct OutRequestChannel<Request, Reply, M: IsInboundMessage> {
/// Name of this channel; passed to `M::new` together with each reply.
pub name: String,
/// Actor name supplied at construction.
pub actor_name: String,
/// Register of connections that outgoing requests are sent through.
pub(crate) connection_register: RequestConnectionEnum<RequestWithReplyChannel<Request, Reply>>,
/// Sender on which received replies are forwarded as `M` messages.
pub(crate) sender: tokio::sync::mpsc::UnboundedSender<M>,
}
impl<Request, Reply, M: IsInboundMessage> HasActivate for OutRequestChannel<Request, Reply, M> {
    /// Builds a new channel with cloned names and sender, and with the
    /// connection register obtained from `connection_register.extract()`.
    fn extract(&mut self) -> Self {
        let name = self.name.clone();
        let actor_name = self.actor_name.clone();
        let connection_register = self.connection_register.extract();
        let sender = self.sender.clone();
        Self {
            name,
            actor_name,
            connection_register,
            sender,
        }
    }

    /// Delegates activation to the connection register.
    fn activate(&mut self) {
        self.connection_register.activate();
    }
}
impl<
        Request: Clone + Send + Sync + std::fmt::Debug + 'static,
        Reply: Clone + Send + Sync + std::fmt::Debug + 'static,
        M: IsInboundMessageNew<ReplyMessage<Reply>>,
    > OutRequestChannel<Request, Reply, M>
{
    /// Creates a channel called `name` for the actor `actor_name`.
    ///
    /// `sender` is cloned and later used to forward received replies as `M`
    /// messages. The connection register starts out freshly constructed.
    pub fn new(
        name: String,
        actor_name: &str,
        sender: &tokio::sync::mpsc::UnboundedSender<M>,
    ) -> Self {
        Self {
            // `name` is already owned, so move it instead of cloning.
            name,
            actor_name: actor_name.to_owned(),
            connection_register: RequestConnectionEnum::new(),
            sender: sender.clone(),
        }
    }

    /// Registers a connection from this channel to the `inbound` request
    /// channel.
    ///
    /// The connection stores a clone of the inbound channel's sender and its
    /// name. `_ctx` is currently unused.
    pub fn connect<Me: IsInRequestMessageNew<RequestWithReplyChannel<Request, Reply>>>(
        &mut self,
        _ctx: &mut Hollywood,
        inbound: &mut InRequestChannel<RequestWithReplyChannel<Request, Reply>, Me>,
    ) {
        let connection = RequestConnection {
            sender: inbound.sender.as_ref().clone(),
            inbound_channel: inbound.name.clone(),
            phantom: PhantomData,
        };
        self.connection_register.push(Arc::new(connection));
    }

    /// Sends `msg` as a request and arranges for the reply to be forwarded.
    ///
    /// The request is sent through the connection register together with a
    /// fresh one-shot reply channel. A spawned Tokio task then awaits the
    /// reply and sends `M::new(name, reply)` on this channel's sender.
    /// Failures in either step are logged as warnings and otherwise ignored.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn send_request(&self, msg: Request) {
        let (reply_sender, reply_receiver) = tokio::sync::oneshot::channel();
        let request = RequestWithReplyChannel {
            request: msg,
            reply_channel: Linear::new(reply_sender),
        };
        self.connection_register.send(request);

        let sender = self.sender.clone();
        let name = self.name.clone();
        tokio::spawn(async move {
            match reply_receiver.await {
                Ok(reply) => {
                    // Forwarding the reply can fail, in which case only a
                    // warning is emitted.
                    if let Err(e) = sender.send(M::new(name, reply)) {
                        warn!("Error sending request: {:?}", e);
                    }
                }
                Err(e) => {
                    warn!("Reply receiver error: {:?}", e);
                }
            }
        });
    }
}
/// Field-less `IsOutRequestHub` implementation whose constructor ignores its
/// arguments and whose `activate` is a no-op.
#[derive(Debug, Clone, Default)]
pub struct NullOutRequests {}
impl<M: IsInboundMessage> IsOutRequestHub<M> for NullOutRequests {
fn from_parent_and_sender(
_actor_name: &str,
_sender: &tokio::sync::mpsc::UnboundedSender<M>,
) -> Self {
Self {}
}
}
impl HasActivate for NullOutRequests {
fn extract(&mut self) -> Self {
Self {}
}
fn activate(&mut self) {}
}