1use crate::buf_reader::BufIo;
62use crate::err_closed;
63use crate::fast_buf::FastBuf;
64use crate::http11::{poll_for_crlfcrlf, try_parse_res, write_http1x_req, READ_BUF_INIT_SIZE};
65use crate::limit::{allow_reuse, headers_indicate_body};
66use crate::limit::{LimitRead, LimitWrite};
67use crate::mpsc::{Receiver, Sender};
68use crate::Error;
69use crate::{AsyncRead, AsyncWrite};
70use crate::{RecvStream, SendStream};
71use futures_util::ready;
72use std::fmt;
73use std::future::Future;
74use std::io;
75use std::pin::Pin;
76use std::task::{Context, Poll};
77
78const MAX_REQUEST_SIZE: usize = 8192;
80
81const MAX_BODY_READ_SIZE: u64 = 8 * 1024 * 1024;
83
84pub fn handshake<S>(io: S) -> (SendRequest, Connection<S>)
91where
92 S: AsyncRead + AsyncWrite + Unpin,
93{
94 let (req_tx, req_rx) = Receiver::new(100);
95
96 let send_req = SendRequest::new(req_tx);
97
98 let conn = Connection(Codec::new(io, req_rx));
99
100 (send_req, conn)
101}
102
103#[derive(Clone)]
107pub struct SendRequest {
108 req_tx: Sender<Handle>,
109}
110
111impl SendRequest {
112 fn new(req_tx: Sender<Handle>) -> Self {
113 SendRequest { req_tx }
114 }
115
116 pub fn send_request(
126 &mut self,
127 req: http::Request<()>,
128 no_body: bool,
129 ) -> Result<(ResponseFuture, SendStream), Error> {
130 if req.method() == http::Method::CONNECT {
131 return Err(Error::User("hreq-h1 does not support CONNECT".into()));
132 }
133
134 trace!("Send request: {:?}", req);
135
136 let (res_tx, res_rx) = Receiver::new(1);
138
139 let (body_tx, body_rx) = Receiver::new(1);
141
142 let limit = LimitWrite::from_headers(req.headers());
143
144 let no_send_body = no_body || limit.is_no_body();
145
146 let body_rx = if no_send_body { None } else { Some(body_rx) };
148
149 let next = Handle {
151 req,
152 body_rx,
153 res_tx: Some(res_tx),
154 };
155
156 if !self.req_tx.send(next) {
157 return err_closed("Can't enqueue request, connection is closed");
159 }
160
161 let fut = ResponseFuture(res_rx);
162 let send = SendStream::new(body_tx, limit, no_send_body, None);
163
164 Ok((fut, send))
165 }
166}
167
168struct Handle {
172 req: http::Request<()>,
173 body_rx: Option<Receiver<(Vec<u8>, bool)>>,
174 res_tx: Option<Sender<io::Result<http::Response<RecvStream>>>>,
175}
176
177pub struct ResponseFuture(Receiver<io::Result<http::Response<RecvStream>>>);
179
180impl Future for ResponseFuture {
181 type Output = Result<http::Response<RecvStream>, Error>;
182
183 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
184 let this = self.get_mut();
185
186 let res = ready!(Pin::new(&this.0).poll_recv(cx, true));
187
188 if let Some(v) = res {
189 let v = v?;
191
192 Ok(v).into()
193 } else {
194 err_closed("Response failed, connection is closed").into()
195 }
196 }
197}
198
199pub struct Connection<S>(Codec<S>);
203
204impl<S> Future for Connection<S>
205where
206 S: AsyncRead + AsyncWrite + Unpin,
207{
208 type Output = io::Result<()>;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
211 let this = self.get_mut();
212 this.0.poll_client(cx)
213 }
214}
215
216#[allow(clippy::large_enum_variant)]
217enum State {
218 SendReq(SendReq),
220 RecvRes(Bidirect),
222 RecvBody(BodyReceiver),
224}
225
226impl State {
227 fn try_forward_error(&mut self, e: io::Error) -> io::Error {
228 match self {
229 State::SendReq(_) => e,
230 State::RecvRes(h) => {
231 if let Some(res_tx) = &mut h.handle.res_tx {
232 let c = clone_error(&e);
233 res_tx.send(Err(e));
234 c
235 } else {
236 e
237 }
238 }
239 State::RecvBody(h) => {
240 let c = clone_error(&e);
241 h.body_tx.send(Err(e));
242 c
243 }
244 }
245 }
246}
247
248fn clone_error(e: &io::Error) -> io::Error {
249 io::Error::new(e.kind(), e.to_string())
250}
251
252struct Codec<S> {
253 io: BufIo<S>,
254 state: State,
255 req_rx: Receiver<Handle>,
256}
257
258impl<S> Codec<S>
259where
260 S: AsyncRead + AsyncWrite + Unpin,
261{
262 fn new(io: S, req_rx: Receiver<Handle>) -> Self {
263 trace!("=> SendReq");
264 Codec {
265 io: BufIo::with_capacity(READ_BUF_INIT_SIZE, io),
266 state: State::SendReq(SendReq),
267 req_rx,
268 }
269 }
270
271 fn poll_client(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
272 match self.drive(cx) {
274 Poll::Ready(Err(e)) => {
275 debug!("Close on error: {:?}", e);
276
277 let e = self.state.try_forward_error(e);
281
282 trace!("{:?} => Closed", self.state);
283
284 Err(e).into()
285 }
286 r => r,
287 }
288 }
289
290 fn drive(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
291 loop {
292 ready!(Pin::new(&mut self.io).poll_finish_pending_write(cx))?;
293
294 match &mut self.state {
295 State::SendReq(h) => {
296 let next_state = ready!(h.poll_send_req(cx, &mut self.io, &self.req_rx))?;
297
298 if let Some(next_state) = next_state {
299 trace!("SendReq => {:?}", next_state);
300 self.state = next_state;
301 } else {
302 return Ok(()).into();
304 }
305 }
306 State::RecvRes(h) => {
307 let next_state = ready!(h.poll_bidirect(cx, &mut self.io))?;
308
309 if let Some(next_state) = next_state {
310 trace!("RecvRes => {:?}", next_state);
311 self.state = next_state;
312 } else {
313 return Ok(()).into();
315 }
316 }
317 State::RecvBody(h) => {
318 let next_state = ready!(h.poll_read_body(cx, &mut self.io))?;
319
320 if let Some(next_state) = next_state {
321 trace!("RecvBody => {:?}", next_state);
322 self.state = next_state;
323 } else {
324 return Ok(()).into();
326 }
327 }
328 }
329 }
330 }
331}
332
333struct SendReq;
334
335impl SendReq {
336 fn poll_send_req<S>(
337 &mut self,
338 cx: &mut Context,
339 io: &mut BufIo<S>,
340 req_rx: &Receiver<Handle>,
341 ) -> Poll<io::Result<Option<State>>>
342 where
343 S: AsyncRead + AsyncWrite + Unpin,
344 {
345 let handle = match ready!(Pin::new(req_rx).poll_recv(cx, true)) {
346 Some(v) => v,
347 None => {
348 return Ok(None).into();
349 }
350 };
351
352 let mut buf = FastBuf::with_capacity(MAX_REQUEST_SIZE);
353
354 let mut write_to = buf.borrow();
355
356 let amount = write_http1x_req(&handle.req, &mut write_to)?;
357
358 unsafe {
361 write_to.extend(amount);
362 }
363
364 assert!(io.can_poll_write());
366
367 let mut to_send = Some(&buf[..]);
368
369 match Pin::new(io).poll_write_all(cx, &mut to_send, true) {
370 Poll::Pending => {
371 assert!(to_send.is_none());
373 }
376 Poll::Ready(v) => v?,
377 }
378
379 let next_state = State::RecvRes(Bidirect {
380 handle,
381 response_allows_reuse: false, holder: None,
383 });
384
385 Ok(Some(next_state)).into()
386 }
387}
388
389struct Bidirect {
391 handle: Handle,
393 response_allows_reuse: bool,
396 holder: Option<(Sender<io::Result<Vec<u8>>>, LimitRead)>,
398}
399
400impl Bidirect {
401 fn poll_bidirect<S>(
402 &mut self,
403 cx: &mut Context,
404 io: &mut BufIo<S>,
405 ) -> Poll<io::Result<Option<State>>>
406 where
407 S: AsyncRead + AsyncWrite + Unpin,
408 {
409 loop {
410 if self.handle.res_tx.is_none() && self.handle.body_rx.is_none() {
411 break;
412 }
413
414 let mut res_tx_pending = false;
415 let mut body_tx_pending = false;
416
417 if self.handle.res_tx.is_some() {
424 match self.poll_response(cx, io) {
425 Poll::Pending => {
426 res_tx_pending = true;
427 }
428 Poll::Ready(v) => v?,
429 }
430 }
431
432 if self.handle.body_rx.is_some() {
433 match self.poll_send_body(cx, io) {
434 Poll::Pending => {
435 body_tx_pending = true;
436 }
437 Poll::Ready(v) => v?,
438 }
439 }
440
441 if res_tx_pending && (body_tx_pending || self.handle.body_rx.is_none())
442 || body_tx_pending && (res_tx_pending || self.handle.res_tx.is_none())
443 {
444 return Poll::Pending;
445 }
446 }
447
448 let request_allows_reuse =
449 allow_reuse(self.handle.req.headers(), self.handle.req.version());
450
451 let next_state = if let Some(holder) = self.holder.take() {
452 let (body_tx, limit) = holder;
453
454 let cur_read_size = limit.body_size().unwrap_or(8_192).min(MAX_BODY_READ_SIZE) as usize;
455
456 let brec = BodyReceiver {
457 request_allows_reuse,
458 response_allows_reuse: self.response_allows_reuse,
459 cur_read_size,
460 limit,
461 body_tx,
462 };
463
464 Some(State::RecvBody(brec))
465 } else if request_allows_reuse && self.response_allows_reuse {
466 trace!("No response body, reuse connection");
467 Some(State::SendReq(SendReq))
468 } else {
469 trace!("No response body, reuse not allowed");
470 None
471 };
472
473 Ok(next_state).into()
474 }
475
476 fn poll_response<S>(&mut self, cx: &mut Context, io: &mut BufIo<S>) -> Poll<io::Result<()>>
477 where
478 S: AsyncRead + AsyncWrite + Unpin,
479 {
480 let res = ready!(poll_for_crlfcrlf(cx, io, try_parse_res))??;
481
482 let res = res.expect("Parsed partial response");
485
486 self.response_allows_reuse = allow_reuse(res.headers(), res.version());
487
488 let limit = LimitRead::from_headers(res.headers(), true);
489
490 let status = res.status();
497 let is_no_body = limit.is_no_body()
498 || self.handle.req.method() == http::Method::HEAD
499 || status.is_informational()
500 || status == http::StatusCode::NO_CONTENT
501 || status == http::StatusCode::NOT_MODIFIED
502 || status.is_redirection() && !headers_indicate_body(res.headers());
506
507 let (body_tx, body_rx) = Receiver::new(1);
511
512 self.holder = if is_no_body {
515 None
516 } else {
517 Some((body_tx, limit))
518 };
519
520 let recv = RecvStream::new(body_rx, is_no_body, None);
521
522 let (parts, _) = res.into_parts();
523 let res = http::Response::from_parts(parts, recv);
524
525 let res_tx = self.handle.res_tx.take().expect("Missing res_tx");
527
528 if !res_tx.send(Ok(res)) {
529 trace!("Failed to send http::Response to ResponseFuture");
533 }
534
535 Ok(()).into()
536 }
537
538 fn poll_send_body<S>(&mut self, cx: &mut Context, io: &mut BufIo<S>) -> Poll<io::Result<()>>
539 where
540 S: AsyncRead + AsyncWrite + Unpin,
541 {
542 let body_rx = self.handle.body_rx.as_ref().unwrap();
543
544 let (chunk, end) = match ready!(Pin::new(body_rx).poll_recv(cx, true)) {
545 Some(v) => v,
546 None => {
547 return Err(io::Error::new(
548 io::ErrorKind::Other,
549 "SendStream dropped before sending entire body",
550 ))
551 .into();
552 }
553 };
554
555 assert!(io.can_poll_write());
557
558 let mut to_send = Some(&chunk[..]);
559
560 if end {
561 self.handle.body_rx = None;
564 }
565
566 match Pin::new(io).poll_write_all(cx, &mut to_send, end) {
567 Poll::Pending => {
568 assert!(to_send.is_none());
570 return Poll::Pending;
571 }
572 Poll::Ready(v) => v?,
573 }
574
575 Ok(()).into()
576 }
577}
578
579struct BodyReceiver {
580 request_allows_reuse: bool,
581 response_allows_reuse: bool,
582 cur_read_size: usize,
583 limit: LimitRead,
584 body_tx: Sender<io::Result<Vec<u8>>>,
585}
586
587impl BodyReceiver {
588 fn poll_read_body<S>(
589 &mut self,
590 cx: &mut Context,
591 io: &mut BufIo<S>,
592 ) -> Poll<io::Result<Option<State>>>
593 where
594 S: AsyncRead + AsyncWrite + Unpin,
595 {
596 loop {
597 if self.limit.is_complete() {
598 break;
599 }
600
601 if !ready!(Pin::new(&self.body_tx).poll_ready(cx, true)) {
602 }
604
605 let mut buf = FastBuf::with_capacity(self.cur_read_size);
606
607 let mut read_into = buf.borrow();
608
609 let amount = ready!(self.limit.poll_read(cx, io, &mut read_into))?;
610
611 if amount > 0 {
612 unsafe {
614 read_into.extend(amount);
615 }
616
617 if !self.body_tx.send(Ok(buf.into_vec())) {
618 }
620 } else if !self.limit.is_complete() {
621 trace!("Close because read body is not complete");
634 const EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
635 return Err(io::Error::new(EOF, "Partial body")).into();
636 }
637 }
638
639 let next_state = if self.request_allows_reuse
640 && self.response_allows_reuse
641 && self.limit.is_reusable()
642 {
643 trace!("Reuse connection");
644 Some(State::SendReq(SendReq))
645 } else {
646 trace!("Connection is not reusable");
647 None
648 };
649
650 Ok(next_state).into()
651 }
652}
653
654impl fmt::Debug for State {
655 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
656 match self {
657 State::SendReq(_) => write!(f, "SendReq"),
658 State::RecvRes(_) => write!(f, "RecvRes"),
659 State::RecvBody(_) => write!(f, "RecvBody"),
660 }
661 }
662}
663
664impl fmt::Debug for SendRequest {
665 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
666 write!(f, "SendRequest")
667 }
668}
669
670impl fmt::Debug for ResponseFuture {
671 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
672 write!(f, "ResponseFuture")
673 }
674}
675
676impl<S> fmt::Debug for Connection<S> {
677 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678 write!(f, "Connection")
679 }
680}