use crate::{
msg::{BoxFuture, ErrorAction, ErrorHandler, Handler, Message, ResponseHandler, StreamHandler},
service::{Service, ServiceContext},
};
use std::task::ready;
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
pub(crate) type ServiceMessage<S> = Box<dyn EnvelopeProxy<S>>;
pub(crate) enum ServiceAction<'a> {
Stop,
Continue,
Execute(BoxFuture<'a, ()>),
}
struct ExecuteFuture<'a, R> {
fut: BoxFuture<'a, R>,
tx: Option<oneshot::Sender<R>>,
}
impl<'a, R> ExecuteFuture<'a, R>
where
R: Send + 'static,
{
pub(crate) fn wrap(fut: BoxFuture<'a, R>, tx: Option<oneshot::Sender<R>>) -> BoxFuture<'a, ()> {
Box::pin(ExecuteFuture { fut, tx })
}
}
impl<'a, R> Future for ExecuteFuture<'a, R>
where
R: Send,
{
type Output = ();
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.get_mut();
let result = ready!(Pin::new(&mut this.fut).poll(cx));
if let Some(tx) = this.tx.take() {
let _ = tx.send(result);
}
std::task::Poll::Ready(())
}
}
pub(crate) trait EnvelopeProxy<S: Service>: Send {
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a>;
}
pub(crate) struct Envelope<M: Message> {
msg: M,
tx: Option<oneshot::Sender<M::Response>>,
}
impl<M: Message> Envelope<M> {
pub(crate) fn new(msg: M, tx: Option<oneshot::Sender<M::Response>>) -> Box<Envelope<M>> {
Box::new(Envelope { msg, tx })
}
}
impl<S, M, R> EnvelopeProxy<S> for Envelope<M>
where
S: Handler<M, Response = R>,
S: Service,
M: Message,
R: ResponseHandler<S, M>,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
let res = service.handle(self.msg, ctx);
res.respond(service, ctx, self.tx);
ServiceAction::Continue
}
}
pub trait ServiceExecutor<S: Service>: Send {
type Response: Send;
fn execute<'a>(self, service: &'a mut S, ctx: &'a mut ServiceContext<S>) -> Self::Response;
}
impl<F, S, R> ServiceExecutor<S> for F
where
S: Service,
for<'a> F: FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> R + Send,
R: Send,
{
type Response = R;
fn execute<'a>(self, service: &'a mut S, ctx: &'a mut ServiceContext<S>) -> Self::Response {
self(service, ctx)
}
}
pub(crate) struct ExecutorEnvelope<S, E>
where
S: Service,
E: ServiceExecutor<S>,
{
action: E,
tx: Option<oneshot::Sender<E::Response>>,
}
impl<S, E> ExecutorEnvelope<S, E>
where
S: Service,
E: ServiceExecutor<S>,
{
pub(crate) fn new(
action: E,
tx: Option<oneshot::Sender<E::Response>>,
) -> Box<ExecutorEnvelope<S, E>> {
Box::new(ExecutorEnvelope { action, tx })
}
}
impl<S, E> EnvelopeProxy<S> for ExecutorEnvelope<S, E>
where
S: Service,
E: ServiceExecutor<S>,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
let res = self.action.execute(service, ctx);
if let Some(tx) = self.tx {
let _ = tx.send(res);
}
ServiceAction::Continue
}
}
pub trait FutureProducer<S: Service>: Send + 'static {
type Response: Send + 'static;
fn produce<'a>(
self,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> BoxFuture<'a, Self::Response>;
fn produce_boxed<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> BoxFuture<'a, Self::Response>;
}
impl<S, F, R> FutureProducer<S> for F
where
S: Service,
for<'a> F: FnOnce(&'a mut S, &'a mut ServiceContext<S>) -> BoxFuture<'a, R> + Send + 'static,
R: Send + 'static,
{
type Response = R;
fn produce<'a>(
self,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> BoxFuture<'a, Self::Response> {
self(service, ctx)
}
fn produce_boxed<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> BoxFuture<'a, Self::Response> {
self(service, ctx)
}
}
pub(crate) struct BoxedFutureEnvelope<S, R> {
producer: Box<dyn FutureProducer<S, Response = R>>,
tx: Option<oneshot::Sender<R>>,
}
impl<S, R> BoxedFutureEnvelope<S, R>
where
S: Service,
R: Send + 'static,
{
pub(crate) fn new(
producer: Box<dyn FutureProducer<S, Response = R>>,
tx: Option<oneshot::Sender<R>>,
) -> Box<BoxedFutureEnvelope<S, R>> {
Box::new(BoxedFutureEnvelope { producer, tx })
}
}
impl<S, R> EnvelopeProxy<S> for BoxedFutureEnvelope<S, R>
where
S: Service,
R: Send + 'static,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
let fut = self.producer.produce_boxed(service, ctx);
ServiceAction::Execute(ExecuteFuture::wrap(fut, self.tx))
}
}
pub(crate) struct FutureEnvelope<S, P>
where
S: Service,
P: FutureProducer<S>,
{
producer: P,
tx: Option<oneshot::Sender<P::Response>>,
}
impl<S, P> FutureEnvelope<S, P>
where
S: Service,
P: FutureProducer<S>,
{
pub(crate) fn new(
producer: P,
tx: Option<oneshot::Sender<P::Response>>,
) -> Box<FutureEnvelope<S, P>> {
Box::new(FutureEnvelope { producer, tx })
}
}
impl<S, P> EnvelopeProxy<S> for FutureEnvelope<S, P>
where
S: Service,
P: FutureProducer<S>,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
let fut = self.producer.produce(service, ctx);
ServiceAction::Execute(ExecuteFuture::wrap(fut, self.tx))
}
}
pub(crate) struct StreamEnvelope<M>(M);
impl<M> StreamEnvelope<M> {
pub fn new(msg: M) -> Box<StreamEnvelope<M>> {
Box::new(StreamEnvelope(msg))
}
}
impl<S, M> EnvelopeProxy<S> for StreamEnvelope<M>
where
S: StreamHandler<M>,
S: Service,
M: Send,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
service.handle(self.0, ctx);
ServiceAction::Continue
}
}
pub(crate) struct ErrorEnvelope<M>(M);
impl<M> ErrorEnvelope<M> {
pub(crate) fn new(error: M) -> Box<ErrorEnvelope<M>> {
Box::new(ErrorEnvelope(error))
}
}
impl<S, M> EnvelopeProxy<S> for ErrorEnvelope<M>
where
S: Service + ErrorHandler<M>,
M: Send,
{
fn handle<'a>(
self: Box<Self>,
service: &'a mut S,
ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
match service.handle(self.0, ctx) {
ErrorAction::Continue => ServiceAction::Continue,
ErrorAction::Stop => ServiceAction::Stop,
}
}
}
pub(crate) struct StopEnvelope;
impl<S> EnvelopeProxy<S> for StopEnvelope
where
S: Service,
{
fn handle<'a>(
self: Box<Self>,
_service: &'a mut S,
_ctx: &'a mut ServiceContext<S>,
) -> ServiceAction<'a> {
ServiceAction::Stop
}
}