1#![deny(missing_docs)]
2
3use futures::{future::BoxFuture, FutureExt};
6use std::marker::PhantomData;
7use tower::{Layer, Service, ServiceExt};
8use traits::IntoService;
9
10pub mod error;
12
13pub mod layer;
15
16pub mod traits;
18
19pub mod adapt;
21
22#[cfg(feature = "retry")]
23pub mod retry;
25
26pub use layer::ExcLayer;
27pub use {
28 adapt::Adaptor,
29 traits::{BoxCloneExcService, BoxExcService, ExcService, ExcServiceExt, IntoExc, Request},
30};
31
32use self::adapt::{Adapt, AdaptLayer, AdaptService};
33pub use self::error::ExchangeError;
34
35#[cfg(feature = "send")]
36pub use self::traits::send::SendExcService;
37
38#[derive(Debug)]
51pub struct Exc<C, Req> {
52 channel: C,
53 _req: PhantomData<fn() -> Req>,
54}
55
56impl<C, Req> Clone for Exc<C, Req>
57where
58 C: Clone,
59{
60 fn clone(&self) -> Self {
61 Self {
62 channel: self.channel.clone(),
63 _req: PhantomData,
64 }
65 }
66}
67
68impl<C, Req> Exc<C, Req> {
69 #[inline]
71 pub fn into_inner(self) -> C {
72 self.channel
73 }
74}
75
76impl<C, Req> Exc<C, Req>
77where
78 Req: Request,
79 C: ExcService<Req>,
80{
81 pub fn new(service: C) -> Self {
83 Self {
84 channel: service,
85 _req: PhantomData,
86 }
87 }
88
89 pub async fn request(&mut self, request: Req) -> Result<Req::Response, ExchangeError> {
91 ServiceExt::<Req>::oneshot(self.channel.as_service(), request).await
92 }
93
94 #[cfg(feature = "limit")]
96 pub fn into_rate_limited(
97 self,
98 num: u64,
99 per: std::time::Duration,
100 ) -> Exc<tower::limit::RateLimit<IntoService<C, Req>>, Req> {
101 use tower::limit::RateLimitLayer;
102 self.into_layered(&RateLimitLayer::new(num, per))
103 }
104
105 #[cfg(feature = "retry")]
106 pub fn into_retry(
108 self,
109 max_duration: std::time::Duration,
110 ) -> Exc<tower::retry::Retry<crate::retry::Always, IntoService<C, Req>>, Req>
111 where
112 Req: Clone,
113 C: Clone,
114 {
115 use crate::retry::Always;
116 use tower::retry::RetryLayer;
117
118 self.into_layered(&RetryLayer::new(Always::with_max_duration(max_duration)))
119 }
120
121 pub fn into_adapted<R>(self) -> Exc<Adapt<IntoService<C, Req>, Req, R>, R>
123 where
124 R: Request,
125 IntoService<C, Req>: AdaptService<Req, R>,
126 {
127 self.into_layered(&AdaptLayer::default())
128 }
129
130 pub fn into_layered<T, R>(self, layer: &T) -> Exc<T::Service, R>
132 where
133 T: Layer<IntoService<C, Req>>,
134 R: Request,
135 T::Service: ExcService<R>,
136 {
137 Exc {
138 channel: layer.layer(self.channel.into_service()),
139 _req: PhantomData,
140 }
141 }
142}
143
144impl<C, Req, R> Service<R> for Exc<C, Req>
145where
146 R: Request,
147 R::Response: Send + 'static,
148 Req: Adaptor<R>,
149 C: ExcService<Req>,
150 C::Future: Send + 'static,
151{
152 type Response = R::Response;
153 type Error = ExchangeError;
154 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
155
156 fn poll_ready(
157 &mut self,
158 cx: &mut std::task::Context<'_>,
159 ) -> std::task::Poll<Result<(), Self::Error>> {
160 self.channel.poll_ready(cx)
161 }
162
163 fn call(&mut self, req: R) -> Self::Future {
164 let request = Req::from_request(req);
165 match request {
166 Ok(req) => {
167 let res = self.channel.call(req);
168 async move {
169 let resp = res.await?;
170 let resp = Req::into_response(resp)?;
171 Ok(resp)
172 }
173 .left_future()
174 }
175 Err(err) => futures::future::ready(Err(err)).right_future(),
176 }
177 .boxed()
178 }
179}