use async_trait::async_trait;
use service_channel::{mpsc, oneshot};
use crate::{Context, Handler, Message, ReplyHandle, RoutedTopic, Service, Topic};
/// Type-erased unit of work addressed to a service of type `S`.
///
/// The wrapped value is a boxed `EnvelopProxy<S>` trait object that is also
/// `Send`; the concrete payload type (a message or a topic operation) is
/// hidden behind that trait object.
pub struct Envelope<S>(Box<dyn EnvelopProxy<S> + Send>);
impl<S> Envelope<S> {
pub fn new<M>(message: M) -> Self
where
M: Message + Send + 'static,
S: Handler<M> + Send,
M::Result: Send,
{
Self(Box::new(EnvelopWithMessage::new(message, None)))
}
pub fn new_with_result_channel<M>(
message: M,
result_channel: Option<oneshot::Sender<M::Result>>,
) -> Self
where
S: Handler<M> + Send,
M: Message + Send + 'static,
M::Result: Send,
{
Self(Box::new(EnvelopWithMessage::new(message, result_channel)))
}
pub fn new_subscribe_topic<T>(topic: T, result_channel: oneshot::Sender<T::Item>) -> Self
where
S: Service + Send,
T: Topic + RoutedTopic<S>,
{
Self(Box::new(SubscribeTopicEnvelope::<T>::new(
topic,
result_channel,
)))
}
pub fn new_publish_topic<T>(topic: T, item: T::Item) -> Self
where
S: Service + Send,
T: Topic + RoutedTopic<S>,
{
Self(Box::new(PublishTopicEnvelope::<T>::new(topic, item)))
}
pub fn new_subscribe_all_topic<T>(
topic: T,
result_channel: mpsc::UnboundedSender<T::Item>,
) -> Self
where
S: Service + Send,
T: Topic + RoutedTopic<S>,
{
Self(Box::new(SubscribeAllTopicEnvelope::<T>::new(
topic,
result_channel,
)))
}
pub(crate) fn from_boxed<M>(boxed: Box<EnvelopWithMessage<M>>) -> Self
where
S: Handler<M> + Send,
M: Message + Send + 'static,
M::Result: Send,
{
Self(boxed)
}
}
impl<S> Envelope<S>
where
S: Service + Send,
{
pub async fn handle(self, svc: &mut S, ctx: &mut Context<S, S::Stream>) {
self.0.handle(svc, ctx).await
}
}
/// Object-safe hook that lets differently typed payloads be executed against
/// a service `S` through a single boxed trait object.
#[async_trait]
pub(crate) trait EnvelopProxy<S>
where
    S: Service,
{
    /// Consumes the boxed payload and applies it to `svc`, with access to the
    /// service's context.
    async fn handle(mut self: Box<Self>, svc: &mut S, ctx: &mut Context<S, S::Stream>);
}
/// A message paired with an optional one-shot sender for its result.
pub(crate) struct EnvelopWithMessage<M: Message> {
    /// The message to hand to the service's handler.
    message: M,
    /// Where the result may be sent; `None` when no channel was supplied.
    result_channel: Option<oneshot::Sender<M::Result>>,
}
impl<M> EnvelopWithMessage<M>
where
M: Message,
{
pub(crate) fn new(message: M, result_channel: Option<oneshot::Sender<M::Result>>) -> Self {
Self {
message,
result_channel,
}
}
}
/// Dispatches a boxed message envelope to any service `S` that implements
/// `Handler<M>`.
#[async_trait]
impl<S, M> EnvelopProxy<S> for EnvelopWithMessage<M>
where
    M: Message + Send + 'static,
    S: Handler<M> + Send,
    M::Result: Send,
{
    /// Unpacks the envelope, wraps the optional result sender in a
    /// `ReplyHandle`, and awaits `Handler::<M>::handle_preferred` on `svc`.
    async fn handle(self: Box<Self>, svc: &mut S, ctx: &mut Context<S, S::Stream>) {
        // Take both fields out of the box in one move; no `mut` binding is
        // needed because the receiver is consumed, never mutated.
        let Self {
            message,
            result_channel,
        } = *self;
        let reply = ReplyHandle::new(result_channel);
        <S as Handler<M>>::handle_preferred(svc, message, ctx, reply).await;
    }
}
/// Payload for a subscription request: a topic plus a one-shot sender for an
/// item of that topic.
pub(crate) struct SubscribeTopicEnvelope<T: Topic> {
    /// Topic being subscribed to.
    topic: T,
    /// One-shot sender handed to the endpoint's `subscribe` call.
    result_channel: oneshot::Sender<T::Item>,
}
impl<T> SubscribeTopicEnvelope<T>
where
T: Topic,
{
pub(crate) fn new(topic: T, result_channel: oneshot::Sender<T::Item>) -> Self {
Self {
topic,
result_channel,
}
}
}
/// Executes a subscription request against the service's endpoint for `T`.
#[async_trait]
impl<S, T> EnvelopProxy<S> for SubscribeTopicEnvelope<T>
where
    T: Topic + RoutedTopic<S>,
    S: Service + Send,
{
    /// Moves the topic and sender out of the box and passes them to
    /// `subscribe` on the endpoint obtained via `T::endpoint(svc)`.
    /// The context is not used.
    async fn handle(self: Box<Self>, svc: &mut S, _ctx: &mut Context<S, S::Stream>) {
        let request = *self;
        T::endpoint(svc).subscribe(request.topic, request.result_channel);
    }
}
/// Payload for a publish request: a topic plus the item to publish on it.
pub(crate) struct PublishTopicEnvelope<T: Topic> {
    /// Topic the item is published under.
    topic: T,
    /// Item handed to the endpoint's `publish` call.
    item: T::Item,
}
impl<T> PublishTopicEnvelope<T>
where
T: Topic,
{
pub(crate) fn new(topic: T, item: T::Item) -> Self {
Self { topic, item }
}
}
/// Executes a publish request against the service's endpoint for `T`.
#[async_trait]
impl<S, T> EnvelopProxy<S> for PublishTopicEnvelope<T>
where
    T: Topic + RoutedTopic<S>,
    S: Service + Send,
{
    /// Takes the payload out of the box and calls `publish` on the endpoint
    /// obtained via `T::endpoint(svc)`, lending the topic by reference and
    /// giving up ownership of the item. The context is not used.
    async fn handle(self: Box<Self>, svc: &mut S, _ctx: &mut Context<S, S::Stream>) {
        let request = *self;
        T::endpoint(svc).publish(&request.topic, request.item);
    }
}
/// Payload for a "subscribe all" request: a topic plus an unbounded sender
/// for items of that topic.
pub(crate) struct SubscribeAllTopicEnvelope<T: Topic> {
    /// Topic being subscribed to.
    topic: T,
    /// Unbounded sender handed to the endpoint's `subscribe_all` call.
    result_channel: mpsc::UnboundedSender<T::Item>,
}
impl<T> SubscribeAllTopicEnvelope<T>
where
T: Topic,
{
pub(crate) fn new(topic: T, result_channel: mpsc::UnboundedSender<T::Item>) -> Self {
Self {
topic,
result_channel,
}
}
}
/// Executes a "subscribe all" request against the service's endpoint for `T`.
#[async_trait]
impl<S, T> EnvelopProxy<S> for SubscribeAllTopicEnvelope<T>
where
    T: Topic + RoutedTopic<S>,
    S: Service + Send,
{
    /// Moves the topic and sender out of the box and passes them to
    /// `subscribe_all` on the endpoint obtained via `T::endpoint(svc)`.
    /// The context is not used.
    async fn handle(self: Box<Self>, svc: &mut S, _ctx: &mut Context<S, S::Stream>) {
        let request = *self;
        T::endpoint(svc).subscribe_all(request.topic, request.result_channel);
    }
}