exc_service/
lib.rs

1#![deny(missing_docs)]
2
3//! Define the [`Request`] and [`ExcService`] traits, and provide some useful helper traits.
4
5use futures::{future::BoxFuture, FutureExt};
6use std::marker::PhantomData;
7use tower::{Layer, Service, ServiceExt};
8use traits::IntoService;
9
10/// Exchange Error.
11pub mod error;
12
13/// Layer.
14pub mod layer;
15
16/// Traits.
17pub mod traits;
18
19/// The adapt layer.
20pub mod adapt;
21
22#[cfg(feature = "retry")]
23/// Retry utils.
24pub mod retry;
25
26pub use layer::ExcLayer;
27pub use {
28    adapt::Adaptor,
29    traits::{BoxCloneExcService, BoxExcService, ExcService, ExcServiceExt, IntoExc, Request},
30};
31
32use self::adapt::{Adapt, AdaptLayer, AdaptService};
33pub use self::error::ExchangeError;
34
35#[cfg(feature = "send")]
36pub use self::traits::send::SendExcService;
37
/// The core service wrapper of this crate, which implements
/// [`ExcService<T>`] *if* the request type of the underlying
/// service implements [`Adaptor<T>`].
///
/// With the help of [`Exc`], we can use a single type to represent
/// all the services that an exchange can provide.
///
/// For example, let `Exchange` be an api endpoint implementation of an exchange,
/// which implements [`Service<R>`], where `R` is the request type of the api endpoint.
/// Then `Exc<Exchange, R>` will implement `Service<SubscribeTickers>` and
/// `Service<PlaceOrder>`, as long as `R` implements both `Adaptor<SubscribeTickers>`
/// and `Adaptor<PlaceOrder>`.
pub struct Exc<C, Req> {
    /// The wrapped channel (the underlying service).
    channel: C,
    /// Marker tying the wrapper to its request type. No value of `Req` is
    /// stored; `fn() -> Req` is used so the marker does not behave as if the
    /// struct owned a `Req`.
    _req: PhantomData<fn() -> Req>,
}

// `Debug` is written by hand rather than derived: `#[derive(Debug)]` would add
// a `Req: Debug` bound even though `Req` only appears inside `PhantomData`.
// Only the channel has to be `Debug`. The fields printed are the same ones the
// derive would print.
impl<C, Req> std::fmt::Debug for Exc<C, Req>
where
    C: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Exc")
            .field("channel", &self.channel)
            .field("_req", &self._req)
            .finish()
    }
}
55
56impl<C, Req> Clone for Exc<C, Req>
57where
58    C: Clone,
59{
60    fn clone(&self) -> Self {
61        Self {
62            channel: self.channel.clone(),
63            _req: PhantomData,
64        }
65    }
66}
67
68impl<C, Req> Exc<C, Req> {
69    /// Into the inner channel.
70    #[inline]
71    pub fn into_inner(self) -> C {
72        self.channel
73    }
74}
75
76impl<C, Req> Exc<C, Req>
77where
78    Req: Request,
79    C: ExcService<Req>,
80{
81    /// Create from the given [`ExcService`].
82    pub fn new(service: C) -> Self {
83        Self {
84            channel: service,
85            _req: PhantomData,
86        }
87    }
88
89    /// Make a request using the underlying channel directly.
90    pub async fn request(&mut self, request: Req) -> Result<Req::Response, ExchangeError> {
91        ServiceExt::<Req>::oneshot(self.channel.as_service(), request).await
92    }
93
94    /// Apply rate-limit layer to the channel.
95    #[cfg(feature = "limit")]
96    pub fn into_rate_limited(
97        self,
98        num: u64,
99        per: std::time::Duration,
100    ) -> Exc<tower::limit::RateLimit<IntoService<C, Req>>, Req> {
101        use tower::limit::RateLimitLayer;
102        self.into_layered(&RateLimitLayer::new(num, per))
103    }
104
105    #[cfg(feature = "retry")]
106    /// Apply retry layer to the channel.
107    pub fn into_retry(
108        self,
109        max_duration: std::time::Duration,
110    ) -> Exc<tower::retry::Retry<crate::retry::Always, IntoService<C, Req>>, Req>
111    where
112        Req: Clone,
113        C: Clone,
114    {
115        use crate::retry::Always;
116        use tower::retry::RetryLayer;
117
118        self.into_layered(&RetryLayer::new(Always::with_max_duration(max_duration)))
119    }
120
121    /// Adapt the request type of the underlying channel to the target type `R`.
122    pub fn into_adapted<R>(self) -> Exc<Adapt<IntoService<C, Req>, Req, R>, R>
123    where
124        R: Request,
125        IntoService<C, Req>: AdaptService<Req, R>,
126    {
127        self.into_layered(&AdaptLayer::default())
128    }
129
130    /// Apply a layer to the underlying channel.
131    pub fn into_layered<T, R>(self, layer: &T) -> Exc<T::Service, R>
132    where
133        T: Layer<IntoService<C, Req>>,
134        R: Request,
135        T::Service: ExcService<R>,
136    {
137        Exc {
138            channel: layer.layer(self.channel.into_service()),
139            _req: PhantomData,
140        }
141    }
142}
143
144impl<C, Req, R> Service<R> for Exc<C, Req>
145where
146    R: Request,
147    R::Response: Send + 'static,
148    Req: Adaptor<R>,
149    C: ExcService<Req>,
150    C::Future: Send + 'static,
151{
152    type Response = R::Response;
153    type Error = ExchangeError;
154    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
155
156    fn poll_ready(
157        &mut self,
158        cx: &mut std::task::Context<'_>,
159    ) -> std::task::Poll<Result<(), Self::Error>> {
160        self.channel.poll_ready(cx)
161    }
162
163    fn call(&mut self, req: R) -> Self::Future {
164        let request = Req::from_request(req);
165        match request {
166            Ok(req) => {
167                let res = self.channel.call(req);
168                async move {
169                    let resp = res.await?;
170                    let resp = Req::into_response(resp)?;
171                    Ok(resp)
172                }
173                .left_future()
174            }
175            Err(err) => futures::future::ready(Err(err)).right_future(),
176        }
177        .boxed()
178    }
179}