1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
11use futures_util::{stream::FusedStream, Stream}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
13use http::HeaderMap;
14use http_body::{Body, Frame, SizeHint};
15
16#[cfg(all(
17 any(feature = "http1", feature = "http2"),
18 any(feature = "client", feature = "server")
19))]
20use super::DecodedLength;
21#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
22use crate::common::watch;
23#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
24use crate::proto::h2::ping;
25
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
28#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
29type TrailersSender = oneshot::Sender<HeaderMap>;
30
31#[must_use = "streams do nothing unless polled"]
47pub struct Incoming {
48 kind: Kind,
49}
50
51enum Kind {
52 Empty,
53 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
54 Chan {
55 content_length: DecodedLength,
56 want_tx: watch::Sender,
57 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
58 trailers_rx: oneshot::Receiver<HeaderMap>,
59 },
60 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
61 H2 {
62 content_length: DecodedLength,
63 data_done: bool,
64 ping: ping::Recorder,
65 recv: h2::RecvStream,
66 },
67 #[cfg(feature = "ffi")]
68 Ffi(crate::ffi::UserBody),
69}
70
71#[must_use = "Sender does nothing unless sent on"]
85#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
86pub(crate) struct Sender {
87 want_rx: watch::Receiver,
88 data_tx: BodySender,
89 trailers_tx: Option<TrailersSender>,
90}
91
92#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
93const WANT_PENDING: usize = 1;
94#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
95const WANT_READY: usize = 2;
96
97impl Incoming {
98 #[inline]
102 #[cfg(test)]
103 pub(crate) fn channel() -> (Sender, Incoming) {
104 Self::new_channel(DecodedLength::CHUNKED, false)
105 }
106
107 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
108 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
109 let (data_tx, data_rx) = mpsc::channel(0);
110 let (trailers_tx, trailers_rx) = oneshot::channel();
111
112 let want = if wanter { WANT_PENDING } else { WANT_READY };
115
116 let (want_tx, want_rx) = watch::channel(want);
117
118 let tx = Sender {
119 want_rx,
120 data_tx,
121 trailers_tx: Some(trailers_tx),
122 };
123 let rx = Incoming::new(Kind::Chan {
124 content_length,
125 want_tx,
126 data_rx,
127 trailers_rx,
128 });
129
130 (tx, rx)
131 }
132
133 fn new(kind: Kind) -> Incoming {
134 Incoming { kind }
135 }
136
137 #[allow(dead_code)]
138 pub(crate) fn empty() -> Incoming {
139 Incoming::new(Kind::Empty)
140 }
141
142 #[cfg(feature = "ffi")]
143 pub(crate) fn ffi() -> Incoming {
144 Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
145 }
146
147 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
148 pub(crate) fn h2(
149 recv: h2::RecvStream,
150 mut content_length: DecodedLength,
151 ping: ping::Recorder,
152 ) -> Self {
153 if !content_length.is_exact() && recv.is_end_stream() {
156 content_length = DecodedLength::ZERO;
157 }
158
159 Incoming::new(Kind::H2 {
160 data_done: false,
161 ping,
162 content_length,
163 recv,
164 })
165 }
166
167 #[cfg(feature = "ffi")]
168 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
169 match self.kind {
170 Kind::Ffi(ref mut body) => return body,
171 _ => {
172 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
173 }
174 }
175
176 match self.kind {
177 Kind::Ffi(ref mut body) => body,
178 _ => unreachable!(),
179 }
180 }
181}
182
183impl Body for Incoming {
184 type Data = Bytes;
185 type Error = crate::Error;
186
187 fn poll_frame(
188 #[cfg_attr(
189 not(all(
190 any(feature = "http1", feature = "http2"),
191 any(feature = "client", feature = "server")
192 )),
193 allow(unused_mut)
194 )]
195 mut self: Pin<&mut Self>,
196 #[cfg_attr(
197 not(all(
198 any(feature = "http1", feature = "http2"),
199 any(feature = "client", feature = "server")
200 )),
201 allow(unused_variables)
202 )]
203 cx: &mut Context<'_>,
204 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
205 match self.kind {
206 Kind::Empty => Poll::Ready(None),
207 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
208 Kind::Chan {
209 content_length: ref mut len,
210 ref mut data_rx,
211 ref mut want_tx,
212 ref mut trailers_rx,
213 } => {
214 want_tx.send(WANT_READY);
215
216 if !data_rx.is_terminated() {
217 if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
218 len.sub_if(chunk.len() as u64);
219 return Poll::Ready(Some(Ok(Frame::data(chunk))));
220 }
221 }
222
223 match ready!(Pin::new(trailers_rx).poll(cx)) {
225 Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
226 Err(_) => Poll::Ready(None),
227 }
228 }
229 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
230 Kind::H2 {
231 ref mut data_done,
232 ref ping,
233 recv: ref mut h2,
234 content_length: ref mut len,
235 } => {
236 if !*data_done {
237 match ready!(h2.poll_data(cx)) {
238 Some(Ok(bytes)) => {
239 let _ = h2.flow_control().release_capacity(bytes.len());
240 len.sub_if(bytes.len() as u64);
241 ping.record_data(bytes.len());
242 return Poll::Ready(Some(Ok(Frame::data(bytes))));
243 }
244 Some(Err(e)) => {
245 return match e.reason() {
246 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
249 Poll::Ready(None)
250 }
251 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
252 };
253 }
254 None => {
255 *data_done = true;
256 }
258 }
259 }
260
261 match ready!(h2.poll_trailers(cx)) {
263 Ok(t) => {
264 ping.record_non_data();
265 Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
266 }
267 Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
268 }
269 }
270
271 #[cfg(feature = "ffi")]
272 Kind::Ffi(ref mut body) => body.poll_data(cx),
273 }
274 }
275
276 fn is_end_stream(&self) -> bool {
277 match self.kind {
278 Kind::Empty => true,
279 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
280 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
281 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
282 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
283 #[cfg(feature = "ffi")]
284 Kind::Ffi(..) => false,
285 }
286 }
287
288 fn size_hint(&self) -> SizeHint {
289 #[cfg(all(
290 any(feature = "http1", feature = "http2"),
291 any(feature = "client", feature = "server")
292 ))]
293 fn opt_len(decoded_length: DecodedLength) -> SizeHint {
294 if let Some(content_length) = decoded_length.into_opt() {
295 SizeHint::with_exact(content_length)
296 } else {
297 SizeHint::default()
298 }
299 }
300
301 match self.kind {
302 Kind::Empty => SizeHint::with_exact(0),
303 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
304 Kind::Chan { content_length, .. } => opt_len(content_length),
305 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
306 Kind::H2 { content_length, .. } => opt_len(content_length),
307 #[cfg(feature = "ffi")]
308 Kind::Ffi(..) => SizeHint::default(),
309 }
310 }
311}
312
313impl fmt::Debug for Incoming {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 #[derive(Debug)]
316 struct Streaming;
317 #[derive(Debug)]
318 struct Empty;
319
320 let mut builder = f.debug_tuple("Body");
321 match self.kind {
322 Kind::Empty => builder.field(&Empty),
323 #[cfg(any(
324 all(
325 any(feature = "http1", feature = "http2"),
326 any(feature = "client", feature = "server")
327 ),
328 feature = "ffi"
329 ))]
330 _ => builder.field(&Streaming),
331 };
332
333 builder.finish()
334 }
335}
336
337#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
338impl Sender {
339 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
341 ready!(self.poll_want(cx)?);
343 self.data_tx
344 .poll_ready(cx)
345 .map_err(|_| crate::Error::new_closed())
346 }
347
348 fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
349 match self.want_rx.load(cx) {
350 WANT_READY => Poll::Ready(Ok(())),
351 WANT_PENDING => Poll::Pending,
352 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
353 unexpected => unreachable!("want_rx value: {}", unexpected),
354 }
355 }
356
357 #[cfg(test)]
358 async fn ready(&mut self) -> crate::Result<()> {
359 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
360 }
361
362 #[cfg(test)]
364 #[allow(unused)]
365 pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
366 self.ready().await?;
367 self.data_tx
368 .try_send(Ok(chunk))
369 .map_err(|_| crate::Error::new_closed())
370 }
371
372 #[allow(unused)]
374 pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
375 let tx = match self.trailers_tx.take() {
376 Some(tx) => tx,
377 None => return Err(crate::Error::new_closed()),
378 };
379 tx.send(trailers).map_err(|_| crate::Error::new_closed())
380 }
381
382 #[cfg(feature = "http1")]
395 pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
396 self.data_tx
397 .try_send(Ok(chunk))
398 .map_err(|err| err.into_inner().expect("just sent Ok"))
399 }
400
401 #[cfg(test)]
402 pub(crate) fn abort(mut self) {
403 self.send_error(crate::Error::new_body_write_aborted());
404 }
405
406 pub(crate) fn send_error(&mut self, err: crate::Error) {
407 let _ = self
408 .data_tx
409 .clone()
411 .try_send(Err(err));
412 }
413}
414
415#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
416impl fmt::Debug for Sender {
417 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418 #[derive(Debug)]
419 struct Open;
420 #[derive(Debug)]
421 struct Closed;
422
423 let mut builder = f.debug_tuple("Sender");
424 match self.want_rx.peek() {
425 watch::CLOSED => builder.field(&Closed),
426 _ => builder.field(&Open),
427 };
428
429 builder.finish()
430 }
431}
432
433#[cfg(test)]
434mod tests {
435 use std::mem;
436 use std::task::Poll;
437
438 use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
439 use http_body_util::BodyExt;
440
441 #[test]
442 fn test_size_of() {
443 let body_size = mem::size_of::<Incoming>();
447 let body_expected_size = mem::size_of::<u64>() * 5;
448 assert!(
449 body_size <= body_expected_size,
450 "Body size = {} <= {}",
451 body_size,
452 body_expected_size,
453 );
454
455 assert_eq!(
458 mem::size_of::<Sender>(),
459 mem::size_of::<usize>() * 5,
460 "Sender"
461 );
462
463 assert_eq!(
464 mem::size_of::<Sender>(),
465 mem::size_of::<Option<Sender>>(),
466 "Option<Sender>"
467 );
468 }
469
470 #[test]
471 fn size_hint() {
472 fn eq(body: Incoming, b: SizeHint, note: &str) {
473 let a = body.size_hint();
474 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
475 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
476 }
477
478 eq(Incoming::empty(), SizeHint::with_exact(0), "empty");
479
480 eq(Incoming::channel().1, SizeHint::new(), "channel");
481
482 eq(
483 Incoming::new_channel(DecodedLength::new(4), false).1,
484 SizeHint::with_exact(4),
485 "channel with length",
486 );
487 }
488
489 #[cfg(not(miri))]
490 #[tokio::test]
491 async fn channel_abort() {
492 let (tx, mut rx) = Incoming::channel();
493
494 tx.abort();
495
496 let err = rx.frame().await.unwrap().unwrap_err();
497 assert!(err.is_body_write_aborted(), "{:?}", err);
498 }
499
500 #[cfg(all(not(miri), feature = "http1"))]
501 #[tokio::test]
502 async fn channel_abort_when_buffer_is_full() {
503 let (mut tx, mut rx) = Incoming::channel();
504
505 tx.try_send_data("chunk 1".into()).expect("send 1");
506 tx.abort();
508
509 let chunk1 = rx
510 .frame()
511 .await
512 .expect("item 1")
513 .expect("chunk 1")
514 .into_data()
515 .unwrap();
516 assert_eq!(chunk1, "chunk 1");
517
518 let err = rx.frame().await.unwrap().unwrap_err();
519 assert!(err.is_body_write_aborted(), "{:?}", err);
520 }
521
522 #[cfg(feature = "http1")]
523 #[test]
524 fn channel_buffers_one() {
525 let (mut tx, _rx) = Incoming::channel();
526
527 tx.try_send_data("chunk 1".into()).expect("send 1");
528
529 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
531 assert_eq!(chunk2, "chunk 2");
532 }
533
534 #[cfg(not(miri))]
535 #[tokio::test]
536 async fn channel_empty() {
537 let (_, mut rx) = Incoming::channel();
538
539 assert!(rx.frame().await.is_none());
540 }
541
542 #[test]
543 fn channel_ready() {
544 let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, false);
545
546 let mut tx_ready = tokio_test::task::spawn(tx.ready());
547
548 assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
549 }
550
551 #[test]
552 #[cfg(not(miri))] fn channel_wanter() {
554 let (mut tx, mut rx) =
555 Incoming::new_channel(DecodedLength::CHUNKED, true);
556
557 let mut tx_ready = tokio_test::task::spawn(tx.ready());
558 let mut rx_data = tokio_test::task::spawn(rx.frame());
559
560 assert!(
561 tx_ready.poll().is_pending(),
562 "tx isn't ready before rx has been polled"
563 );
564
565 assert!(rx_data.poll().is_pending(), "poll rx.data");
566 assert!(tx_ready.is_woken(), "rx poll wakes tx");
567
568 assert!(
569 tx_ready.poll().is_ready(),
570 "tx is ready after rx has been polled"
571 );
572 }
573
574 #[test]
575 #[cfg(not(miri))] fn channel_notices_closure() {
577 let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);
578
579 let mut tx_ready = tokio_test::task::spawn(tx.ready());
580
581 assert!(
582 tx_ready.poll().is_pending(),
583 "tx isn't ready before rx has been polled"
584 );
585
586 drop(rx);
587 assert!(tx_ready.is_woken(), "dropping rx wakes tx");
588
589 match tx_ready.poll() {
590 Poll::Ready(Err(ref e)) if e.is_closed() => (),
591 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
592 }
593 }
594}