simple_hyper_client/blocking/
body.rs1use super::client::KeepClientAlive;
8
9use hyper::body::{Buf, Bytes};
10use hyper::Body as HyperBody;
11use tokio::sync::mpsc;
12use tokio_stream::StreamExt;
13
14use std::future::Future;
15use std::{fmt, io};
16
17pub struct Body {
19 pub(super) keep_client_alive: KeepClientAlive,
20 bytes: Bytes,
21 rx: mpsc::Receiver<io::Result<Bytes>>,
22}
23
24impl fmt::Debug for Body {
25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26 f.debug_struct("Body").finish()
27 }
28}
29
30impl Body {
31 pub(super) fn new(
32 mut hyper_body: HyperBody,
33 ) -> (impl Future<Output = ()> + Send + 'static, Self) {
34 let (tx, rx) = mpsc::channel(1);
35 let fut = async move {
36 loop {
37 tokio::select! {
38 _ = tx.closed() => {
39 break; }
41 res = hyper_body.next() => {
42 let res = match res {
43 None => break, Some(Ok(chunk)) if chunk.is_empty() => continue,
45 Some(Ok(chunk)) => Ok(chunk),
46 Some(Err(e)) => Err(io::Error::new(io::ErrorKind::Other, e)),
47 };
48 if let Err(_) = tx.send(res).await {
49 break; }
51 }
52 }
53 }
54 };
55 let body = Body {
56 keep_client_alive: KeepClientAlive::empty(),
57 bytes: Bytes::new(),
58 rx,
59 };
60 (fut, body)
61 }
62}
63
64impl io::Read for Body {
65 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
66 if self.bytes.is_empty() {
67 match self.rx.blocking_recv() {
68 Some(Ok(bytes)) => {
69 self.bytes = bytes;
70 }
71 Some(Err(e)) => return Err(e),
72 None => return Ok(0),
73 }
74 }
75 (&mut self.bytes).reader().read(buf)
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use super::*;
82 use hyper::Body as HyperBody;
83 use std::future::Future;
84 use std::io::{self, Read};
85 use std::thread;
86 use tokio::time::{self, Duration};
87
88 fn run_future<F: Future<Output = ()> + Send + 'static>(fut: F) {
89 thread::spawn(move || {
90 let rt = tokio::runtime::Builder::new_current_thread()
91 .enable_all()
92 .build()
93 .unwrap();
94 rt.block_on(fut);
95 });
96 }
97
98 #[test]
99 fn single_chunk() {
100 let body = HyperBody::from("hello, world!");
101 let (fut, mut reader) = Body::new(body);
102 run_future(fut);
103
104 let mut bytes = Vec::<u8>::new();
105 reader.read_to_end(&mut bytes).unwrap();
106 assert_eq!(bytes, b"hello, world!");
107 }
108
109 #[test]
110 fn multiple_chunks() {
111 let (mut sender, body) = HyperBody::channel();
112 let (fut, mut reader) = Body::new(body);
113
114 run_future(async move {
115 let h = tokio::spawn(fut);
116
117 sender.send_data("hello".into()).await.unwrap();
118 time::sleep(Duration::from_millis(10)).await;
119 sender.send_data(", ".into()).await.unwrap();
120 sender.send_data("world!".into()).await.unwrap();
121
122 drop(sender);
123 h.await.unwrap();
124 });
125
126 let mut bytes = Vec::<u8>::new();
127 reader.read_to_end(&mut bytes).unwrap();
128 assert_eq!(bytes, b"hello, world!");
129 }
130
131 #[test]
132 fn with_empty_chunk() {
133 let (mut sender, body) = HyperBody::channel();
134 let (fut, mut reader) = Body::new(body);
135
136 run_future(async move {
137 let h = tokio::spawn(fut);
138
139 sender.send_data("hello".into()).await.unwrap();
140 time::sleep(Duration::from_millis(10)).await;
141 sender.send_data("".into()).await.unwrap();
142 sender.send_data(", world!".into()).await.unwrap();
143
144 drop(sender);
145 h.await.unwrap();
146 });
147
148 let mut bytes = Vec::<u8>::new();
149 reader.read_to_end(&mut bytes).unwrap();
150 assert_eq!(bytes, b"hello, world!");
151 }
152
153 #[test]
154 fn hyper_error() {
155 let chunks: Vec<Result<_, io::Error>> = vec![
156 Ok("hello"),
157 Ok(" "),
158 Ok("world"),
159 Err(io::ErrorKind::BrokenPipe.into()),
160 ];
161 let stream = futures_util::stream::iter(chunks);
162 let body = HyperBody::wrap_stream(stream);
163 let (fut, mut reader) = Body::new(body);
164
165 run_future(fut);
166
167 let mut bytes = Vec::<u8>::new();
168 let err = reader.read_to_end(&mut bytes).unwrap_err();
169 assert_eq!(err.kind(), io::ErrorKind::Other);
170 assert_eq!(bytes, b"hello world");
171
172 let mut buf = [0u8; 8];
173 let n = reader.read(&mut buf).unwrap();
174 assert_eq!(n, 0);
175 }
176}