async_http1_lite/
stream.rs

1use core::{
2    marker::PhantomData,
3    ops::{Deref, DerefMut},
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8use std::io::{Error as IoError, ErrorKind as IoErrorKind};
9
10use async_sleep::Sleepble;
11use async_trait::async_trait;
12use futures_io::{AsyncRead, AsyncWrite};
13use http::{Request, Response};
14use http1_spec::{body_framing::BodyFraming, head_renderer::Head, ReasonPhrase};
15
16use crate::{
17    body::{DecoderBody, EncoderBody},
18    decoder::{Http1RequestDecoder, Http1ResponseDecoder},
19    encoder::{Http1RequestEncoder, Http1ResponseEncoder},
20};
21
22//
23//
24//
25#[async_trait]
26pub trait Http1StreamDecoder<S, SLEEP, H>
27where
28    S: AsyncRead + Unpin,
29    SLEEP: Sleepble,
30    H: Head,
31{
32    async fn read_head(&mut self, stream: &mut S) -> Result<(H, BodyFraming), IoError>;
33    async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError>;
34
35    fn set_read_timeout(&mut self, dur: Duration);
36}
37
38#[async_trait]
39pub trait Http1StreamEncoder<S, SLEEP, H>
40where
41    S: AsyncWrite + Unpin,
42    SLEEP: Sleepble,
43    H: Head,
44{
45    async fn write_head(
46        &mut self,
47        stream: &mut S,
48        head: H,
49        body_framing: BodyFraming,
50    ) -> Result<(), IoError>;
51    async fn write_body(&mut self, stream: &mut S, body: EncoderBody) -> Result<(), IoError>;
52
53    fn set_write_timeout(&mut self, dur: Duration);
54}
55
56//
57//
58//
59pub struct Http1Stream<S, SLEEP, D, DH, E, EH>
60where
61    S: AsyncRead + AsyncWrite + Unpin,
62    SLEEP: Sleepble,
63    D: Http1StreamDecoder<S, SLEEP, DH>,
64    DH: Head,
65    E: Http1StreamEncoder<S, SLEEP, EH>,
66    EH: Head,
67{
68    stream: S,
69    decoder: D,
70    encoder: E,
71    phantom: PhantomData<(SLEEP, DH, EH)>,
72}
73impl<S, SLEEP, D, DH, E, EH> Http1Stream<S, SLEEP, D, DH, E, EH>
74where
75    S: AsyncRead + AsyncWrite + Unpin,
76    SLEEP: Sleepble,
77    D: Http1StreamDecoder<S, SLEEP, DH>,
78    DH: Head,
79    E: Http1StreamEncoder<S, SLEEP, EH>,
80    EH: Head,
81{
82    pub(crate) fn new(stream: S, decoder: D, encoder: E) -> Self {
83        Self {
84            stream,
85            decoder,
86            encoder,
87            phantom: PhantomData,
88        }
89    }
90
91    //
92    pub fn set_write_timeout(&mut self, dur: Duration) {
93        self.encoder.set_write_timeout(dur)
94    }
95
96    pub fn set_read_timeout(&mut self, dur: Duration) {
97        self.decoder.set_read_timeout(dur)
98    }
99
100    //
101    pub async fn write_head(&mut self, head: EH, body_framing: BodyFraming) -> Result<(), IoError> {
102        self.encoder
103            .write_head(&mut self.stream, head, body_framing)
104            .await
105    }
106
107    pub async fn write_body(&mut self, body: EncoderBody) -> Result<(), IoError> {
108        self.encoder.write_body(&mut self.stream, body).await
109    }
110
111    //
112    pub async fn read_head(&mut self) -> Result<(DH, BodyFraming), IoError> {
113        self.decoder.read_head(&mut self.stream).await
114    }
115    pub async fn read_body(&mut self) -> Result<DecoderBody, IoError> {
116        self.decoder.read_body(&mut self.stream).await
117    }
118}
119
120//
121//
122//
123pub type Http1ClientStreamInner<S, SLEEP> = Http1Stream<
124    S,
125    SLEEP,
126    Http1ResponseDecoder,
127    (Response<()>, ReasonPhrase),
128    Http1RequestEncoder,
129    Request<()>,
130>;
131pub struct Http1ClientStream<S, SLEEP>
132where
133    S: AsyncRead + AsyncWrite + Unpin + Send,
134    SLEEP: Sleepble,
135{
136    inner: Http1ClientStreamInner<S, SLEEP>,
137}
138impl<S, SLEEP> Deref for Http1ClientStream<S, SLEEP>
139where
140    S: AsyncRead + AsyncWrite + Unpin + Send,
141    SLEEP: Sleepble,
142{
143    type Target = Http1ClientStreamInner<S, SLEEP>;
144
145    fn deref(&self) -> &Http1ClientStreamInner<S, SLEEP> {
146        &self.inner
147    }
148}
149impl<S, SLEEP> DerefMut for Http1ClientStream<S, SLEEP>
150where
151    S: AsyncRead + AsyncWrite + Unpin + Send,
152    SLEEP: Sleepble,
153{
154    fn deref_mut(&mut self) -> &mut Http1ClientStreamInner<S, SLEEP> {
155        &mut self.inner
156    }
157}
158impl<S, SLEEP> Http1ClientStream<S, SLEEP>
159where
160    S: AsyncRead + AsyncWrite + Unpin + Send,
161    SLEEP: Sleepble,
162{
163    pub fn new(stream: S) -> Self {
164        Self::with(
165            stream,
166            Http1ResponseDecoder::new(8 * 1024, None),
167            Http1RequestEncoder::new(8 * 1024),
168        )
169    }
170    pub fn with(stream: S, decoder: Http1ResponseDecoder, encoder: Http1RequestEncoder) -> Self {
171        Self {
172            inner: Http1ClientStreamInner::new(stream, decoder, encoder),
173        }
174    }
175
176    pub fn get_ref(&self) -> &S {
177        &self.inner.stream
178    }
179    pub fn get_mut(&mut self) -> &mut S {
180        &mut self.inner.stream
181    }
182    pub fn into_inner(self) -> Result<S, IoError> {
183        if self.decoder.has_unparsed_bytes() {
184            return Err(IoError::new(IoErrorKind::Other, "has unparsed bytes"));
185        }
186        Ok(self.inner.stream)
187    }
188
189    pub async fn write_request(&mut self, request: Request<Vec<u8>>) -> Result<(), IoError> {
190        let (parts, body) = request.into_parts();
191        let head = Request::from_parts(parts, ());
192
193        let body_framing = BodyFraming::ContentLength(body.len());
194
195        self.write_head(head, body_framing.clone()).await?;
196        match body_framing {
197            BodyFraming::Neither => {}
198            BodyFraming::ContentLength(n) if n == 0 => {}
199            _ => {
200                self.write_body(EncoderBody::Completed(body)).await?;
201            }
202        }
203
204        Ok(())
205    }
206
207    pub async fn read_response(&mut self) -> Result<(Response<Vec<u8>>, ReasonPhrase), IoError> {
208        let ((response, reason_phrase), body_framing) = self.read_head().await?;
209
210        let mut body = Vec::new();
211        match body_framing {
212            BodyFraming::Neither => {}
213            BodyFraming::ContentLength(n) if n == 0 => {}
214            _ => loop {
215                match self.read_body().await? {
216                    DecoderBody::Completed(bytes) => {
217                        body.extend_from_slice(&bytes);
218                        break;
219                    }
220                    DecoderBody::Partial(bytes) => {
221                        body.extend_from_slice(&bytes);
222                    }
223                }
224            },
225        }
226
227        let (parts, _) = response.into_parts();
228        let response = Response::from_parts(parts, body);
229
230        Ok((response, reason_phrase))
231    }
232}
233
234impl<S, SLEEP> AsyncRead for Http1ClientStream<S, SLEEP>
235where
236    S: AsyncRead + AsyncWrite + Unpin + Send,
237    SLEEP: Sleepble + Unpin,
238{
239    fn poll_read(
240        self: Pin<&mut Self>,
241        cx: &mut Context,
242        buf: &mut [u8],
243    ) -> Poll<Result<usize, IoError>> {
244        Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
245    }
246}
247
248impl<S, SLEEP> AsyncWrite for Http1ClientStream<S, SLEEP>
249where
250    S: AsyncRead + AsyncWrite + Unpin + Send,
251    SLEEP: Sleepble + Unpin,
252{
253    fn poll_write(
254        self: Pin<&mut Self>,
255        cx: &mut Context,
256        buf: &[u8],
257    ) -> Poll<Result<usize, IoError>> {
258        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
259    }
260
261    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
262        Pin::new(&mut self.get_mut().stream).poll_flush(cx)
263    }
264
265    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
266        Pin::new(&mut self.get_mut().stream).poll_close(cx)
267    }
268}
269
270//
271//
272//
273pub type Http1ServerStreamInner<S, SLEEP> = Http1Stream<
274    S,
275    SLEEP,
276    Http1RequestDecoder,
277    Request<()>,
278    Http1ResponseEncoder,
279    (Response<()>, ReasonPhrase),
280>;
281pub struct Http1ServerStream<S, SLEEP>
282where
283    S: AsyncRead + AsyncWrite + Unpin + Send,
284    SLEEP: Sleepble,
285{
286    inner: Http1ServerStreamInner<S, SLEEP>,
287}
288impl<S, SLEEP> Deref for Http1ServerStream<S, SLEEP>
289where
290    S: AsyncRead + AsyncWrite + Unpin + Send,
291    SLEEP: Sleepble,
292{
293    type Target = Http1ServerStreamInner<S, SLEEP>;
294
295    fn deref(&self) -> &Http1ServerStreamInner<S, SLEEP> {
296        &self.inner
297    }
298}
299impl<S, SLEEP> DerefMut for Http1ServerStream<S, SLEEP>
300where
301    S: AsyncRead + AsyncWrite + Unpin + Send,
302    SLEEP: Sleepble,
303{
304    fn deref_mut(&mut self) -> &mut Http1ServerStreamInner<S, SLEEP> {
305        &mut self.inner
306    }
307}
308impl<S, SLEEP> Http1ServerStream<S, SLEEP>
309where
310    S: AsyncRead + AsyncWrite + Unpin + Send,
311    SLEEP: Sleepble,
312{
313    pub fn new(stream: S) -> Self {
314        Self::with(
315            stream,
316            Http1RequestDecoder::new(8 * 1024, None),
317            Http1ResponseEncoder::new(8 * 1024),
318        )
319    }
320    pub fn with(stream: S, decoder: Http1RequestDecoder, encoder: Http1ResponseEncoder) -> Self {
321        Self {
322            inner: Http1ServerStreamInner::new(stream, decoder, encoder),
323        }
324    }
325
326    pub fn get_ref(&self) -> &S {
327        &self.inner.stream
328    }
329    pub fn get_mut(&mut self) -> &mut S {
330        &mut self.inner.stream
331    }
332    pub fn into_inner(self) -> Result<S, IoError> {
333        if self.decoder.has_unparsed_bytes() {
334            return Err(IoError::new(IoErrorKind::Other, "has unparsed bytes"));
335        }
336        Ok(self.inner.stream)
337    }
338
339    pub async fn write_response(
340        &mut self,
341        response: Response<Vec<u8>>,
342        reason_phrase: ReasonPhrase,
343    ) -> Result<(), IoError> {
344        let (parts, body) = response.into_parts();
345        let head = Response::from_parts(parts, ());
346
347        let body_framing = BodyFraming::ContentLength(body.len());
348
349        self.write_head((head, reason_phrase), body_framing.clone())
350            .await?;
351
352        match body_framing {
353            BodyFraming::Neither => {}
354            BodyFraming::ContentLength(n) if n == 0 => {}
355            _ => {
356                self.write_body(EncoderBody::Completed(body)).await?;
357            }
358        }
359
360        Ok(())
361    }
362
363    pub async fn read_request(&mut self) -> Result<Request<Vec<u8>>, IoError> {
364        let (request, body_framing) = self.read_head().await?;
365
366        let mut body = Vec::new();
367        match body_framing {
368            BodyFraming::Neither => {}
369            BodyFraming::ContentLength(n) if n == 0 => {}
370            _ => loop {
371                match self.read_body().await? {
372                    DecoderBody::Completed(bytes) => {
373                        body.extend_from_slice(&bytes);
374                        break;
375                    }
376                    DecoderBody::Partial(bytes) => {
377                        body.extend_from_slice(&bytes);
378                    }
379                }
380            },
381        }
382
383        let (parts, _) = request.into_parts();
384        let request = Request::from_parts(parts, body);
385
386        Ok(request)
387    }
388}
389
390impl<S, SLEEP> AsyncRead for Http1ServerStream<S, SLEEP>
391where
392    S: AsyncRead + AsyncWrite + Unpin + Send,
393    SLEEP: Sleepble + Unpin,
394{
395    fn poll_read(
396        self: Pin<&mut Self>,
397        cx: &mut Context,
398        buf: &mut [u8],
399    ) -> Poll<Result<usize, IoError>> {
400        Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
401    }
402}
403
404impl<S, SLEEP> AsyncWrite for Http1ServerStream<S, SLEEP>
405where
406    S: AsyncRead + AsyncWrite + Unpin + Send,
407    SLEEP: Sleepble + Unpin,
408{
409    fn poll_write(
410        self: Pin<&mut Self>,
411        cx: &mut Context,
412        buf: &[u8],
413    ) -> Poll<Result<usize, IoError>> {
414        Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
415    }
416
417    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
418        Pin::new(&mut self.get_mut().stream).poll_flush(cx)
419    }
420
421    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
422        Pin::new(&mut self.get_mut().stream).poll_close(cx)
423    }
424}