use crate::error::{BoxDynError, Error};
use crate::request::Request;
use crate::response::Response;
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use futures::{future::BoxFuture, Future, FutureExt};
use serde::Serialize;
use std::marker::PhantomData;
use std::{fmt, sync::Arc};
pub use tower::{
layer::layer_fn, layer::util::Identity, util::BoxCloneService, Layer, Service, ServiceBuilder,
};
/// A cloneable, type-erased [`Layer`].
///
/// Wraps a trait object that turns an inner value of type `In` into a
/// `BoxCloneService<T, U, E>`, i.e. a boxed service taking requests of type
/// `T`, responding with `U` and failing with `E`.
pub struct CommonLayer<In, T, U, E> {
/// The erased layer, reference-counted so clones share the same instance.
boxed: Arc<dyn Layer<In, Service = BoxCloneService<T, U, E>>>,
}
impl<In, T, U, E> CommonLayer<In, T, U, E> {
    /// Builds a `CommonLayer` from any layer whose output service can be boxed.
    ///
    /// `inner_layer` is moved into a closure-backed layer; every time that
    /// layer is applied, the service produced by `inner_layer` is wrapped in a
    /// [`BoxCloneService`], erasing its concrete type.
    pub fn new<L>(inner_layer: L) -> Self
    where
        L: Layer<In> + 'static,
        L::Service: Service<T, Response = U, Error = E> + Send + 'static + Clone,
        <L::Service as Service<T>>::Future: Send + 'static,
        E: std::error::Error,
    {
        // Apply the wrapped layer first, then erase the resulting service type.
        let boxing_layer =
            layer_fn(move |service: In| BoxCloneService::new(inner_layer.layer(service)));
        Self {
            boxed: Arc::new(boxing_layer),
        }
    }
}
impl<In, T, U, E> Layer<In> for CommonLayer<In, T, U, E> {
    type Service = BoxCloneService<T, U, E>;

    /// Applies the erased layer to `inner`, yielding a boxed, cloneable service.
    fn layer(&self, inner: In) -> Self::Service {
        // Call through the trait object held behind the `Arc`.
        (*self.boxed).layer(inner)
    }
}
impl<In, T, U, E> Clone for CommonLayer<In, T, U, E> {
    /// Returns a handle sharing the same underlying layer; only the reference
    /// count is bumped, so no bounds on the type parameters are needed.
    fn clone(&self) -> Self {
        let boxed = Arc::clone(&self.boxed);
        Self { boxed }
    }
}
impl<In, T, U, E> fmt::Debug for CommonLayer<In, T, U, E> {
    /// Formats as the bare struct name; the erased layer is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("CommonLayer");
        builder.finish()
    }
}
/// Middleware that attaches a value to every request passing through a service.
pub mod extensions {
    use std::{
        ops::Deref,
        task::{Context, Poll},
    };

    use tower::Service;

    use crate::request::Request;

    /// Wrapper around a value of type `T`.
    ///
    /// Dereferences to the wrapped value and, when `T` is
    /// `Clone + Send + Sync + 'static`, acts as a layer producing
    /// [`AddExtension`] services.
    #[derive(Debug, Clone, Copy)]
    pub struct Data<T>(T);

    impl<T> Data<T> {
        /// Wraps `inner` in a `Data`.
        pub fn new(inner: T) -> Data<T> {
            Self(inner)
        }
    }

    impl<T> Deref for Data<T> {
        type Target = T;

        /// Borrows the wrapped value.
        fn deref(&self) -> &T {
            let Data(value) = self;
            value
        }
    }

    impl<S, T> tower::Layer<S> for Data<T>
    where
        T: Clone + Send + Sync + 'static,
    {
        type Service = AddExtension<S, T>;

        /// Wraps `inner` so that it carries its own clone of the stored value.
        fn layer(&self, inner: S) -> Self::Service {
            let value = self.0.clone();
            AddExtension { inner, value }
        }
    }

    /// Service that inserts a clone of `value` into each request's data before
    /// delegating to `inner`.
    #[derive(Clone, Copy, Debug)]
    pub struct AddExtension<S, T> {
        inner: S,
        value: T,
    }

    impl<S, T, Req, Ctx> Service<Request<Req, Ctx>> for AddExtension<S, T>
    where
        S: Service<Request<Req, Ctx>>,
        T: Clone + Send + Sync + 'static,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        /// Readiness is exactly that of the wrapped service.
        #[inline]
        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.inner.poll_ready(cx)
        }

        /// Stores a fresh clone of the value in `req.parts.data`, then forwards
        /// the request to the wrapped service.
        fn call(&mut self, mut req: Request<Req, Ctx>) -> Self::Future {
            let extension = self.value.clone();
            req.parts.data.insert(extension);
            self.inner.call(req)
        }
    }
}
/// Acknowledgement hook: reports a [`Response`] together with its context.
///
/// `Res` is the payload type of the response being acknowledged. `Task` is not
/// used by any item of the trait itself — presumably it names the task type an
/// implementation is tied to; confirm against implementors.
pub trait Ack<Task, Res> {
/// Context value handed (by reference) to [`Ack::ack`] alongside the response.
type Context;
/// Error returned when an acknowledgement fails.
type AckError: std::error::Error;
/// Acknowledges `response` using `ctx`.
///
/// The returned future must be `Send` and resolves to `Ok(())` on success
/// or to `Self::AckError` on failure.
fn ack(
&mut self,
ctx: &Self::Context,
response: &Response<Res>,
) -> impl Future<Output = Result<(), Self::AckError>> + Send;
}
/// Acknowledges by pushing `(context, response)` pairs onto an mpsc channel.
impl<T, Res: Clone + Send + Sync, Ctx: Clone + Send + Sync> Ack<T, Res>
    for Sender<(Ctx, Response<Res>)>
{
    type AckError = SendError;
    type Context = Ctx;

    /// Sends a clone of `ctx` and `result` through the channel.
    ///
    /// # Errors
    ///
    /// Returns the channel's [`SendError`] when the send fails (for example
    /// because the receiving half is gone). The error is propagated to the
    /// caller instead of panicking, so a closed channel can never abort the
    /// task that is being acknowledged.
    async fn ack(
        &mut self,
        ctx: &Self::Context,
        result: &Response<Res>,
    ) -> Result<(), Self::AckError> {
        self.send((ctx.clone(), result.clone())).await
    }
}
/// Layer that wraps a service in an [`AckService`], handing it a clone of `ack`.
#[derive(Debug)]
pub struct AckLayer<A, Req, Ctx, Res> {
/// The acknowledger cloned into every service this layer produces.
ack: A,
/// Zero-sized marker for the request type; no request is stored.
job_type: PhantomData<Request<Req, Ctx>>,
/// Zero-sized marker for the response type.
res: PhantomData<Res>,
}
impl<A, Req, Ctx, Res> AckLayer<A, Req, Ctx, Res> {
pub fn new(ack: A) -> Self {
Self {
ack,
job_type: PhantomData,
res: PhantomData,
}
}
}
impl<A, Req, Ctx, S, Res> Layer<S> for AckLayer<A, Req, Ctx, Res>
where
    S: Service<Request<Req, Ctx>> + Send + 'static,
    S::Error: std::error::Error + Send + Sync + 'static,
    S::Future: Send + 'static,
    A: Ack<Req, S::Response> + Clone + Send + Sync + 'static,
{
    type Service = AckService<S, A, Req, Ctx, S::Response>;

    /// Wraps `service` in an [`AckService`] that owns its own clone of the
    /// acknowledger.
    fn layer(&self, service: S) -> Self::Service {
        let ack = self.ack.clone();
        AckService {
            service,
            ack,
            job_type: PhantomData,
            res: PhantomData,
        }
    }
}
/// Service wrapper that acknowledges the outcome of every call via `ack`.
#[derive(Debug)]
pub struct AckService<SV, A, Req, Ctx, Res> {
/// The wrapped service that actually handles requests.
service: SV,
/// The acknowledger invoked after the wrapped service's future completes.
ack: A,
/// Zero-sized marker for the request type; no request is stored.
job_type: PhantomData<Request<Req, Ctx>>,
/// Zero-sized marker for the response type.
res: PhantomData<Res>,
}
impl<Sv: Clone, A: Clone, Req, Ctx, Res> Clone for AckService<Sv, A, Req, Ctx, Res> {
    /// Clones the acknowledger and the wrapped service; the marker fields need
    /// no cloning, so `Req`, `Ctx` and `Res` are not required to be `Clone`.
    fn clone(&self) -> Self {
        let ack = self.ack.clone();
        let service = self.service.clone();
        Self {
            service,
            ack,
            job_type: PhantomData,
            res: PhantomData,
        }
    }
}
impl<SV, A, Req, Res, Ctx> Service<Request<Req, Ctx>> for AckService<SV, A, Req, Ctx, Res>
where
SV: Service<Request<Req, Ctx>> + Send + Sync + 'static,
<SV as Service<Request<Req, Ctx>>>::Error: Into<BoxDynError> + Send + Sync + 'static,
<SV as Service<Request<Req, Ctx>>>::Future: std::marker::Send + 'static,
A: Ack<Req, <SV as Service<Request<Req, Ctx>>>::Response, Context = Ctx>
+ Send
+ 'static
+ Clone
+ Send
+ Sync,
Req: 'static + Send,
<SV as Service<Request<Req, Ctx>>>::Response: std::marker::Send + fmt::Debug + Sync + Serialize,
<A as Ack<Req, SV::Response>>::Context: Sync + Send + Clone,
<A as Ack<Req, <SV as Service<Request<Req, Ctx>>>::Response>>::Context: 'static,
Ctx: Clone,
{
type Response = SV::Response;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service
.poll_ready(cx)
.map_err(|e| Error::Failed(Arc::new(e.into())))
}
fn call(&mut self, request: Request<Req, Ctx>) -> Self::Future {
let mut ack = self.ack.clone();
let ctx = request.parts.context.clone();
let attempt = request.parts.attempt.clone();
let task_id = request.parts.task_id.clone();
let fut = self.service.call(request);
let fut_with_ack = async move {
let res = fut.await.map_err(|err| {
let e: BoxDynError = err.into();
if let Some(custom_error) = e.downcast_ref::<Error>() {
return custom_error.clone();
}
Error::Failed(Arc::new(e))
});
let response = Response {
attempt,
inner: res,
task_id,
_priv: (),
};
if let Err(_e) = ack.ack(&ctx, &response).await {
}
response.inner
};
fut_with_ack.boxed()
}
}