1use std::error::Error as StdError;
2use std::future::Future;
3use std::io::{Cursor, IoSlice};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Buf;
8use futures_core::ready;
9use h2::SendStream;
10use http::header::{HeaderName, CONNECTION, TE, TRANSFER_ENCODING, UPGRADE};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
14use crate::body::Body;
15
16pub(crate) mod ping;
17pub(crate) mod upgrade;
18
19cfg_client! {
20 pub(crate) mod client;
21 pub(crate) use self::client::ClientTask;
22}
23
24cfg_server! {
25 pub(crate) mod server;
26 pub(crate) use self::server::Server;
27}
28
/// Window size of 2^16 - 1 (65,535).
///
/// NOTE(review): the name implies this is the default window size laid down by the HTTP/2
/// specification; the code that consumes it is outside this chunk — confirm there.
pub(crate) const SPEC_WINDOW_SIZE: u32 = (1 << 16) - 1;
31
32static CONNECTION_HEADERS: [HeaderName; 4] = [
37 HeaderName::from_static("keep-alive"),
38 HeaderName::from_static("proxy-connection"),
39 TRANSFER_ENCODING,
40 UPGRADE,
41];
42
43fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
44 for header in &CONNECTION_HEADERS {
45 if headers.remove(header).is_some() {
46 warn!("Connection header illegal in HTTP/2: {}", header.as_str());
47 }
48 }
49
50 if is_request {
51 if headers
52 .get(TE)
53 .map_or(false, |te_header| te_header != "trailers")
54 {
55 warn!("TE headers not set to \"trailers\" are illegal in HTTP/2 requests");
56 headers.remove(TE);
57 }
58 } else if headers.remove(TE).is_some() {
59 warn!("TE headers illegal in HTTP/2 responses");
60 }
61
62 if let Some(header) = headers.remove(CONNECTION) {
63 warn!(
64 "Connection header illegal in HTTP/2: {}",
65 CONNECTION.as_str()
66 );
67 if let Ok(header_contents) = header.to_str() {
74 for name in header_contents.split(',') {
75 let name = name.trim();
76 headers.remove(name);
77 }
78 }
79 }
80}
81
82pin_project! {
85 pub(crate) struct PipeToSendStream<S>
86 where
87 S: Body,
88 {
89 body_tx: SendStream<SendBuf<S::Data>>,
90 data_done: bool,
91 #[pin]
92 stream: S,
93 }
94}
95
96impl<S> PipeToSendStream<S>
97where
98 S: Body,
99{
100 fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
101 PipeToSendStream {
102 body_tx: tx,
103 data_done: false,
104 stream,
105 }
106 }
107
108 #[cfg(feature = "client")]
109 fn send_reset(self: Pin<&mut Self>, reason: h2::Reason) {
110 self.project().body_tx.send_reset(reason);
111 }
112}
113
114impl<S> Future for PipeToSendStream<S>
115where
116 S: Body,
117 S::Error: Into<Box<dyn StdError + Send + Sync>>,
118{
119 type Output = crate::Result<()>;
120
121 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
122 let mut me = self.project();
123 loop {
124 me.body_tx.reserve_capacity(1);
128
129 if me.body_tx.capacity() == 0 {
130 loop {
131 match ready!(me.body_tx.poll_capacity(cx)) {
132 Some(Ok(0)) => {}
133 Some(Ok(_)) => break,
134 Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))),
135 None => {
136 return Poll::Ready(Err(crate::Error::new_body_write(
140 "send stream capacity unexpectedly closed",
141 )));
142 }
143 }
144 }
145 } else if let Poll::Ready(reason) = me
146 .body_tx
147 .poll_reset(cx)
148 .map_err(crate::Error::new_body_write)?
149 {
150 debug!("stream received RST_STREAM: {:?}", reason);
151 return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason))));
152 }
153
154 match ready!(me.stream.as_mut().poll_frame(cx)) {
155 Some(Ok(frame)) => {
156 if frame.is_data() {
157 let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
158 let is_eos = me.stream.is_end_stream();
159 trace!(
160 "send body chunk: {} bytes, eos={}",
161 chunk.remaining(),
162 is_eos,
163 );
164
165 let buf = SendBuf::Buf(chunk);
166 me.body_tx
167 .send_data(buf, is_eos)
168 .map_err(crate::Error::new_body_write)?;
169
170 if is_eos {
171 return Poll::Ready(Ok(()));
172 }
173 } else if frame.is_trailers() {
174 me.body_tx.reserve_capacity(0);
176 me.body_tx
177 .send_trailers(frame.into_trailers().unwrap_or_else(|_| unreachable!()))
178 .map_err(crate::Error::new_body_write)?;
179 return Poll::Ready(Ok(()));
180 } else {
181 trace!("discarding unknown frame");
182 }
184 }
185 Some(Err(e)) => return Poll::Ready(Err(me.body_tx.on_user_err(e))),
186 None => {
187 return Poll::Ready(me.body_tx.send_eos_frame());
191 }
192 }
193 }
194 }
195}
196
197trait SendStreamExt {
198 fn on_user_err<E>(&mut self, err: E) -> crate::Error
199 where
200 E: Into<Box<dyn std::error::Error + Send + Sync>>;
201 fn send_eos_frame(&mut self) -> crate::Result<()>;
202}
203
204impl<B: Buf> SendStreamExt for SendStream<SendBuf<B>> {
205 fn on_user_err<E>(&mut self, err: E) -> crate::Error
206 where
207 E: Into<Box<dyn std::error::Error + Send + Sync>>,
208 {
209 let err = crate::Error::new_user_body(err);
210 debug!("send body user stream error: {}", err);
211 self.send_reset(err.h2_reason());
212 err
213 }
214
215 fn send_eos_frame(&mut self) -> crate::Result<()> {
216 trace!("send body eos");
217 self.send_data(SendBuf::None, true)
218 .map_err(crate::Error::new_body_write)
219 }
220}
221
222#[repr(usize)]
223enum SendBuf<B> {
224 Buf(B),
225 Cursor(Cursor<Box<[u8]>>),
226 None,
227}
228
229impl<B: Buf> Buf for SendBuf<B> {
230 #[inline]
231 fn remaining(&self) -> usize {
232 match *self {
233 Self::Buf(ref b) => b.remaining(),
234 Self::Cursor(ref c) => Buf::remaining(c),
235 Self::None => 0,
236 }
237 }
238
239 #[inline]
240 fn chunk(&self) -> &[u8] {
241 match *self {
242 Self::Buf(ref b) => b.chunk(),
243 Self::Cursor(ref c) => c.chunk(),
244 Self::None => &[],
245 }
246 }
247
248 #[inline]
249 fn advance(&mut self, cnt: usize) {
250 match *self {
251 Self::Buf(ref mut b) => b.advance(cnt),
252 Self::Cursor(ref mut c) => c.advance(cnt),
253 Self::None => {}
254 }
255 }
256
257 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
258 match *self {
259 Self::Buf(ref b) => b.chunks_vectored(dst),
260 Self::Cursor(ref c) => c.chunks_vectored(dst),
261 Self::None => 0,
262 }
263 }
264}