1use core::{
2 marker::PhantomData,
3 ops::{Deref, DerefMut},
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8use std::io::{Error as IoError, ErrorKind as IoErrorKind};
9
10use async_sleep::Sleepble;
11use async_trait::async_trait;
12use futures_io::{AsyncRead, AsyncWrite};
13use http::{Request, Response};
14use http1_spec::{body_framing::BodyFraming, head_renderer::Head, ReasonPhrase};
15
16use crate::{
17 body::{DecoderBody, EncoderBody},
18 decoder::{Http1RequestDecoder, Http1ResponseDecoder},
19 encoder::{Http1RequestEncoder, Http1ResponseEncoder},
20};
21
22#[async_trait]
26pub trait Http1StreamDecoder<S, SLEEP, H>
27where
28 S: AsyncRead + Unpin,
29 SLEEP: Sleepble,
30 H: Head,
31{
32 async fn read_head(&mut self, stream: &mut S) -> Result<(H, BodyFraming), IoError>;
33 async fn read_body(&mut self, stream: &mut S) -> Result<DecoderBody, IoError>;
34
35 fn set_read_timeout(&mut self, dur: Duration);
36}
37
38#[async_trait]
39pub trait Http1StreamEncoder<S, SLEEP, H>
40where
41 S: AsyncWrite + Unpin,
42 SLEEP: Sleepble,
43 H: Head,
44{
45 async fn write_head(
46 &mut self,
47 stream: &mut S,
48 head: H,
49 body_framing: BodyFraming,
50 ) -> Result<(), IoError>;
51 async fn write_body(&mut self, stream: &mut S, body: EncoderBody) -> Result<(), IoError>;
52
53 fn set_write_timeout(&mut self, dur: Duration);
54}
55
56pub struct Http1Stream<S, SLEEP, D, DH, E, EH>
60where
61 S: AsyncRead + AsyncWrite + Unpin,
62 SLEEP: Sleepble,
63 D: Http1StreamDecoder<S, SLEEP, DH>,
64 DH: Head,
65 E: Http1StreamEncoder<S, SLEEP, EH>,
66 EH: Head,
67{
68 stream: S,
69 decoder: D,
70 encoder: E,
71 phantom: PhantomData<(SLEEP, DH, EH)>,
72}
73impl<S, SLEEP, D, DH, E, EH> Http1Stream<S, SLEEP, D, DH, E, EH>
74where
75 S: AsyncRead + AsyncWrite + Unpin,
76 SLEEP: Sleepble,
77 D: Http1StreamDecoder<S, SLEEP, DH>,
78 DH: Head,
79 E: Http1StreamEncoder<S, SLEEP, EH>,
80 EH: Head,
81{
82 pub(crate) fn new(stream: S, decoder: D, encoder: E) -> Self {
83 Self {
84 stream,
85 decoder,
86 encoder,
87 phantom: PhantomData,
88 }
89 }
90
91 pub fn set_write_timeout(&mut self, dur: Duration) {
93 self.encoder.set_write_timeout(dur)
94 }
95
96 pub fn set_read_timeout(&mut self, dur: Duration) {
97 self.decoder.set_read_timeout(dur)
98 }
99
100 pub async fn write_head(&mut self, head: EH, body_framing: BodyFraming) -> Result<(), IoError> {
102 self.encoder
103 .write_head(&mut self.stream, head, body_framing)
104 .await
105 }
106
107 pub async fn write_body(&mut self, body: EncoderBody) -> Result<(), IoError> {
108 self.encoder.write_body(&mut self.stream, body).await
109 }
110
111 pub async fn read_head(&mut self) -> Result<(DH, BodyFraming), IoError> {
113 self.decoder.read_head(&mut self.stream).await
114 }
115 pub async fn read_body(&mut self) -> Result<DecoderBody, IoError> {
116 self.decoder.read_body(&mut self.stream).await
117 }
118}
119
120pub type Http1ClientStreamInner<S, SLEEP> = Http1Stream<
124 S,
125 SLEEP,
126 Http1ResponseDecoder,
127 (Response<()>, ReasonPhrase),
128 Http1RequestEncoder,
129 Request<()>,
130>;
131pub struct Http1ClientStream<S, SLEEP>
132where
133 S: AsyncRead + AsyncWrite + Unpin + Send,
134 SLEEP: Sleepble,
135{
136 inner: Http1ClientStreamInner<S, SLEEP>,
137}
138impl<S, SLEEP> Deref for Http1ClientStream<S, SLEEP>
139where
140 S: AsyncRead + AsyncWrite + Unpin + Send,
141 SLEEP: Sleepble,
142{
143 type Target = Http1ClientStreamInner<S, SLEEP>;
144
145 fn deref(&self) -> &Http1ClientStreamInner<S, SLEEP> {
146 &self.inner
147 }
148}
149impl<S, SLEEP> DerefMut for Http1ClientStream<S, SLEEP>
150where
151 S: AsyncRead + AsyncWrite + Unpin + Send,
152 SLEEP: Sleepble,
153{
154 fn deref_mut(&mut self) -> &mut Http1ClientStreamInner<S, SLEEP> {
155 &mut self.inner
156 }
157}
158impl<S, SLEEP> Http1ClientStream<S, SLEEP>
159where
160 S: AsyncRead + AsyncWrite + Unpin + Send,
161 SLEEP: Sleepble,
162{
163 pub fn new(stream: S) -> Self {
164 Self::with(
165 stream,
166 Http1ResponseDecoder::new(8 * 1024, None),
167 Http1RequestEncoder::new(8 * 1024),
168 )
169 }
170 pub fn with(stream: S, decoder: Http1ResponseDecoder, encoder: Http1RequestEncoder) -> Self {
171 Self {
172 inner: Http1ClientStreamInner::new(stream, decoder, encoder),
173 }
174 }
175
176 pub fn get_ref(&self) -> &S {
177 &self.inner.stream
178 }
179 pub fn get_mut(&mut self) -> &mut S {
180 &mut self.inner.stream
181 }
182 pub fn into_inner(self) -> Result<S, IoError> {
183 if self.decoder.has_unparsed_bytes() {
184 return Err(IoError::new(IoErrorKind::Other, "has unparsed bytes"));
185 }
186 Ok(self.inner.stream)
187 }
188
189 pub async fn write_request(&mut self, request: Request<Vec<u8>>) -> Result<(), IoError> {
190 let (parts, body) = request.into_parts();
191 let head = Request::from_parts(parts, ());
192
193 let body_framing = BodyFraming::ContentLength(body.len());
194
195 self.write_head(head, body_framing.clone()).await?;
196 match body_framing {
197 BodyFraming::Neither => {}
198 BodyFraming::ContentLength(n) if n == 0 => {}
199 _ => {
200 self.write_body(EncoderBody::Completed(body)).await?;
201 }
202 }
203
204 Ok(())
205 }
206
207 pub async fn read_response(&mut self) -> Result<(Response<Vec<u8>>, ReasonPhrase), IoError> {
208 let ((response, reason_phrase), body_framing) = self.read_head().await?;
209
210 let mut body = Vec::new();
211 match body_framing {
212 BodyFraming::Neither => {}
213 BodyFraming::ContentLength(n) if n == 0 => {}
214 _ => loop {
215 match self.read_body().await? {
216 DecoderBody::Completed(bytes) => {
217 body.extend_from_slice(&bytes);
218 break;
219 }
220 DecoderBody::Partial(bytes) => {
221 body.extend_from_slice(&bytes);
222 }
223 }
224 },
225 }
226
227 let (parts, _) = response.into_parts();
228 let response = Response::from_parts(parts, body);
229
230 Ok((response, reason_phrase))
231 }
232}
233
234impl<S, SLEEP> AsyncRead for Http1ClientStream<S, SLEEP>
235where
236 S: AsyncRead + AsyncWrite + Unpin + Send,
237 SLEEP: Sleepble + Unpin,
238{
239 fn poll_read(
240 self: Pin<&mut Self>,
241 cx: &mut Context,
242 buf: &mut [u8],
243 ) -> Poll<Result<usize, IoError>> {
244 Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
245 }
246}
247
248impl<S, SLEEP> AsyncWrite for Http1ClientStream<S, SLEEP>
249where
250 S: AsyncRead + AsyncWrite + Unpin + Send,
251 SLEEP: Sleepble + Unpin,
252{
253 fn poll_write(
254 self: Pin<&mut Self>,
255 cx: &mut Context,
256 buf: &[u8],
257 ) -> Poll<Result<usize, IoError>> {
258 Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
259 }
260
261 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
262 Pin::new(&mut self.get_mut().stream).poll_flush(cx)
263 }
264
265 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
266 Pin::new(&mut self.get_mut().stream).poll_close(cx)
267 }
268}
269
270pub type Http1ServerStreamInner<S, SLEEP> = Http1Stream<
274 S,
275 SLEEP,
276 Http1RequestDecoder,
277 Request<()>,
278 Http1ResponseEncoder,
279 (Response<()>, ReasonPhrase),
280>;
281pub struct Http1ServerStream<S, SLEEP>
282where
283 S: AsyncRead + AsyncWrite + Unpin + Send,
284 SLEEP: Sleepble,
285{
286 inner: Http1ServerStreamInner<S, SLEEP>,
287}
288impl<S, SLEEP> Deref for Http1ServerStream<S, SLEEP>
289where
290 S: AsyncRead + AsyncWrite + Unpin + Send,
291 SLEEP: Sleepble,
292{
293 type Target = Http1ServerStreamInner<S, SLEEP>;
294
295 fn deref(&self) -> &Http1ServerStreamInner<S, SLEEP> {
296 &self.inner
297 }
298}
299impl<S, SLEEP> DerefMut for Http1ServerStream<S, SLEEP>
300where
301 S: AsyncRead + AsyncWrite + Unpin + Send,
302 SLEEP: Sleepble,
303{
304 fn deref_mut(&mut self) -> &mut Http1ServerStreamInner<S, SLEEP> {
305 &mut self.inner
306 }
307}
308impl<S, SLEEP> Http1ServerStream<S, SLEEP>
309where
310 S: AsyncRead + AsyncWrite + Unpin + Send,
311 SLEEP: Sleepble,
312{
313 pub fn new(stream: S) -> Self {
314 Self::with(
315 stream,
316 Http1RequestDecoder::new(8 * 1024, None),
317 Http1ResponseEncoder::new(8 * 1024),
318 )
319 }
320 pub fn with(stream: S, decoder: Http1RequestDecoder, encoder: Http1ResponseEncoder) -> Self {
321 Self {
322 inner: Http1ServerStreamInner::new(stream, decoder, encoder),
323 }
324 }
325
326 pub fn get_ref(&self) -> &S {
327 &self.inner.stream
328 }
329 pub fn get_mut(&mut self) -> &mut S {
330 &mut self.inner.stream
331 }
332 pub fn into_inner(self) -> Result<S, IoError> {
333 if self.decoder.has_unparsed_bytes() {
334 return Err(IoError::new(IoErrorKind::Other, "has unparsed bytes"));
335 }
336 Ok(self.inner.stream)
337 }
338
339 pub async fn write_response(
340 &mut self,
341 response: Response<Vec<u8>>,
342 reason_phrase: ReasonPhrase,
343 ) -> Result<(), IoError> {
344 let (parts, body) = response.into_parts();
345 let head = Response::from_parts(parts, ());
346
347 let body_framing = BodyFraming::ContentLength(body.len());
348
349 self.write_head((head, reason_phrase), body_framing.clone())
350 .await?;
351
352 match body_framing {
353 BodyFraming::Neither => {}
354 BodyFraming::ContentLength(n) if n == 0 => {}
355 _ => {
356 self.write_body(EncoderBody::Completed(body)).await?;
357 }
358 }
359
360 Ok(())
361 }
362
363 pub async fn read_request(&mut self) -> Result<Request<Vec<u8>>, IoError> {
364 let (request, body_framing) = self.read_head().await?;
365
366 let mut body = Vec::new();
367 match body_framing {
368 BodyFraming::Neither => {}
369 BodyFraming::ContentLength(n) if n == 0 => {}
370 _ => loop {
371 match self.read_body().await? {
372 DecoderBody::Completed(bytes) => {
373 body.extend_from_slice(&bytes);
374 break;
375 }
376 DecoderBody::Partial(bytes) => {
377 body.extend_from_slice(&bytes);
378 }
379 }
380 },
381 }
382
383 let (parts, _) = request.into_parts();
384 let request = Request::from_parts(parts, body);
385
386 Ok(request)
387 }
388}
389
390impl<S, SLEEP> AsyncRead for Http1ServerStream<S, SLEEP>
391where
392 S: AsyncRead + AsyncWrite + Unpin + Send,
393 SLEEP: Sleepble + Unpin,
394{
395 fn poll_read(
396 self: Pin<&mut Self>,
397 cx: &mut Context,
398 buf: &mut [u8],
399 ) -> Poll<Result<usize, IoError>> {
400 Pin::new(&mut self.get_mut().stream).poll_read(cx, buf)
401 }
402}
403
404impl<S, SLEEP> AsyncWrite for Http1ServerStream<S, SLEEP>
405where
406 S: AsyncRead + AsyncWrite + Unpin + Send,
407 SLEEP: Sleepble + Unpin,
408{
409 fn poll_write(
410 self: Pin<&mut Self>,
411 cx: &mut Context,
412 buf: &[u8],
413 ) -> Poll<Result<usize, IoError>> {
414 Pin::new(&mut self.get_mut().stream).poll_write(cx, buf)
415 }
416
417 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
418 Pin::new(&mut self.get_mut().stream).poll_flush(cx)
419 }
420
421 fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
422 Pin::new(&mut self.get_mut().stream).poll_close(cx)
423 }
424}