rama_http_core/client/conn/
http2.rs1use std::borrow::Cow;
4use std::fmt;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures_util::ready;
11use rama_core::error::BoxError;
12use rama_core::rt::Executor;
13use rama_http_types::proto::h2::PseudoHeaderOrder;
14use rama_http_types::proto::h2::frame::{SettingOrder, SettingsConfig};
15use rama_http_types::{Request, Response};
16use tokio::io::{AsyncRead, AsyncWrite};
17use tracing::{debug, trace};
18
19use super::super::dispatch::{self, TrySendError};
20use crate::body::{Body, Incoming as IncomingBody};
21use crate::h2::frame::{Priority, StreamDependency};
22use crate::proto;
23
24pub struct SendRequest<B> {
26 dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
27}
28
29impl<B> Clone for SendRequest<B> {
30 fn clone(&self) -> SendRequest<B> {
31 SendRequest {
32 dispatch: self.dispatch.clone(),
33 }
34 }
35}
36
37#[must_use = "futures do nothing unless polled"]
44pub struct Connection<T, B>
45where
46 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
47 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
48{
49 inner: (PhantomData<T>, proto::h2::ClientTask<B, T>),
50}
51
52#[derive(Clone, Debug)]
59pub struct Builder {
60 pub(super) exec: Executor,
61 h2_builder: proto::h2::client::Config,
62 headers_pseudo_order: Option<PseudoHeaderOrder>,
63 headers_priority: Option<StreamDependency>,
64 priority: Option<Cow<'static, [Priority]>>,
65}
66
67pub async fn handshake<T, B>(
72 exec: Executor,
73 io: T,
74) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
75where
76 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
77 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
78{
79 Builder::new(exec).handshake(io).await
80}
81
82impl<B> SendRequest<B> {
85 pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89 if self.is_closed() {
90 Poll::Ready(Err(crate::Error::new_closed()))
91 } else {
92 Poll::Ready(Ok(()))
93 }
94 }
95
96 pub async fn ready(&mut self) -> crate::Result<()> {
100 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
101 }
102
103 pub fn is_ready(&self) -> bool {
111 self.dispatch.is_ready()
112 }
113
114 pub fn is_closed(&self) -> bool {
116 self.dispatch.is_closed()
117 }
118}
119
120impl<B> SendRequest<B>
121where
122 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
123{
124 pub fn send_request(
133 &mut self,
134 req: Request<B>,
135 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
136 let sent = self.dispatch.send(req);
137
138 async move {
139 match sent {
140 Ok(rx) => match rx.await {
141 Ok(Ok(resp)) => Ok(resp),
142 Ok(Err(err)) => Err(err),
143 Err(_canceled) => panic!("dispatch dropped without returning error"),
145 },
146 Err(_req) => {
147 debug!("connection was not ready");
148 Err(crate::Error::new_canceled().with("connection was not ready"))
149 }
150 }
151 }
152 }
153
154 pub fn try_send_request(
163 &mut self,
164 req: Request<B>,
165 ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
166 let sent = self.dispatch.try_send(req);
167 async move {
168 match sent {
169 Ok(rx) => match rx.await {
170 Ok(Ok(res)) => Ok(res),
171 Ok(Err(err)) => Err(err),
172 Err(_) => panic!("dispatch dropped without returning error"),
174 },
175 Err(req) => {
176 debug!("connection was not ready");
177 let error = crate::Error::new_canceled().with("connection was not ready");
178 Err(TrySendError {
179 error,
180 message: Some(req),
181 })
182 }
183 }
184 }
185 }
186}
187
188impl<B> fmt::Debug for SendRequest<B> {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 f.debug_struct("SendRequest").finish()
191 }
192}
193
194impl<T, B> Connection<T, B>
197where
198 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
199 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
200{
201 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
211 self.inner.1.is_extended_connect_protocol_enabled()
212 }
213}
214
215impl<T, B> fmt::Debug for Connection<T, B>
216where
217 T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static + Unpin,
218 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
219{
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("Connection").finish()
222 }
223}
224
225impl<T, B> Future for Connection<T, B>
226where
227 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
228 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
229{
230 type Output = crate::Result<()>;
231
232 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
233 match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
234 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
235 proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
236 }
237 }
238}
239
240impl Builder {
243 #[inline]
245 pub fn new(exec: Executor) -> Builder {
246 Builder {
247 exec,
248 h2_builder: Default::default(),
249 headers_pseudo_order: None,
250 headers_priority: None,
251 priority: None,
252 }
253 }
254
255 pub fn apply_setting_config(&mut self, config: &SettingsConfig) -> &mut Self {
256 self.header_table_size(config.header_table_size)
257 .max_concurrent_streams(config.max_concurrent_streams)
258 .initial_stream_window_size(config.initial_window_size)
259 .max_frame_size(config.max_frame_size);
260
261 if let Some(value) = config.enable_push {
262 self.enable_push(value != 0);
263 }
264
265 if let Some(value) = config.max_header_list_size {
266 self.max_header_list_size(value);
267 }
268
269 if let Some(value) = config.enable_connect_protocol {
270 self.enable_connect_protocol(value);
271 }
272
273 if let Some(value) = config.unknown_setting_9 {
274 self.unknown_setting_9(value);
275 }
276
277 if let Some(order) = config.setting_order.clone() {
278 self.setting_order(order);
279 }
280
281 self
282 }
283
284 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
293 if let Some(sz) = sz.into() {
294 self.h2_builder.adaptive_window = false;
295 self.h2_builder.initial_stream_window_size = sz;
296 }
297 self
298 }
299
300 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
306 if let Some(sz) = sz.into() {
307 self.h2_builder.adaptive_window = false;
308 self.h2_builder.initial_conn_window_size = sz;
309 }
310 self
311 }
312
313 pub fn initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self {
324 if let Some(initial) = initial.into() {
325 self.h2_builder.initial_max_send_streams = initial;
326 }
327 self
328 }
329
330 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
336 use proto::h2::SPEC_WINDOW_SIZE;
337
338 self.h2_builder.adaptive_window = enabled;
339 if enabled {
340 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
341 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
342 }
343 self
344 }
345
346 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
350 self.h2_builder.max_frame_size = sz.into();
351 self
352 }
353
354 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
358 self.h2_builder.max_header_list_size = max;
359 self
360 }
361
362 pub fn header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self {
370 self.h2_builder.header_table_size = size.into();
371 self
372 }
373
374 pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
398 self.h2_builder.max_concurrent_streams = max.into();
399 self
400 }
401
402 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
409 self.h2_builder.keep_alive_interval = interval.into();
410 self
411 }
412
413 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
420 self.h2_builder.keep_alive_timeout = timeout;
421 self
422 }
423
424 pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
433 self.h2_builder.keep_alive_while_idle = enabled;
434 self
435 }
436
437 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
446 self.h2_builder.max_concurrent_reset_streams = Some(max);
447 self
448 }
449
450 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
458 assert!(max <= u32::MAX as usize);
459 self.h2_builder.max_send_buffer_size = max;
460 self
461 }
462
463 pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
467 self.h2_builder.max_pending_accept_reset_streams = max.into();
468 self
469 }
470
471 pub fn enable_push(&mut self, enable: bool) -> &mut Self {
472 self.h2_builder.enable_push = enable;
473 self
474 }
475
476 pub fn enable_connect_protocol(&mut self, value: u32) -> &mut Self {
477 self.h2_builder.enable_connect_protocol = Some(value);
478 self
479 }
480
481 pub fn unknown_setting_9(&mut self, value: u32) -> &mut Self {
482 self.h2_builder.unknown_setting_9 = Some(value);
483 self
484 }
485
486 pub fn setting_order(&mut self, order: SettingOrder) -> &mut Self {
487 self.h2_builder.setting_order = Some(order);
488 self
489 }
490
491 pub fn headers_pseudo_order(&mut self, order: PseudoHeaderOrder) -> &mut Self {
492 self.headers_pseudo_order = Some(order);
493 self
494 }
495
496 pub fn headers_priority(&mut self, headers_priority: StreamDependency) -> &mut Self {
497 self.headers_priority = Some(headers_priority);
498 self
499 }
500
501 pub fn priority(&mut self, priority: impl Into<Cow<'static, [Priority]>>) -> &mut Self {
502 self.priority = Some(priority.into());
503 self
504 }
505
506 pub fn handshake<T, B>(
512 &self,
513 io: T,
514 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
515 where
516 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
517 B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
518 {
519 let opts = self.clone();
520
521 async move {
522 trace!("client handshake HTTP/2");
523
524 let mut client_builder = proto::h2::client::new_builder(&self.h2_builder);
525 if let Some(order) = self.headers_pseudo_order.clone() {
526 client_builder.headers_pseudo_order(order);
527 }
528 if let Some(priority) = self.headers_priority.clone() {
529 client_builder.headers_priority(priority);
530 }
531 if let Some(priority) = self.priority.clone() {
532 client_builder.priority(priority);
533 }
534
535 let (tx, rx) = dispatch::channel();
536
537 let h2 = proto::h2::client::handshake_with_builder(
538 client_builder,
539 io,
540 rx,
541 &opts.h2_builder,
542 opts.exec,
543 )
544 .await?;
545
546 Ok((
547 SendRequest {
548 dispatch: tx.unbound(),
549 },
550 Connection {
551 inner: (PhantomData, h2),
552 },
553 ))
554 }
555 }
556}
557
558#[cfg(test)]
559mod tests {
560 use rama_core::rt::Executor;
561 use rama_http_types::dep::http_body_util;
562 use tokio::io::{AsyncRead, AsyncWrite};
563
564 #[tokio::test]
565 #[ignore] async fn send_sync_executor_of_send_futures() {
567 #[allow(unused)]
568 async fn run(io: impl AsyncRead + AsyncWrite + Send + Unpin + 'static) {
569 let (_sender, conn) = crate::client::conn::http2::handshake::<
570 _,
571 http_body_util::Empty<bytes::Bytes>,
572 >(Executor::default(), io)
573 .await
574 .unwrap();
575
576 tokio::task::spawn(async move {
577 conn.await.unwrap();
578 });
579 }
580 }
581}