use std::{error::Error, fmt::Display};
use crate::{
envelope::{Envelope, ExecutorEnvelope, FutureEnvelope, ServiceMessage, StopEnvelope},
msg::{BoxFuture, Handler, Message},
service::{Service, ServiceContext},
};
use tokio::sync::{mpsc, oneshot};
pub struct Link<S>(pub(crate) mpsc::UnboundedSender<ServiceMessage<S>>);
pub struct MessageLink<M: Message>(Box<dyn MessageLinkTx<M>>);
trait MessageLinkTx<M: Message> {
fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()>;
fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>>;
}
impl<S, M> MessageLinkTx<M> for mpsc::UnboundedSender<ServiceMessage<S>>
where
S: Service + Handler<M>,
M: Message,
{
fn tx(&self, msg: M, tx: Option<oneshot::Sender<M::Response>>) -> LinkResult<()> {
self.send(Envelope::new(msg, tx))
.map_err(|_| LinkError::Send)
}
fn boxed_clone(&self) -> Box<dyn MessageLinkTx<M>> {
Box::new(self.clone())
}
}
impl<M> MessageLink<M>
where
M: Message,
{
pub async fn send(&self, msg: M) -> LinkResult<M::Response> {
let (tx, rx) = oneshot::channel();
self.0.tx(msg, Some(tx))?;
rx.await.map_err(|_| LinkError::Recv)
}
pub fn do_send(&self, msg: M) -> LinkResult<()> {
self.0.tx(msg, None)
}
}
impl<S> Clone for Link<S> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<M: Message> Clone for MessageLink<M> {
fn clone(&self) -> Self {
Self(self.0.boxed_clone())
}
}
#[derive(Debug)]
pub enum LinkError {
Send,
Recv,
}
impl Display for LinkError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LinkError::Send => f.write_str("Failed to send to link"),
LinkError::Recv => f.write_str("Failed to receive from link"),
}
}
}
impl Error for LinkError {}
pub type LinkResult<T> = Result<T, LinkError>;
impl<S> Link<S>
where
S: Service,
{
pub fn is_closed(&self) -> bool {
self.0.is_closed()
}
pub fn message_link<M>(&self) -> MessageLink<M>
where
M: Message,
S: Handler<M>,
{
MessageLink(self.0.boxed_clone())
}
pub(crate) fn tx(&self, value: ServiceMessage<S>) -> LinkResult<()> {
match self.0.send(value) {
Ok(_) => Ok(()),
Err(_) => Err(LinkError::Send),
}
}
pub async fn wait<F, R>(&self, action: F) -> LinkResult<R>
where
for<'a> F:
FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
self.tx(FutureEnvelope::new(Box::new(action), Some(tx)))?;
rx.await.map_err(|_| LinkError::Recv)
}
pub fn do_wait<F, R>(&self, action: F) -> LinkResult<()>
where
for<'a> F:
FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
R: Send + 'static,
{
self.tx(FutureEnvelope::new(Box::new(action), None))
}
pub async fn send<M>(&self, msg: M) -> LinkResult<M::Response>
where
M: Message,
S: Handler<M>,
{
let (tx, rx) = oneshot::channel();
self.tx(Envelope::new(msg, Some(tx)))?;
rx.await.map_err(|_| LinkError::Recv)
}
pub fn do_send<M>(&self, msg: M) -> LinkResult<()>
where
M: Message,
S: Handler<M>,
{
self.tx(Envelope::new(msg, None))
}
pub async fn exec<F, R>(&self, action: F) -> LinkResult<R>
where
F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
self.tx(ExecutorEnvelope::new(action, Some(tx)))?;
rx.await.map_err(|_| LinkError::Recv)
}
pub fn do_exec<F, R>(&self, action: F) -> LinkResult<()>
where
F: FnOnce(&mut S, &mut ServiceContext<S>) -> R + Send + 'static,
R: Send + 'static,
{
self.tx(ExecutorEnvelope::new(action, None))
}
pub fn stop(&self) {
let _ = self.tx(Box::new(StopEnvelope));
}
}