1mod body_writer;
2use std::num::NonZeroUsize;
3
4pub use body_writer::BodyWriter;
5
6mod body_reader;
7pub use body_reader::BodyReader;
8
9pub struct BodyIoError;
10
11pub fn channel(bufsize: NonZeroUsize) -> (BodyWriter, BodyReader) {
18 let (tx, rx) = tokio::sync::mpsc::channel(bufsize.into());
19 (BodyWriter { sender: tx }, BodyReader { receiver: rx })
20}
21
22impl std::fmt::Display for BodyIoError {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 write!(f, "BodyIoError")
25 }
26}
27
28impl std::fmt::Debug for BodyIoError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "BodyIoError")
31 }
32}
33
34impl std::error::Error for BodyIoError {}
35
36#[cfg(test)]
37mod tests {
38 use super::*;
39
40 const BUF_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1024) };
41
42 #[test]
43 fn test_body() {
44 use std::io::Write;
45 let (mut writer, _reader) = channel(BUF_SIZE);
46 writer.write_all(b"Hello, ").unwrap();
47 }
48
49 #[tokio::test]
50 async fn test_async_body() {
51 use futures::StreamExt;
52
53 use tokio::io::AsyncWriteExt;
54
55 let (mut writer, reader) = channel(BUF_SIZE);
56 writer.write_all(b"Hello, ").await.unwrap();
57 drop(writer);
58
59 let mut stream = http_body_util::BodyStream::new(reader);
60
61 let mut body = Vec::new();
62 while let Some(Ok(bytes)) = stream.next().await {
63 if let Some(bytes) = bytes.data_ref() {
64 body.extend_from_slice(bytes);
65 }
66 }
67
68 assert_eq!(body, b"Hello, ");
69 }
70
71 #[tokio::test]
72 async fn test_async_body_sync_write() {
73 use futures::StreamExt;
74
75 let (mut writer, reader) = channel(BUF_SIZE);
76
77 let writer_thread = std::thread::spawn(move || {
78 use std::io::Write;
79 writer.write_all(b"Hello, ").unwrap();
80 });
81
82 let mut stream = http_body_util::BodyStream::new(reader);
83
84 let mut body = Vec::new();
85 while let Some(Ok(bytes)) = stream.next().await {
86 if let Some(bytes) = bytes.data_ref() {
87 body.extend_from_slice(bytes);
88 }
89 }
90
91 assert_eq!(body, b"Hello, ");
92
93 writer_thread.join().unwrap();
94 }
95}