tonic_rustls/channel/
mod.rs

1//! Client implementation and builder.
2
3mod endpoint;
4pub(crate) mod service;
5
6pub use endpoint::Endpoint;
7
8use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
9use tonic::body::Body;
10use bytes::Bytes;
11use http::{
12    uri::{InvalidUri, Uri},
13    Request, Response,
14};
15use hyper_util::client::legacy::connect::Connection as HyperConnection;
16use std::{
17    fmt,
18    future::Future,
19    hash::Hash,
20    pin::Pin,
21    task::{Context, Poll},
22};
23use tokio::sync::mpsc::{channel, Sender};
24
25use hyper::rt;
26use tower::balance::p2c::Balance;
27use tower::{
28    buffer::{future::ResponseFuture as BufferResponseFuture, Buffer},
29    discover::{Change, Discover},
30    util::BoxService,
31    Service,
32};
33
/// A heap-allocated, pinned, type-erased future that is `Send` and resolves to `T`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
35
/// Request-buffer size used when an [`Endpoint`] does not set `buffer_size`; load-balanced
/// channels use it for their request buffer as well.
const DEFAULT_BUFFER_SIZE: usize = 1024;
37
38/// A default batteries included `transport` channel.
39///
40/// This provides a fully featured http2 gRPC client based on `hyper`
41/// and `tower` services.
42///
43/// # Multiplexing requests
44///
45/// Sending a request on a channel requires a `&mut self` and thus can only send
46/// one request in flight. This is intentional and is required to follow the `Service`
47/// contract from the `tower` library which this channel implementation is built on
48/// top of.
49///
50/// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply
51/// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready`
52/// we know the `Service` is able to accept only one request before we must `poll_ready`
53/// again. Due to this fact any `async fn` that wants to poll for readiness and submit
54/// the request must have a `&mut self` reference.
55///
56/// To work around this and to ease the use of the channel, `Channel` provides a
57/// `Clone` implementation that is _cheap_. This is because at the very top level
58/// the channel is backed by a `tower_buffer::Buffer` which runs the connection
59/// in a background task and provides a `mpsc` channel interface. Due to this
60/// cloning the `Channel` type is cheap and encouraged.
61#[derive(Clone)]
62pub struct Channel {
63    svc: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
64}
65
66/// A future that resolves to an HTTP response.
67///
68/// This is returned by the `Service::call` on [`Channel`].
69pub struct ResponseFuture {
70    inner: BufferResponseFuture<BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
71}
72
73impl Channel {
74    /// Create an [`Endpoint`] builder that can create [`Channel`]s.
75    pub fn builder(uri: Uri) -> Endpoint {
76        Endpoint::from(uri)
77    }
78
79    /// Create an [`Endpoint`] from a static string.
80    ///
81    /// ```
82    /// # use tonic_rustls::Channel;
83    /// Channel::from_static("https://example.com");
84    /// ```
85    pub fn from_static(s: &'static str) -> Endpoint {
86        let uri = Uri::from_static(s);
87        Self::builder(uri)
88    }
89
90    /// Create an [`Endpoint`] from shared bytes.
91    ///
92    /// ```
93    /// # use tonic_rustls::Channel;
94    /// Channel::from_shared("https://example.com");
95    /// ```
96    pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
97        let uri = Uri::from_maybe_shared(s.into())?;
98        Ok(Self::builder(uri))
99    }
100
101    /// Balance a list of [`Endpoint`]'s.
102    ///
103    /// This creates a [`Channel`] that will load balance across all the
104    /// provided endpoints.
105    pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
106        let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
107        list.for_each(|endpoint| {
108            tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
109                .unwrap();
110        });
111
112        channel
113    }
114
115    /// Balance a list of [`Endpoint`]'s.
116    ///
117    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
118    pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
119    where
120        K: Hash + Eq + Send + Clone + 'static,
121    {
122        Self::balance_channel_with_executor(capacity, SharedExec::tokio())
123    }
124
125    /// Balance a list of [`Endpoint`]'s.
126    ///
127    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
128    ///
129    /// The [`Channel`] will use the given executor to spawn async tasks.
130    pub fn balance_channel_with_executor<K, E>(
131        capacity: usize,
132        executor: E,
133    ) -> (Self, Sender<Change<K, Endpoint>>)
134    where
135        K: Hash + Eq + Send + Clone + 'static,
136        E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
137    {
138        let (tx, rx) = channel(capacity);
139        let list = DynamicServiceStream::new(rx);
140        (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
141    }
142
143    pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
144    where
145        C: Service<Uri> + Send + 'static,
146        C::Error: Into<crate::BoxError> + Send,
147        C::Future: Send,
148        C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
149    {
150        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
151        let executor = endpoint.executor.clone();
152
153        let svc = Connection::lazy(connector, endpoint);
154        let (svc, worker) = Buffer::pair(svc, buffer_size);
155
156        executor.execute(worker);
157
158        Channel { svc }
159    }
160
161    pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
162    where
163        C: Service<Uri> + Send + 'static,
164        C::Error: Into<crate::BoxError> + Send,
165        C::Future: Unpin + Send,
166        C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
167    {
168        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
169        let executor = endpoint.executor.clone();
170
171        let svc = Connection::connect(connector, endpoint)
172            .await
173            .map_err(super::Error::from_source)?;
174        let (svc, worker) = Buffer::pair(svc, buffer_size);
175        executor.execute(worker);
176
177        Ok(Channel { svc })
178    }
179
180    pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
181    where
182        D: Discover<Service = Connection> + Unpin + Send + 'static,
183        D::Error: Into<crate::BoxError>,
184        D::Key: Hash + Send + Clone,
185        E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
186    {
187        let svc = Balance::new(discover);
188
189        let svc = BoxService::new(svc);
190        let (svc, worker) = Buffer::pair(svc, buffer_size);
191        executor.execute(Box::pin(worker));
192
193        Channel { svc }
194    }
195}
196
197impl Service<http::Request<Body>> for Channel {
198    type Response = http::Response<Body>;
199    type Error = super::Error;
200    type Future = ResponseFuture;
201
202    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
203        Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
204    }
205
206    fn call(&mut self, request: http::Request<Body>) -> Self::Future {
207        let inner = Service::call(&mut self.svc, request);
208
209        ResponseFuture { inner }
210    }
211}
212
213impl Future for ResponseFuture {
214    type Output = Result<Response<Body>, super::Error>;
215
216    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
217        Pin::new(&mut self.inner)
218            .poll(cx)
219            .map_err(super::Error::from_source)
220    }
221}
222
223impl fmt::Debug for Channel {
224    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225        f.debug_struct("Channel").finish()
226    }
227}
228
229impl fmt::Debug for ResponseFuture {
230    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231        f.debug_struct("ResponseFuture").finish()
232    }
233}