#![deny(missing_docs)]
use futures::{future::BoxFuture, FutureExt};
use std::marker::PhantomData;
use tower::{Layer, Service, ServiceExt};
use traits::IntoService;
pub mod error;
pub mod layer;
pub mod traits;
pub mod adapt;
#[cfg(feature = "retry")]
pub mod retry;
pub use layer::ExcLayer;
pub use {
adapt::Adaptor,
traits::{BoxCloneExcService, BoxExcService, ExcService, ExcServiceExt, IntoExc, Request},
};
use self::adapt::{Adapt, AdaptLayer, AdaptService};
pub use self::error::ExchangeError;
#[cfg(feature = "send")]
pub use self::traits::send::SendExcService;
/// Wrapper that pairs an inner service `C` with the request type `Req` it handles.
///
/// `Req` is only recorded at the type level: it appears inside a [`PhantomData`]
/// marker and no value of that type is ever stored.
pub struct Exc<C, Req> {
    /// The wrapped inner service.
    channel: C,
    /// Zero-sized marker for the request type. Using `fn() -> Req` keeps the
    /// marker `Send + Sync` regardless of `Req`.
    _req: PhantomData<fn() -> Req>,
}

// Written by hand rather than derived: `#[derive(Debug)]` would add a `Req: Debug`
// bound although no `Req` value is stored. This mirrors the manual `Clone` impl,
// which likewise only constrains `C`. The output has the same shape as the derive.
impl<C, Req> std::fmt::Debug for Exc<C, Req>
where
    C: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Exc")
            .field("channel", &self.channel)
            .field("_req", &self._req)
            .finish()
    }
}
impl<C, Req> Clone for Exc<C, Req>
where
C: Clone,
{
fn clone(&self) -> Self {
Self {
channel: self.channel.clone(),
_req: PhantomData,
}
}
}
impl<C, Req> Exc<C, Req> {
    /// Consume the wrapper and return the inner service.
    #[inline]
    pub fn into_inner(self) -> C {
        let Self { channel, .. } = self;
        channel
    }
}
impl<C, Req> Exc<C, Req>
where
    Req: Request,
    C: ExcService<Req>,
{
    /// Wrap `service` in an [`Exc`] bound to the request type `Req`.
    pub fn new(service: C) -> Self {
        Exc {
            channel: service,
            _req: PhantomData,
        }
    }

    /// Send a single `request` through the inner service and await its response.
    ///
    /// The inner service is borrowed with `as_service` and driven to completion
    /// with [`ServiceExt::oneshot`].
    pub async fn request(&mut self, request: Req) -> Result<Req::Response, ExchangeError> {
        let service = self.channel.as_service();
        ServiceExt::<Req>::oneshot(service, request).await
    }

    /// Convert into an [`Exc`] whose inner service is wrapped by tower's
    /// rate-limit layer.
    ///
    /// `num` and `per` are forwarded unchanged to `RateLimitLayer::new`.
    #[cfg(feature = "limit")]
    pub fn into_rate_limited(
        self,
        num: u64,
        per: std::time::Duration,
    ) -> Exc<tower::limit::RateLimit<IntoService<C, Req>>, Req> {
        let limiter = tower::limit::RateLimitLayer::new(num, per);
        self.into_layered(&limiter)
    }

    /// Convert into an [`Exc`] whose inner service is wrapped by tower's retry
    /// layer.
    ///
    /// The retry policy is `Always::with_max_duration(max_duration)`.
    #[cfg(feature = "retry")]
    pub fn into_retry(
        self,
        max_duration: std::time::Duration,
    ) -> Exc<tower::retry::Retry<crate::retry::Always, IntoService<C, Req>>, Req>
    where
        Req: Clone,
        C: Clone,
    {
        let policy = crate::retry::Always::with_max_duration(max_duration);
        let retrying = tower::retry::RetryLayer::new(policy);
        self.into_layered(&retrying)
    }

    /// Convert into an [`Exc`] that handles requests of type `R` by applying a
    /// default-constructed `AdaptLayer` to the inner service.
    pub fn into_adapted<R>(self) -> Exc<Adapt<IntoService<C, Req>, Req, R>, R>
    where
        R: Request,
        IntoService<C, Req>: AdaptService<Req, R>,
    {
        let adapter = AdaptLayer::default();
        self.into_layered(&adapter)
    }

    /// Apply `layer` to the inner service and wrap the result in a new [`Exc`]
    /// bound to the request type `R`.
    pub fn into_layered<T, R>(self, layer: &T) -> Exc<T::Service, R>
    where
        T: Layer<IntoService<C, Req>>,
        R: Request,
        T::Service: ExcService<R>,
    {
        // Turn the inner service into the form the layer is declared to accept.
        let inner: IntoService<C, Req> = self.channel.into_service();
        let channel = layer.layer(inner);
        Exc {
            channel,
            _req: PhantomData,
        }
    }
}
/// Lets an [`Exc`] over `Req` serve any request type `R` that `Req` can adapt
/// (`Req: Adaptor<R>`): the request is converted on the way in and the response
/// on the way out.
impl<C, Req, R> Service<R> for Exc<C, Req>
where
    R: Request,
    R::Response: Send + 'static,
    Req: Adaptor<R>,
    C: ExcService<Req>,
    C::Future: Send + 'static,
{
    type Response = R::Response;
    type Error = ExchangeError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated to the inner service.
    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.channel.poll_ready(cx)
    }

    /// Convert `req` into `Req`, call the inner service, then convert the
    /// response back into `R::Response`.
    ///
    /// If the request conversion fails, the inner service is not called and the
    /// returned future resolves immediately to that error.
    fn call(&mut self, req: R) -> Self::Future {
        match Req::from_request(req) {
            Ok(inner_req) => {
                // Start the inner call eagerly, outside the async block, so the
                // returned future does not borrow `self`.
                let pending = self.channel.call(inner_req);
                async move {
                    let raw = pending.await?;
                    let adapted = Req::into_response(raw)?;
                    Ok(adapted)
                }
                .boxed()
            }
            Err(err) => futures::future::ready(Err(err)).boxed(),
        }
    }
}