http/
stream.rs

1use std::{
2    io::{self, Read, Write},
3    net::TcpStream,
4    time::Duration,
5};
6
7#[cfg(feature = "tls")]
8use rustls::{ConnectionCommon, SideData};
9
/// A readable and writable byte stream whose blocking behaviour can be
/// adjusted by the caller.
///
/// Both methods have default bodies that do nothing and report success, so
/// an implementor only overrides the ones that are meaningful for it.
pub trait HttpStream: Read + Write {
    /// Asks the stream to switch to its blocking mode with the given
    /// `timeout`.
    ///
    /// The default implementation ignores `timeout` and returns `Ok(())`.
    fn set_blocking(&mut self, timeout: Duration) -> io::Result<()> {
        // Explicitly discard the argument so the default body compiles
        // without an unused-variable warning.
        let _ = timeout;
        Ok(())
    }

    /// Asks the stream to switch to its non-blocking mode.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn set_non_blocking(&mut self) -> io::Result<()> {
        Ok(())
    }
}
20
/// Conversion of a value into a type that implements [`HttpStream`].
pub trait IntoHttpStream {
    /// The stream type produced by
    /// [`into_http_stream`](Self::into_http_stream); it must implement
    /// [`HttpStream`] and be `'static`.
    type Stream: HttpStream + 'static;
    /// Consumes `self` and returns the corresponding stream.
    fn into_http_stream(self) -> Self::Stream;
}
25
/// In-memory stream backed by byte buffers.
///
/// Reads are served from `input`, starting at `offset`; everything written
/// to the stream is appended to `output`.
#[derive(Debug)]
pub struct StringStream {
    /// Bytes handed out to readers.
    input: Vec<u8>,
    /// Index into `input` of the next unread byte. It only ever advances by
    /// the number of bytes actually read, so it never exceeds `input.len()`.
    offset: usize,
    /// All bytes written to the stream so far, in write order.
    output: Vec<u8>,
}

impl Read for StringStream {
    /// Copies up to `buf.len()` unread bytes into `buf` and consumes them.
    ///
    /// Returns the number of bytes copied, which is `0` once the input is
    /// exhausted or when `buf` is empty. Never fails.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.peek(buf)?;
        self.offset += n;
        Ok(n)
    }

    /// Appends every unread input byte to `buf` and consumes them.
    ///
    /// Returns the number of bytes appended. Only the bytes that are really
    /// available are appended: `buf` grows by exactly the returned count and
    /// is never padded with scratch-buffer contents.
    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        let unread = &self.input[self.offset..];
        buf.extend_from_slice(unread);
        let appended = unread.len();
        // Everything has been handed out; mark the input as fully consumed.
        self.offset = self.input.len();
        Ok(appended)
    }
}

impl Write for StringStream {
    /// Appends all of `buf` to the captured output and reports it as fully
    /// written. Never fails.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.output.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Nothing is buffered beyond `output`, so flushing is a no-op.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl StringStream {
    /// Copies up to `buf.len()` unread bytes into `buf` without consuming
    /// them.
    ///
    /// Returns the number of bytes copied (the smaller of `buf.len()` and
    /// the amount of unread input). Never fails.
    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        let unread = &self.input[self.offset..];
        let n = buf.len().min(unread.len());
        buf[..n].copy_from_slice(&unread[..n]);
        Ok(n)
    }
}
69
// The impl body is empty: `StringStream` overrides none of the `HttpStream`
// methods and relies on the trait's provided method bodies.
impl HttpStream for StringStream {}
71
72impl IntoHttpStream for String {
73    type Stream = StringStream;
74
75    fn into_http_stream(self) -> Self::Stream {
76        let src_vec = self.into_bytes();
77        StringStream {
78            input: src_vec,
79            offset: 0,
80            output: Vec::new(),
81        }
82    }
83}
84
85impl IntoHttpStream for &str {
86    type Stream = StringStream;
87
88    fn into_http_stream(self) -> Self::Stream {
89        self.to_string().into_http_stream()
90    }
91}
92
93impl<S: HttpStream + 'static> IntoHttpStream for S {
94    type Stream = S;
95
96    fn into_http_stream(self) -> Self::Stream {
97        self
98    }
99}
100
101impl HttpStream for TcpStream {
102    fn set_blocking(&mut self, timeout: Duration) -> io::Result<()> {
103        self.set_read_timeout(Some(timeout))
104    }
105
106    fn set_non_blocking(&mut self) -> io::Result<()> {
107        self.set_read_timeout(None)
108    }
109}
110
/// Placeholder stream that carries no data: reads yield nothing and writes
/// accept nothing.
pub struct DummyStream;

impl Read for DummyStream {
    /// Always reports zero bytes read and leaves the buffer untouched.
    fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
        const BYTES_READ: usize = 0;
        Ok(BYTES_READ)
    }
}

impl Write for DummyStream {
    /// Always reports zero bytes written; the data is discarded.
    fn write(&mut self, _: &[u8]) -> io::Result<usize> {
        const BYTES_WRITTEN: usize = 0;
        Ok(BYTES_WRITTEN)
    }

    /// There is nothing to flush, so this always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
128
// The impl body is empty: `DummyStream` overrides none of the `HttpStream`
// methods and relies on the trait's provided method bodies.
impl HttpStream for DummyStream {}
130
131pub fn dummy() -> Box<dyn HttpStream> {
132    Box::new(DummyStream)
133}
134
/// A TLS stream forwards both mode switches to the stream it wraps (its
/// `sock` field). Only compiled when the `tls` feature is enabled.
#[cfg(feature = "tls")]
impl<C, SD, S> HttpStream for rustls::StreamOwned<C, S>
where
    S: HttpStream,
    SD: SideData,
    C: core::ops::DerefMut<Target = ConnectionCommon<SD>>,
{
    /// Delegates to the wrapped stream's `set_blocking`.
    fn set_blocking(&mut self, timeout: Duration) -> io::Result<()> {
        let inner = &mut self.sock;
        inner.set_blocking(timeout)
    }

    /// Delegates to the wrapped stream's `set_non_blocking`.
    fn set_non_blocking(&mut self) -> io::Result<()> {
        let inner = &mut self.sock;
        inner.set_non_blocking()
    }
}