tarpc_lib/client/
mod.rs

1// Copyright 2018 Google LLC
2//
3// Use of this source code is governed by an MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT.
6
7//! Provides a client that connects to a server and sends multiplexed requests.
8
9use crate::context;
10use futures::prelude::*;
11use std::io;
12
13/// Provides a [`Client`] backed by a transport.
14pub mod channel;
15pub use channel::{new, Channel};
16
17/// Sends multiplexed requests to, and receives responses from, a server.
18pub trait Client<'a, Req> {
19    /// The response type.
20    type Response;
21
22    /// The future response.
23    type Future: Future<Output = io::Result<Self::Response>> + 'a;
24
25    /// Initiates a request, sending it to the dispatch task.
26    ///
27    /// Returns a [`Future`] that resolves to this client and the future response
28    /// once the request is successfully enqueued.
29    ///
30    /// [`Future`]: futures::Future
31    fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future;
32
33    /// Returns a Client that applies a post-processing function to the returned response.
34    fn map_response<F, R>(self, f: F) -> MapResponse<Self, F>
35    where
36        F: FnMut(Self::Response) -> R,
37        Self: Sized,
38    {
39        MapResponse { inner: self, f }
40    }
41
42    /// Returns a Client that applies a pre-processing function to the request.
43    fn with_request<F, Req2>(self, f: F) -> WithRequest<Self, F>
44    where
45        F: FnMut(Req2) -> Req,
46        Self: Sized,
47    {
48        WithRequest { inner: self, f }
49    }
50}
51
/// A client adaptor that passes each response of a wrapped client through a function.
///
/// Created by `Client::map_response`.
#[derive(Debug, Clone)]
pub struct MapResponse<C, F> {
    /// The wrapped client that performs the actual call.
    inner: C,
    /// The post-processing function applied to the response.
    f: F,
}
58
59impl<'a, C, F, Req, Resp, Resp2> Client<'a, Req> for MapResponse<C, F>
60where
61    C: Client<'a, Req, Response = Resp>,
62    F: FnMut(Resp) -> Resp2 + 'a,
63{
64    type Response = Resp2;
65    type Future = futures::future::MapOk<<C as Client<'a, Req>>::Future, &'a mut F>;
66
67    fn call(&'a mut self, ctx: context::Context, request: Req) -> Self::Future {
68        self.inner.call(ctx, request).map_ok(&mut self.f)
69    }
70}
71
/// A client adaptor that converts each incoming request with a function before
/// handing it to a wrapped client.
///
/// Created by `Client::with_request`.
#[derive(Debug, Clone)]
pub struct WithRequest<C, F> {
    /// The wrapped client that receives the converted request.
    inner: C,
    /// The pre-processing function applied to the request.
    f: F,
}
78
79impl<'a, C, F, Req, Req2, Resp> Client<'a, Req2> for WithRequest<C, F>
80where
81    C: Client<'a, Req, Response = Resp>,
82    F: FnMut(Req2) -> Req,
83{
84    type Response = Resp;
85    type Future = <C as Client<'a, Req>>::Future;
86
87    fn call(&'a mut self, ctx: context::Context, request: Req2) -> Self::Future {
88        self.inner.call(ctx, (self.f)(request))
89    }
90}
91
92impl<'a, Req, Resp> Client<'a, Req> for Channel<Req, Resp>
93where
94    Req: 'a,
95    Resp: 'a,
96{
97    type Response = Resp;
98    type Future = channel::Call<'a, Req, Resp>;
99
100    fn call(&'a mut self, ctx: context::Context, request: Req) -> channel::Call<'a, Req, Resp> {
101        self.call(ctx, request)
102    }
103}
104
/// Tunable limits that control the behavior of the client.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Config {
    /// Maximum number of requests that can be in flight at once.
    /// `max_in_flight_requests` controls the size of the map the client uses to store
    /// pending requests.
    pub max_in_flight_requests: usize,
    /// Maximum number of requests that can be buffered client-side before being sent.
    /// `pending_request_buffer` controls the size of the channel clients use to
    /// communicate with the request dispatch task.
    pub pending_request_buffer: usize,
}
118
119impl Default for Config {
120    fn default() -> Self {
121        Config {
122            max_in_flight_requests: 1_000,
123            pending_request_buffer: 100,
124        }
125    }
126}
127
/// A newly created client paired with its dispatch.
///
/// The dispatch drives the sending and receiving of requests, so it must be polled
/// continuously or spawned for the client to be usable.
#[derive(Debug)]
pub struct NewClient<C, D> {
    /// The client half of the pair.
    pub client: C,
    /// The dispatch half of the pair, which services the client.
    pub dispatch: D,
}
137
138impl<C, D> NewClient<C, D>
139where
140    D: Future<Output = io::Result<()>> + Send + 'static,
141{
142    /// Helper method to spawn the dispatch on the default executor.
143    #[cfg(feature = "tokio1")]
144    pub fn spawn(self) -> io::Result<C> {
145        use log::error;
146
147        let dispatch = self
148            .dispatch
149            .unwrap_or_else(move |e| error!("Connection broken: {}", e));
150        tokio::spawn(dispatch);
151        Ok(self.client)
152    }
153}