http_body_io/
lib.rs

1mod body_writer;
2use std::num::NonZeroUsize;
3
4pub use body_writer::BodyWriter;
5
6mod body_reader;
7pub use body_reader::BodyReader;
8
9pub struct BodyIoError;
10
11/// Creates a new channel for sending and receiving body data.
12///
13/// The `bufsize` parameter is the maximum number of writes that can
14/// be buffered before the receiver must read some data. This means
15/// that the using a `BufWriter` may still be necessary to avoid
16/// excessive system calls.
17pub fn channel(bufsize: NonZeroUsize) -> (BodyWriter, BodyReader) {
18    let (tx, rx) = tokio::sync::mpsc::channel(bufsize.into());
19    (BodyWriter { sender: tx }, BodyReader { receiver: rx })
20}
21
22impl std::fmt::Display for BodyIoError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        write!(f, "BodyIoError")
25    }
26}
27
28impl std::fmt::Debug for BodyIoError {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        write!(f, "BodyIoError")
31    }
32}
33
34impl std::error::Error for BodyIoError {}
35
36#[cfg(test)]
37mod tests {
38    use super::*;
39
40    const BUF_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1024) };
41
42    #[test]
43    fn test_body() {
44        use std::io::Write;
45        let (mut writer, _reader) = channel(BUF_SIZE);
46        writer.write_all(b"Hello, ").unwrap();
47    }
48
49    #[tokio::test]
50    async fn test_async_body() {
51        use futures::StreamExt;
52
53        use tokio::io::AsyncWriteExt;
54
55        let (mut writer, reader) = channel(BUF_SIZE);
56        writer.write_all(b"Hello, ").await.unwrap();
57        drop(writer);
58
59        let mut stream = http_body_util::BodyStream::new(reader);
60
61        let mut body = Vec::new();
62        while let Some(Ok(bytes)) = stream.next().await {
63            if let Some(bytes) = bytes.data_ref() {
64                body.extend_from_slice(bytes);
65            }
66        }
67
68        assert_eq!(body, b"Hello, ");
69    }
70
71    #[tokio::test]
72    async fn test_async_body_sync_write() {
73        use futures::StreamExt;
74
75        let (mut writer, reader) = channel(BUF_SIZE);
76
77        let writer_thread = std::thread::spawn(move || {
78            use std::io::Write;
79            writer.write_all(b"Hello, ").unwrap();
80        });
81
82        let mut stream = http_body_util::BodyStream::new(reader);
83
84        let mut body = Vec::new();
85        while let Some(Ok(bytes)) = stream.next().await {
86            if let Some(bytes) = bytes.data_ref() {
87                body.extend_from_slice(bytes);
88            }
89        }
90
91        assert_eq!(body, b"Hello, ");
92
93        writer_thread.join().unwrap();
94    }
95}