simple_hyper_client/blocking/
body.rs

1/* Copyright (c) Fortanix, Inc.
2 *
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7use super::client::KeepClientAlive;
8
9use hyper::body::{Buf, Bytes};
10use hyper::Body as HyperBody;
11use tokio::sync::mpsc;
12use tokio_stream::StreamExt;
13
14use std::future::Future;
15use std::{fmt, io};
16
/// A body type for HTTP responses that implements `std::io::Read`.
///
/// Chunks are delivered over a channel by the future returned alongside the
/// body from `Body::new`; reading blocks the calling thread until the next
/// chunk, an error, or the end of the stream arrives.
pub struct Body {
    /// Initialized to `KeepClientAlive::empty()` by `Body::new`; `pub(super)`
    /// so the parent module can assign it afterwards.
    // NOTE(review): presumably keeps the owning client alive for as long as
    // the body is being read — confirm against `super::client`.
    pub(super) keep_client_alive: KeepClientAlive,
    /// The part of the most recently received chunk that `read` has not yet
    /// handed out to the caller. Empty when a new chunk must be fetched.
    bytes: Bytes,
    /// Receiving half of the channel over which chunks (or I/O errors) arrive.
    rx: mpsc::Receiver<io::Result<Bytes>>,
}
23
24impl fmt::Debug for Body {
25    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26        f.debug_struct("Body").finish()
27    }
28}
29
30impl Body {
31    pub(super) fn new(
32        mut hyper_body: HyperBody,
33    ) -> (impl Future<Output = ()> + Send + 'static, Self) {
34        let (tx, rx) = mpsc::channel(1);
35        let fut = async move {
36            loop {
37                tokio::select! {
38                    _ = tx.closed() => {
39                        break; // body has been dropped.
40                    }
41                    res = hyper_body.next() => {
42                        let res = match res {
43                            None => break, // EOF
44                            Some(Ok(chunk)) if chunk.is_empty() => continue,
45                            Some(Ok(chunk)) => Ok(chunk),
46                            Some(Err(e)) => Err(io::Error::new(io::ErrorKind::Other, e)),
47                        };
48                        if let Err(_) = tx.send(res).await {
49                            break; // body has been dropped.
50                        }
51                    }
52                }
53            }
54        };
55        let body = Body {
56            keep_client_alive: KeepClientAlive::empty(),
57            bytes: Bytes::new(),
58            rx,
59        };
60        (fut, body)
61    }
62}
63
64impl io::Read for Body {
65    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
66        if self.bytes.is_empty() {
67            match self.rx.blocking_recv() {
68                Some(Ok(bytes)) => {
69                    self.bytes = bytes;
70                }
71                Some(Err(e)) => return Err(e),
72                None => return Ok(0),
73            }
74        }
75        (&mut self.bytes).reader().read(buf)
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use hyper::Body as HyperBody;
    use std::future::Future;
    use std::io::{self, Read};
    use std::thread;
    use tokio::time::{self, Duration};

    /// Drives `fut` to completion on a freshly spawned thread that owns a
    /// current-thread tokio runtime, leaving the test thread free to perform
    /// blocking reads.
    fn run_future<F>(fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        thread::spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            runtime.block_on(fut);
        });
    }

    /// Reads `reader` until EOF and returns everything it produced,
    /// panicking if any read fails.
    fn read_all(reader: &mut Body) -> Vec<u8> {
        let mut collected = Vec::<u8>::new();
        reader.read_to_end(&mut collected).unwrap();
        collected
    }

    /// A body made of one in-memory chunk is read back unchanged.
    #[test]
    fn single_chunk() {
        let (fut, mut reader) = Body::new(HyperBody::from("hello, world!"));
        run_future(fut);

        assert_eq!(read_all(&mut reader), b"hello, world!");
    }

    /// Chunks sent at different times are concatenated in order.
    #[test]
    fn multiple_chunks() {
        let (mut sender, hyper_body) = HyperBody::channel();
        let (fut, mut reader) = Body::new(hyper_body);

        run_future(async move {
            let pump = tokio::spawn(fut);

            sender.send_data("hello".into()).await.unwrap();
            time::sleep(Duration::from_millis(10)).await;
            sender.send_data(", ".into()).await.unwrap();
            sender.send_data("world!".into()).await.unwrap();

            // Closing the sender ends the stream, which lets the pump finish.
            drop(sender);
            pump.await.unwrap();
        });

        assert_eq!(read_all(&mut reader), b"hello, world!");
    }

    /// An empty chunk in the middle of the stream must not end the read early.
    #[test]
    fn with_empty_chunk() {
        let (mut sender, hyper_body) = HyperBody::channel();
        let (fut, mut reader) = Body::new(hyper_body);

        run_future(async move {
            let pump = tokio::spawn(fut);

            sender.send_data("hello".into()).await.unwrap();
            time::sleep(Duration::from_millis(10)).await;
            sender.send_data("".into()).await.unwrap();
            sender.send_data(", world!".into()).await.unwrap();

            // Closing the sender ends the stream, which lets the pump finish.
            drop(sender);
            pump.await.unwrap();
        });

        assert_eq!(read_all(&mut reader), b"hello, world!");
    }

    /// A stream error surfaces as an `io::Error` of kind `Other` after the
    /// preceding data has been delivered; later reads report EOF.
    #[test]
    fn hyper_error() {
        let chunks: Vec<Result<_, io::Error>> = vec![
            Ok("hello"),
            Ok(" "),
            Ok("world"),
            Err(io::ErrorKind::BrokenPipe.into()),
        ];
        let hyper_body = HyperBody::wrap_stream(futures_util::stream::iter(chunks));
        let (fut, mut reader) = Body::new(hyper_body);

        run_future(fut);

        let mut received = Vec::<u8>::new();
        let err = reader.read_to_end(&mut received).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(received, b"hello world");

        let mut scratch = [0u8; 8];
        assert_eq!(reader.read(&mut scratch).unwrap(), 0);
    }
}