1use crate::context;
10use futures::prelude::*;
11use std::io;
12
13pub mod channel;
15pub use channel::{new, Channel};
16
17pub trait Client<'a, Req> {
19 type Response;
21
22 type Future: Future<Output = io::Result<Self::Response>> + 'a;
24
25 fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future;
32
33 fn map_response<F, R>(self, f: F) -> MapResponse<Self, F>
35 where
36 F: FnMut(Self::Response) -> R,
37 Self: Sized,
38 {
39 MapResponse { inner: self, f }
40 }
41
42 fn with_request<F, Req2>(self, f: F) -> WithRequest<Self, F>
44 where
45 F: FnMut(Req2) -> Req,
46 Self: Sized,
47 {
48 WithRequest { inner: self, f }
49 }
50}
51
/// Adapter pairing a wrapped client with a closure applied to its responses.
///
/// Built by `Client::map_response`.
#[derive(Debug, Clone)]
pub struct MapResponse<C, F> {
    /// The wrapped client that actually performs the call.
    inner: C,
    /// The closure run over each successful response.
    f: F,
}
58
59impl<'a, C, F, Req, Resp, Resp2> Client<'a, Req> for MapResponse<C, F>
60where
61 C: Client<'a, Req, Response = Resp>,
62 F: FnMut(Resp) -> Resp2 + 'a,
63{
64 type Response = Resp2;
65 type Future = futures::future::MapOk<<C as Client<'a, Req>>::Future, &'a mut F>;
66
67 fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future {
68 self.inner.call(ctx, request).map_ok(&mut self.f)
69 }
70}
71
/// Adapter pairing a wrapped client with a closure that converts incoming requests.
///
/// Built by `Client::with_request`.
#[derive(Debug, Clone)]
pub struct WithRequest<C, F> {
    /// The wrapped client that receives the converted request.
    inner: C,
    /// The closure that converts a caller-facing request into the wrapped client's
    /// request type.
    f: F,
}
78
79impl<'a, C, F, Req, Req2, Resp> Client<'a, Req2> for WithRequest<C, F>
80where
81 C: Client<'a, Req, Response = Resp>,
82 F: FnMut(Req2) -> Req,
83{
84 type Response = Resp;
85 type Future = <C as Client<'a, Req>>::Future;
86
87 fn call(&'a mut self, ctx: context::Context, request: Req2) -> Self::Future {
88 self.inner.call(ctx, (self.f)(request))
89 }
90}
91
92impl<'a, Req, Resp> Client<'a, Req> for Channel<Req, Resp>
93where
94 Req: 'a,
95 Resp: 'a,
96{
97 type Response = Resp;
98 type Future = channel::Call<'a, Req, Resp>;
99
100 fn call(&'a mut self, ctx: context::Context, request: Req) -> channel::Call<'a, Req, Resp> {
101 self.call(ctx, request)
102 }
103}
104
/// Tunable limits for a client.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with a struct
/// literal; start from `Config::default()` and override fields instead.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Config {
    /// Presumably the maximum number of requests that may be awaiting a response at
    /// once; confirm against the `channel` module, which is where it would be used.
    pub max_in_flight_requests: usize,
    /// Presumably the capacity of the buffer holding requests that have not yet been
    /// sent; confirm against the `channel` module.
    pub pending_request_buffer: usize,
}
118
119impl Default for Config {
120 fn default() -> Self {
121 Config {
122 max_in_flight_requests: 1_000,
123 pending_request_buffer: 100,
124 }
125 }
126}
127
/// A freshly created client bundled with the dispatch future that accompanies it.
///
/// The two halves are public so callers can drive `dispatch` themselves, or use
/// `spawn` (when available) to hand it to a runtime.
#[derive(Debug)]
pub struct NewClient<C, D> {
    /// The client half, returned to the caller by `spawn`.
    pub client: C,
    /// The dispatch half; `spawn` expects it to be a future resolving to `io::Result<()>`.
    pub dispatch: D,
}
137
138impl<C, D> NewClient<C, D>
139where
140 D: Future<Output = io::Result<()>> + Send + 'static,
141{
142 #[cfg(feature = "tokio1")]
144 pub fn spawn(self) -> io::Result<C> {
145 use log::error;
146
147 let dispatch = self
148 .dispatch
149 .unwrap_or_else(move |e| error!("Connection broken: {}", e));
150 tokio::spawn(dispatch);
151 Ok(self.client)
152 }
153}