use std::{
convert::Infallible,
future::{Future, Ready},
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures_util::ready;
use pin_project_lite::pin_project;
use tower::{layer::util::Stack, Layer, Service};
use tracing::error;
use crate::{
body::BoxBody,
plugin::Plugin,
request::{FromParts, FromRequest},
response::IntoResponse,
routing::Route,
runtime_error::InternalFailureException,
};
use super::{Operation, OperationError, OperationShape};
/// A zero-sized layer that wraps a service in [`Upgrade`].
///
/// `Protocol`, `Operation` and `Exts` are recorded only at the type level via
/// [`PhantomData`]; no value of those types is ever stored.
///
/// `Clone` and `Debug` are implemented by hand instead of being derived: the
/// derives would require `Protocol`, `Operation` and `Exts` to implement the
/// same traits, which needlessly rules out plain marker types.
pub struct UpgradeLayer<Protocol, Operation, Exts> {
    _protocol: PhantomData<Protocol>,
    _operation: PhantomData<Operation>,
    _exts: PhantomData<Exts>,
}

impl<Protocol, Operation, Exts> Clone for UpgradeLayer<Protocol, Operation, Exts> {
    /// Returns a new marker value; there is no state to copy.
    fn clone(&self) -> Self {
        Self {
            _protocol: PhantomData,
            _operation: PhantomData,
            _exts: PhantomData,
        }
    }
}

impl<Protocol, Operation, Exts> std::fmt::Debug for UpgradeLayer<Protocol, Operation, Exts> {
    /// Formats the layer with the same field layout a derived `Debug` would use.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UpgradeLayer")
            .field("_protocol", &self._protocol)
            .field("_operation", &self._operation)
            .field("_exts", &self._exts)
            .finish()
    }
}
impl<P, Op, E> Default for UpgradeLayer<P, Op, E> {
fn default() -> Self {
Self {
_protocol: PhantomData,
_operation: PhantomData,
_exts: PhantomData,
}
}
}
impl<Protocol, Operation, Exts> UpgradeLayer<Protocol, Operation, Exts> {
    /// Creates a new [`UpgradeLayer`]; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Default::default()
    }
}
impl<S, P, Op, E> Layer<S> for UpgradeLayer<P, Op, E> {
    type Service = Upgrade<P, Op, E, S>;

    /// Wraps `inner` in an [`Upgrade`] carrying the same type-level markers as
    /// this layer.
    fn layer(&self, inner: S) -> Self::Service {
        Upgrade {
            inner,
            _exts: PhantomData,
            _operation: PhantomData,
            _protocol: PhantomData,
        }
    }
}
/// Service wrapper produced by [`UpgradeLayer`].
///
/// It stores the wrapped service `S`; `Protocol`, `Operation` and `Exts` are
/// tracked only at the type level through [`PhantomData`].
pub struct Upgrade<Protocol, Operation, Exts, S> {
    inner: S,
    _protocol: PhantomData<Protocol>,
    _operation: PhantomData<Operation>,
    _exts: PhantomData<Exts>,
}
impl<P, Op, E, S> Clone for Upgrade<P, Op, E, S>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
_protocol: PhantomData,
_operation: PhantomData,
_exts: PhantomData,
inner: self.inner.clone(),
}
}
}
// Two-state machine used by `UpgradeFuture`. `pin_project!` generates the
// `InnerProj` (pinned projection) and `InnerProjReplace` (result of
// `project_replace`) companion types named in the attributes below.
pin_project! {
#[project = InnerProj]
#[project_replace = InnerProjReplace]
enum Inner<FromFut, HandlerFut> {
// First state: the future that extracts the service input from the request.
FromRequest {
#[pin]
inner: FromFut
},
// Second state: the future returned by calling the wrapped service.
Inner {
#[pin]
call: HandlerFut
}
}
}
// Shorthand for `Inner` whose first state is the `FromRequest` future of the
// `(Input, Exts)` tuple and whose second state is `Fut`.
type InnerAlias<Input, Exts, Protocol, B, Fut> = Inner<<(Input, Exts) as FromRequest<Protocol, B>>::Future, Fut>;
pin_project! {
/// The future returned by [`Upgrade`]'s `Service::call`.
///
/// It owns the service it will call and an internal two-state machine:
/// request extraction first, then the service's own future.
pub struct UpgradeFuture<Protocol, Operation, Exts, B, S>
where
Operation: OperationShape,
(Operation::Input, Exts): FromRequest<Protocol, B>,
S: Service<(Operation::Input, Exts)>,
{
// Service invoked once request extraction succeeds.
service: S,
// Current state; pinned because it holds the futures being polled.
#[pin]
inner: InnerAlias<Operation::Input, Exts, Protocol, B, S::Future>
}
}
// Drives the two-stage conversion: first the request is turned into the
// `(Op::Input, Exts)` tuple, then the owned service is called with it and its
// result is converted into an HTTP response.
//
// Every completing branch below yields `Ok(response)`; extraction failures and
// `OperationError::Model` errors are rendered as responses via `into_response`.
//
// Panics if the service's future resolves to `OperationError::PollReady`.
impl<P, Op, Exts, B, S, PollError, OpError> Future for UpgradeFuture<P, Op, Exts, B, S>
where
// `Op` supplies the input and output types used below.
Op: OperationShape,
// The operation input must be extractable from the request.
Op::Input: FromRequest<P, B>,
// The operation output must be convertible into a response.
Op::Output: IntoResponse<P>,
// The operation's modelled error must be convertible into a response.
OpError: IntoResponse<P>,
// The extensions must be extractable from the request parts.
Exts: FromParts<P>,
// The wrapped service takes the extracted tuple and reports `OperationError`.
S: Service<(Op::Input, Exts), Response = Op::Output, Error = OperationError<OpError, PollError>>,
{
type Output = Result<http::Response<crate::body::BoxBody>, PollError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Loop so that, after switching state, the new state is polled right away
// instead of returning `Pending` without having registered a waker.
loop {
let mut this = self.as_mut().project();
let this2 = this.inner.as_mut().project();
let call = match this2 {
InnerProj::FromRequest { inner } => {
// `ready!` returns `Poll::Pending` from `poll` if extraction is not done.
let result = ready!(inner.poll(cx));
match result {
// Extraction succeeded: start the service call.
Ok(ok) => this.service.call(ok),
// Extraction failed: answer with the rejection's response.
Err(err) => return Poll::Ready(Ok(err.into_response())),
}
}
InnerProj::Inner { call } => {
let result = ready!(call.poll(cx));
let output = match result {
Ok(ok) => ok.into_response(),
Err(OperationError::Model(err)) => err.into_response(),
// Treated as an invariant violation: this arm panics.
Err(OperationError::PollReady(_)) => {
unreachable!("poll error should not be raised")
}
};
return Poll::Ready(Ok(output));
}
};
// Only reached from the `FromRequest` arm: move to the second state.
this.inner.as_mut().project_replace(Inner::Inner { call });
}
}
}
/// Exposes the wrapped operation service as a service over [`http::Request`],
/// producing `http::Response<BoxBody>` and surfacing only `PollError`.
impl<P, Op, Exts, B, S, PollError, OpError> Service<http::Request<B>> for Upgrade<P, Op, Exts, S>
where
    Op: OperationShape,
    Op::Input: FromRequest<P, B>,
    Op::Output: IntoResponse<P>,
    OpError: IntoResponse<P>,
    Exts: FromParts<P>,
    S: Service<(Op::Input, Exts), Response = Op::Output, Error = OperationError<OpError, PollError>> + Clone,
{
    type Response = http::Response<crate::body::BoxBody>;
    type Error = PollError;
    type Future = UpgradeFuture<P, Op, Exts, B, S>;

    /// Forwards readiness to the wrapped service, unwrapping the
    /// `OperationError::PollReady` payload on failure.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped service reports `OperationError::Model` here.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.inner.poll_ready(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
            Poll::Ready(Err(OperationError::PollReady(err))) => Poll::Ready(Err(err)),
            Poll::Ready(Err(OperationError::Model(_))) => {
                unreachable!("operation error should not be raised")
            }
        }
    }

    /// Starts request extraction and returns the future that will later call
    /// the service.
    fn call(&mut self, req: http::Request<B>) -> Self::Future {
        // Hand the instance that `poll_ready` was invoked on to the future and
        // keep a fresh clone in `self` for subsequent requests.
        let replacement = self.inner.clone();
        let service = std::mem::replace(&mut self.inner, replacement);
        let extraction = <(Op::Input, Exts) as FromRequest<P, B>>::from_request(req);
        UpgradeFuture {
            service,
            inner: Inner::FromRequest { inner: extraction },
        }
    }
}
/// Conversion of a value into a [`Route`] over request body `B`.
///
/// Note that the `Plugin` type parameter shadows the imported `Plugin` trait
/// inside this definition; here it is just the type of the `plugin` argument.
pub trait Upgradable<Protocol, Operation, Exts, B, Plugin> {
/// Consumes `self` and produces the [`Route`], given a reference to `plugin`.
fn upgrade(self, plugin: &Plugin) -> Route<B>;
}
// The service type obtained by applying the plugin's layer (`Pl::Layer`) to an
// `Upgrade` that wraps the plugin's service (`Pl::Service`).
type UpgradedService<Pl, P, Op, Exts, S, L> =
<<Pl as Plugin<P, Op, S, L>>::Layer as Layer<Upgrade<P, Op, Exts, <Pl as Plugin<P, Op, S, L>>::Service>>>::Service;
impl<P, Op, Exts, B, Pl, S, L, PollError> Upgradable<P, Op, Exts, B, Pl> for Operation<S, L>
where
    Op: OperationShape,
    Op::Input: FromRequest<P, B>,
    Op::Output: IntoResponse<P>,
    Op::Error: IntoResponse<P>,
    Exts: FromParts<P>,
    Pl::Service:
        Service<(Op::Input, Exts), Response = Op::Output, Error = OperationError<Op::Error, PollError>> + Clone,
    Pl: Plugin<P, Op, S, L>,
    Pl::Layer: Layer<Upgrade<P, Op, Exts, Pl::Service>>,
    UpgradedService<Pl, P, Op, Exts, S, L>:
        Service<http::Request<B>, Response = http::Response<BoxBody>, Error = Infallible> + Clone + Send + 'static,
    <UpgradedService<Pl, P, Op, Exts, S, L> as Service<http::Request<B>>>::Future: Send + 'static,
{
    /// Maps the operation through `plugin`, then stacks an [`UpgradeLayer`]
    /// underneath the resulting layer and wraps the final service in a
    /// [`Route`].
    fn upgrade(self, plugin: &Pl) -> Route<B> {
        let operation = plugin.map(self);
        // `Stack` applies its first argument to the service first and the
        // second argument around the result.
        let upgrade = UpgradeLayer::<P, Op, Exts>::new();
        let stack = Stack::new(upgrade, operation.layer);
        let service = stack.layer(operation.inner);
        Route::new(service)
    }
}
/// Unit type whose [`Upgradable`] implementation routes to [`MissingFailure`],
/// a service that always responds with an `InternalFailureException`.
pub struct FailOnMissingOperation;
impl<P, Op, Exts, B, Pl> Upgradable<P, Op, Exts, B, Pl> for FailOnMissingOperation
where
    InternalFailureException: IntoResponse<P>,
    P: 'static,
{
    /// Ignores the plugin and returns a [`Route`] backed by [`MissingFailure`].
    fn upgrade(self, _plugin: &Pl) -> Route<B> {
        let fallback = MissingFailure::<P> { _protocol: PhantomData };
        Route::new(fallback)
    }
}
/// A service that logs an error and responds with an
/// `InternalFailureException` for every request.
///
/// The protocol `P` is tracked through `PhantomData<fn(P)>` only, so the type
/// is zero-sized and is `Clone` and `Copy` for every `P`.
pub struct MissingFailure<P> {
    _protocol: PhantomData<fn(P)>,
}

impl<P> Clone for MissingFailure<P> {
    /// Copies the marker; valid for any `P` because the type is `Copy`.
    fn clone(&self) -> Self {
        *self
    }
}

// Implemented by hand rather than derived: `#[derive(Copy)]` would add a
// `P: Copy` bound even though no `P` value is ever stored.
impl<P> Copy for MissingFailure<P> {}
/// Accepts any request type `R` and never fails: the service is always ready
/// and every call resolves immediately.
impl<R, P> Service<R> for MissingFailure<P>
where
    InternalFailureException: IntoResponse<P>,
{
    type Response = http::Response<BoxBody>;
    type Error = Infallible;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    /// Always ready.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Logs that the operation was never set and returns the
    /// `InternalFailureException` response without inspecting the request.
    fn call(&mut self, _request: R) -> Self::Future {
        error!("the operation has not been set");
        let response = InternalFailureException.into_response();
        std::future::ready(Ok(response))
    }
}