tonic_rustls/channel/
mod.rs1mod endpoint;
4pub(crate) mod service;
5
6pub use endpoint::Endpoint;
7
8use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
9use tonic::body::Body;
10use bytes::Bytes;
11use http::{
12 uri::{InvalidUri, Uri},
13 Request, Response,
14};
15use hyper_util::client::legacy::connect::Connection as HyperConnection;
16use std::{
17 fmt,
18 future::Future,
19 hash::Hash,
20 pin::Pin,
21 task::{Context, Poll},
22};
23use tokio::sync::mpsc::{channel, Sender};
24
25use hyper::rt;
26use tower::balance::p2c::Balance;
27use tower::{
28 buffer::{future::ResponseFuture as BufferResponseFuture, Buffer},
29 discover::{Change, Discover},
30 util::BoxService,
31 Service,
32};
33
/// A heap-allocated, pinned, `Send` future that yields `T` and may borrow for `'a`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Buffer size used when an endpoint does not specify one, and the capacity
/// used for the change queue created by `Channel::balance_list`.
const DEFAULT_BUFFER_SIZE: usize = 1024;
37
38#[derive(Clone)]
62pub struct Channel {
63 svc: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
64}
65
66pub struct ResponseFuture {
70 inner: BufferResponseFuture<BoxFuture<'static, Result<Response<Body>, crate::BoxError>>>,
71}
72
73impl Channel {
74 pub fn builder(uri: Uri) -> Endpoint {
76 Endpoint::from(uri)
77 }
78
79 pub fn from_static(s: &'static str) -> Endpoint {
86 let uri = Uri::from_static(s);
87 Self::builder(uri)
88 }
89
90 pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
97 let uri = Uri::from_maybe_shared(s.into())?;
98 Ok(Self::builder(uri))
99 }
100
101 pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
106 let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
107 list.for_each(|endpoint| {
108 tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
109 .unwrap();
110 });
111
112 channel
113 }
114
115 pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
119 where
120 K: Hash + Eq + Send + Clone + 'static,
121 {
122 Self::balance_channel_with_executor(capacity, SharedExec::tokio())
123 }
124
125 pub fn balance_channel_with_executor<K, E>(
131 capacity: usize,
132 executor: E,
133 ) -> (Self, Sender<Change<K, Endpoint>>)
134 where
135 K: Hash + Eq + Send + Clone + 'static,
136 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
137 {
138 let (tx, rx) = channel(capacity);
139 let list = DynamicServiceStream::new(rx);
140 (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
141 }
142
143 pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
144 where
145 C: Service<Uri> + Send + 'static,
146 C::Error: Into<crate::BoxError> + Send,
147 C::Future: Send,
148 C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
149 {
150 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
151 let executor = endpoint.executor.clone();
152
153 let svc = Connection::lazy(connector, endpoint);
154 let (svc, worker) = Buffer::pair(svc, buffer_size);
155
156 executor.execute(worker);
157
158 Channel { svc }
159 }
160
161 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
162 where
163 C: Service<Uri> + Send + 'static,
164 C::Error: Into<crate::BoxError> + Send,
165 C::Future: Unpin + Send,
166 C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
167 {
168 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
169 let executor = endpoint.executor.clone();
170
171 let svc = Connection::connect(connector, endpoint)
172 .await
173 .map_err(super::Error::from_source)?;
174 let (svc, worker) = Buffer::pair(svc, buffer_size);
175 executor.execute(worker);
176
177 Ok(Channel { svc })
178 }
179
180 pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
181 where
182 D: Discover<Service = Connection> + Unpin + Send + 'static,
183 D::Error: Into<crate::BoxError>,
184 D::Key: Hash + Send + Clone,
185 E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
186 {
187 let svc = Balance::new(discover);
188
189 let svc = BoxService::new(svc);
190 let (svc, worker) = Buffer::pair(svc, buffer_size);
191 executor.execute(Box::pin(worker));
192
193 Channel { svc }
194 }
195}
196
197impl Service<http::Request<Body>> for Channel {
198 type Response = http::Response<Body>;
199 type Error = super::Error;
200 type Future = ResponseFuture;
201
202 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
203 Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
204 }
205
206 fn call(&mut self, request: http::Request<Body>) -> Self::Future {
207 let inner = Service::call(&mut self.svc, request);
208
209 ResponseFuture { inner }
210 }
211}
212
213impl Future for ResponseFuture {
214 type Output = Result<Response<Body>, super::Error>;
215
216 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
217 Pin::new(&mut self.inner)
218 .poll(cx)
219 .map_err(super::Error::from_source)
220 }
221}
222
223impl fmt::Debug for Channel {
224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225 f.debug_struct("Channel").finish()
226 }
227}
228
229impl fmt::Debug for ResponseFuture {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 f.debug_struct("ResponseFuture").finish()
232 }
233}