hreq_h1/
client.rs

1//! Client implementation of the HTTP/1.1 protocol.
2//!
3//! The client connection is split into two parts, one [`Connection`], which
4//! encapsulates the actual transport, and a [`SendRequest`] which is used
5//! to send (multiple) requests over the connection.
6//!
7//! # Example
8//!
9//! ```rust, no_run
10//! use hreq_h1::client;
11//! use std::error::Error;
12//! use async_std::net::TcpStream;
13//! use http::Request;
14//!
15//! #[async_std::main]
16//! async fn main() -> Result<(), Box<dyn Error>> {
17//!   // Establish TCP connection to the server.
18//!   let tcp = TcpStream::connect("127.0.0.1:5928").await?;
19//!
20//!   // h1 is the API handle to send requests
21//!   let (mut h1, connection) = client::handshake(tcp);
22//!
23//!   // Drive the connection independently of the API handle
24//!   async_std::task::spawn(async move {
25//!     if let Err(e) = connection.await {
26//!       println!("Connection closed: {:?}", e);
27//!     }
28//!   });
29//!
//!   // POST request. Note that the request body is sent separately below.
31//!   let req = Request::post("http://myspecial.server/recv")
32//!     .body(())?;
33//!
34//!   let (res, mut send_body) = h1.send_request(req, false)?;
35//!
36//!   send_body.send_data(b"This is the request body data", true).await?;
37//!
38//!   let (head, mut body) = res.await?.into_parts();
39//!
40//!   println!("Received response: {:?}", head);
41//!
42//!   // Read response body into this buffer.
43//!   let mut buf = [0_u8; 1024];
44//!   loop {
45//!      let amount = body.read(&mut buf).await?;
46//!
47//!      println!("RX: {:?}", &buf[0..amount]);
48//!
49//!      if amount == 0 {
50//!        break;
51//!      }
52//!   }
53//!
54//!   Ok(())
55//! }
56//! ```
57//!
58//! [`Connection`]: struct.Connection.html
59//! [`SendRequest`]: struct.SendRequest.html
60
61use crate::buf_reader::BufIo;
62use crate::err_closed;
63use crate::fast_buf::FastBuf;
64use crate::http11::{poll_for_crlfcrlf, try_parse_res, write_http1x_req, READ_BUF_INIT_SIZE};
65use crate::limit::{allow_reuse, headers_indicate_body};
66use crate::limit::{LimitRead, LimitWrite};
67use crate::mpsc::{Receiver, Sender};
68use crate::Error;
69use crate::{AsyncRead, AsyncWrite};
70use crate::{RecvStream, SendStream};
71use futures_util::ready;
72use std::fmt;
73use std::future::Future;
74use std::io;
75use std::pin::Pin;
76use std::task::{Context, Poll};
77
/// Capacity, in bytes, of the buffer a request is serialized into before
/// it is written to the transport.
const MAX_REQUEST_SIZE: usize = 8 * 1024;

/// Upper bound, in bytes, for the buffer allocated per read of a response body.
const MAX_BODY_READ_SIZE: u64 = 8 * 1024 * 1024;
83
84/// Creates a new HTTP/1 client backed by some async `io` connection.
85///
86/// Returns a handle to send requests and a connection tuple. The connection
87/// is a future that must be polled to "drive" the client forward.
88///
89/// See [module level doc](index.html) for an example.
90pub fn handshake<S>(io: S) -> (SendRequest, Connection<S>)
91where
92    S: AsyncRead + AsyncWrite + Unpin,
93{
94    let (req_tx, req_rx) = Receiver::new(100);
95
96    let send_req = SendRequest::new(req_tx);
97
98    let conn = Connection(Codec::new(io, req_rx));
99
100    (send_req, conn)
101}
102
103/// Sender of new requests.
104///
105/// See [module level doc](index.html) for an example.
106#[derive(Clone)]
107pub struct SendRequest {
108    req_tx: Sender<Handle>,
109}
110
111impl SendRequest {
112    fn new(req_tx: Sender<Handle>) -> Self {
113        SendRequest { req_tx }
114    }
115
116    /// Send a new request.
117    ///
118    /// The nature of HTTP/1 means only one request can be sent at a time (no multiplexing).
119    /// Each request sent before the next has finished will be queued.
120    ///
121    /// The `no_body` argument indiciates there is no body to be sent. The returned `SendStream`
122    /// will not accept data if `no_body` is true.
123    ///
124    /// Errors if the connection is closed.
125    pub fn send_request(
126        &mut self,
127        req: http::Request<()>,
128        no_body: bool,
129    ) -> Result<(ResponseFuture, SendStream), Error> {
130        if req.method() == http::Method::CONNECT {
131            return Err(Error::User("hreq-h1 does not support CONNECT".into()));
132        }
133
134        trace!("Send request: {:?}", req);
135
136        // Channel to send response back.
137        let (res_tx, res_rx) = Receiver::new(1);
138
139        // bounded so we provide backpressure if socket is full.
140        let (body_tx, body_rx) = Receiver::new(1);
141
142        let limit = LimitWrite::from_headers(req.headers());
143
144        let no_send_body = no_body || limit.is_no_body();
145
146        // Don't provide an body_rx if headers or no_body flag indicates there is no body.
147        let body_rx = if no_send_body { None } else { Some(body_rx) };
148
149        // The handle for the codec/connection.
150        let next = Handle {
151            req,
152            body_rx,
153            res_tx: Some(res_tx),
154        };
155
156        if !self.req_tx.send(next) {
157            // errors on full or closed, and since it's unbound...
158            return err_closed("Can't enqueue request, connection is closed");
159        }
160
161        let fut = ResponseFuture(res_rx);
162        let send = SendStream::new(body_tx, limit, no_send_body, None);
163
164        Ok((fut, send))
165    }
166}
167
168/// Holder of all details for a new request.
169///
170/// This internally communicates with the `Connection`.
171struct Handle {
172    req: http::Request<()>,
173    body_rx: Option<Receiver<(Vec<u8>, bool)>>,
174    res_tx: Option<Sender<io::Result<http::Response<RecvStream>>>>,
175}
176
177/// Future for a `http::Response<RecvStream>>`
178pub struct ResponseFuture(Receiver<io::Result<http::Response<RecvStream>>>);
179
180impl Future for ResponseFuture {
181    type Output = Result<http::Response<RecvStream>, Error>;
182
183    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
184        let this = self.get_mut();
185
186        let res = ready!(Pin::new(&this.0).poll_recv(cx, true));
187
188        if let Some(v) = res {
189            // nested io::Error
190            let v = v?;
191
192            Ok(v).into()
193        } else {
194            err_closed("Response failed, connection is closed").into()
195        }
196    }
197}
198
199/// Future that manages the actual connection. Must be awaited to "drive" the connection.
200///
201/// See [module level doc](index.html) for an example.
202pub struct Connection<S>(Codec<S>);
203
204impl<S> Future for Connection<S>
205where
206    S: AsyncRead + AsyncWrite + Unpin,
207{
208    type Output = io::Result<()>;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
211        let this = self.get_mut();
212        this.0.poll_client(cx)
213    }
214}
215
216#[allow(clippy::large_enum_variant)]
217enum State {
218    /// Send next request.
219    SendReq(SendReq),
220    /// Receive response and (if appropriate), send request body.
221    RecvRes(Bidirect),
222    /// Receive response body.
223    RecvBody(BodyReceiver),
224}
225
226impl State {
227    fn try_forward_error(&mut self, e: io::Error) -> io::Error {
228        match self {
229            State::SendReq(_) => e,
230            State::RecvRes(h) => {
231                if let Some(res_tx) = &mut h.handle.res_tx {
232                    let c = clone_error(&e);
233                    res_tx.send(Err(e));
234                    c
235                } else {
236                    e
237                }
238            }
239            State::RecvBody(h) => {
240                let c = clone_error(&e);
241                h.body_tx.send(Err(e));
242                c
243            }
244        }
245    }
246}
247
/// Produces an independent copy of `e`, since `io::Error` is not `Clone`.
///
/// An error that carries a raw OS error code is rebuilt from that code, so the
/// copy still answers `raw_os_error()`. Any other error is recreated from its
/// kind and display text; a custom inner error payload is reduced to that text.
fn clone_error(e: &io::Error) -> io::Error {
    match e.raw_os_error() {
        Some(code) => io::Error::from_raw_os_error(code),
        None => io::Error::new(e.kind(), e.to_string()),
    }
}
251
252struct Codec<S> {
253    io: BufIo<S>,
254    state: State,
255    req_rx: Receiver<Handle>,
256}
257
258impl<S> Codec<S>
259where
260    S: AsyncRead + AsyncWrite + Unpin,
261{
262    fn new(io: S, req_rx: Receiver<Handle>) -> Self {
263        trace!("=> SendReq");
264        Codec {
265            io: BufIo::with_capacity(READ_BUF_INIT_SIZE, io),
266            state: State::SendReq(SendReq),
267            req_rx,
268        }
269    }
270
271    fn poll_client(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
272        // Any error bubbling up closes the connection.
273        match self.drive(cx) {
274            Poll::Ready(Err(e)) => {
275                debug!("Close on error: {:?}", e);
276
277                // Attempt to forward the error to the client side. This is only
278                // possible in some states. We either get the original or a cloned
279                // error back to bubble up to the connection.
280                let e = self.state.try_forward_error(e);
281
282                trace!("{:?} => Closed", self.state);
283
284                Err(e).into()
285            }
286            r => r,
287        }
288    }
289
290    fn drive(&mut self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
291        loop {
292            ready!(Pin::new(&mut self.io).poll_finish_pending_write(cx))?;
293
294            match &mut self.state {
295                State::SendReq(h) => {
296                    let next_state = ready!(h.poll_send_req(cx, &mut self.io, &self.req_rx))?;
297
298                    if let Some(next_state) = next_state {
299                        trace!("SendReq => {:?}", next_state);
300                        self.state = next_state;
301                    } else {
302                        // No more requests to send
303                        return Ok(()).into();
304                    }
305                }
306                State::RecvRes(h) => {
307                    let next_state = ready!(h.poll_bidirect(cx, &mut self.io))?;
308
309                    if let Some(next_state) = next_state {
310                        trace!("RecvRes => {:?}", next_state);
311                        self.state = next_state;
312                    } else {
313                        // No more requests to send
314                        return Ok(()).into();
315                    }
316                }
317                State::RecvBody(h) => {
318                    let next_state = ready!(h.poll_read_body(cx, &mut self.io))?;
319
320                    if let Some(next_state) = next_state {
321                        trace!("RecvBody => {:?}", next_state);
322                        self.state = next_state;
323                    } else {
324                        // No more requests to send
325                        return Ok(()).into();
326                    }
327                }
328            }
329        }
330    }
331}
332
333struct SendReq;
334
335impl SendReq {
336    fn poll_send_req<S>(
337        &mut self,
338        cx: &mut Context,
339        io: &mut BufIo<S>,
340        req_rx: &Receiver<Handle>,
341    ) -> Poll<io::Result<Option<State>>>
342    where
343        S: AsyncRead + AsyncWrite + Unpin,
344    {
345        let handle = match ready!(Pin::new(req_rx).poll_recv(cx, true)) {
346            Some(v) => v,
347            None => {
348                return Ok(None).into();
349            }
350        };
351
352        let mut buf = FastBuf::with_capacity(MAX_REQUEST_SIZE);
353
354        let mut write_to = buf.borrow();
355
356        let amount = write_http1x_req(&handle.req, &mut write_to)?;
357
358        // If write_http1x_req reports the correct number of bytes written to
359        // the buffer, this extend is correct.
360        unsafe {
361            write_to.extend(amount);
362        }
363
364        // invariant: Can't have any pending bytes to write now.
365        assert!(io.can_poll_write());
366
367        let mut to_send = Some(&buf[..]);
368
369        match Pin::new(io).poll_write_all(cx, &mut to_send, true) {
370            Poll::Pending => {
371                // invariant: BufIo must have taken control of to_send buf.
372                assert!(to_send.is_none());
373                // Fall through do state change. The Pending will be caught
374                // when looping in drive() and doing poll_finish_pending_write.
375            }
376            Poll::Ready(v) => v?,
377        }
378
379        let next_state = State::RecvRes(Bidirect {
380            handle,
381            response_allows_reuse: false, // set later in poll_response()
382            holder: None,
383        });
384
385        Ok(Some(next_state)).into()
386    }
387}
388
389/// State where we both wait for a server response as well as sending a request body.
390struct Bidirect {
391    // The request and means to communicate with the user.
392    handle: Handle,
393    /// Tells whether the response headers/version allows reuse of the connection.
394    /// Set by Bidirect::poll_response() when response is received.
395    response_allows_reuse: bool,
396    /// Holds the received a response whle we are not finished sending the request body.
397    holder: Option<(Sender<io::Result<Vec<u8>>>, LimitRead)>,
398}
399
400impl Bidirect {
401    fn poll_bidirect<S>(
402        &mut self,
403        cx: &mut Context,
404        io: &mut BufIo<S>,
405    ) -> Poll<io::Result<Option<State>>>
406    where
407        S: AsyncRead + AsyncWrite + Unpin,
408    {
409        loop {
410            if self.handle.res_tx.is_none() && self.handle.body_rx.is_none() {
411                break;
412            }
413
414            let mut res_tx_pending = false;
415            let mut body_tx_pending = false;
416
417            // The order of these two polls matter. We can only register one Waker
418            // for this poll. The incoming response might not come before we sent
419            // the entire request body. Sending the request body is also within the
420            // control of the user of the library. poll_send_body needs to be the
421            // latter of these two.
422
423            if self.handle.res_tx.is_some() {
424                match self.poll_response(cx, io) {
425                    Poll::Pending => {
426                        res_tx_pending = true;
427                    }
428                    Poll::Ready(v) => v?,
429                }
430            }
431
432            if self.handle.body_rx.is_some() {
433                match self.poll_send_body(cx, io) {
434                    Poll::Pending => {
435                        body_tx_pending = true;
436                    }
437                    Poll::Ready(v) => v?,
438                }
439            }
440
441            if res_tx_pending && (body_tx_pending || self.handle.body_rx.is_none())
442                || body_tx_pending && (res_tx_pending || self.handle.res_tx.is_none())
443            {
444                return Poll::Pending;
445            }
446        }
447
448        let request_allows_reuse =
449            allow_reuse(self.handle.req.headers(), self.handle.req.version());
450
451        let next_state = if let Some(holder) = self.holder.take() {
452            let (body_tx, limit) = holder;
453
454            let cur_read_size = limit.body_size().unwrap_or(8_192).min(MAX_BODY_READ_SIZE) as usize;
455
456            let brec = BodyReceiver {
457                request_allows_reuse,
458                response_allows_reuse: self.response_allows_reuse,
459                cur_read_size,
460                limit,
461                body_tx,
462            };
463
464            Some(State::RecvBody(brec))
465        } else if request_allows_reuse && self.response_allows_reuse {
466            trace!("No response body, reuse connection");
467            Some(State::SendReq(SendReq))
468        } else {
469            trace!("No response body, reuse not allowed");
470            None
471        };
472
473        Ok(next_state).into()
474    }
475
476    fn poll_response<S>(&mut self, cx: &mut Context, io: &mut BufIo<S>) -> Poll<io::Result<()>>
477    where
478        S: AsyncRead + AsyncWrite + Unpin,
479    {
480        let res = ready!(poll_for_crlfcrlf(cx, io, try_parse_res))??;
481
482        // invariant: poll_for_crlfcrlf should provide a full header and
483        //            try_parse_res should not be able to get a partial response.
484        let res = res.expect("Parsed partial response");
485
486        self.response_allows_reuse = allow_reuse(res.headers(), res.version());
487
488        let limit = LimitRead::from_headers(res.headers(), true);
489
490        // https://tools.ietf.org/html/rfc7230#page-31
491        // Any response to a HEAD request and any response with a 1xx
492        // (Informational), 204 (No Content), or 304 (Not Modified) status
493        // code is always terminated by the first empty line after the
494        // header fields, regardless of the header fields present in the
495        // message, and thus cannot contain a message body.
496        let status = res.status();
497        let is_no_body = limit.is_no_body()
498            || self.handle.req.method() == http::Method::HEAD
499            || status.is_informational()
500            || status == http::StatusCode::NO_CONTENT
501            || status == http::StatusCode::NOT_MODIFIED
502            // 301/302 could have a body. If it does, we expect there to
503            // be some header indicating it. However if there aren't,
504            // we assume there is no body (instead of using ReadToEnd limiter)
505            || status.is_redirection() && !headers_indicate_body(res.headers());
506
507        // TODO: handle CONNECT with a special state where connection becomes a tunnel
508
509        // bounded to have backpressure if client is reading slowly.
510        let (body_tx, body_rx) = Receiver::new(1);
511
512        // If there isn't a body, don't sent a holder. This is picked up in poll_bidirect to know
513        // which state is the next.
514        self.holder = if is_no_body {
515            None
516        } else {
517            Some((body_tx, limit))
518        };
519
520        let recv = RecvStream::new(body_rx, is_no_body, None);
521
522        let (parts, _) = res.into_parts();
523        let res = http::Response::from_parts(parts, recv);
524
525        // Taking the res_tx indicates to poll_bidirect that response is received.
526        let res_tx = self.handle.res_tx.take().expect("Missing res_tx");
527
528        if !res_tx.send(Ok(res)) {
529            // res_tx is unbounded, the only error possible is that the
530            // response future is dropped and client is not interested in response.
531            // This is not an error, we continue to drive the connection.
532            trace!("Failed to send http::Response to ResponseFuture");
533        }
534
535        Ok(()).into()
536    }
537
538    fn poll_send_body<S>(&mut self, cx: &mut Context, io: &mut BufIo<S>) -> Poll<io::Result<()>>
539    where
540        S: AsyncRead + AsyncWrite + Unpin,
541    {
542        let body_rx = self.handle.body_rx.as_ref().unwrap();
543
544        let (chunk, end) = match ready!(Pin::new(body_rx).poll_recv(cx, true)) {
545            Some(v) => v,
546            None => {
547                return Err(io::Error::new(
548                    io::ErrorKind::Other,
549                    "SendStream dropped before sending entire body",
550                ))
551                .into();
552            }
553        };
554
555        // invariant: io must not be blocked now.
556        assert!(io.can_poll_write());
557
558        let mut to_send = Some(&chunk[..]);
559
560        if end {
561            // By removing this we both signal to SendStream that no more body can
562            // be sent, as well as poll_bidirect() that we're done sending body.
563            self.handle.body_rx = None;
564        }
565
566        match Pin::new(io).poll_write_all(cx, &mut to_send, end) {
567            Poll::Pending => {
568                // invariant: BufIo must have taken the buffer
569                assert!(to_send.is_none());
570                return Poll::Pending;
571            }
572            Poll::Ready(v) => v?,
573        }
574
575        Ok(()).into()
576    }
577}
578
579struct BodyReceiver {
580    request_allows_reuse: bool,
581    response_allows_reuse: bool,
582    cur_read_size: usize,
583    limit: LimitRead,
584    body_tx: Sender<io::Result<Vec<u8>>>,
585}
586
587impl BodyReceiver {
588    fn poll_read_body<S>(
589        &mut self,
590        cx: &mut Context,
591        io: &mut BufIo<S>,
592    ) -> Poll<io::Result<Option<State>>>
593    where
594        S: AsyncRead + AsyncWrite + Unpin,
595    {
596        loop {
597            if self.limit.is_complete() {
598                break;
599            }
600
601            if !ready!(Pin::new(&self.body_tx).poll_ready(cx, true)) {
602                // RecvStream is dropped, that's ok we will receive and drop entire body.
603            }
604
605            let mut buf = FastBuf::with_capacity(self.cur_read_size);
606
607            let mut read_into = buf.borrow();
608
609            let amount = ready!(self.limit.poll_read(cx, io, &mut read_into))?;
610
611            if amount > 0 {
612                // If poll_read is correct, the buffer extend is safe.
613                unsafe {
614                    read_into.extend(amount);
615                }
616
617                if !self.body_tx.send(Ok(buf.into_vec())) {
618                    // RecvStream is dropped, that's ok we will receive and drop entire body.
619                }
620            } else if !self.limit.is_complete() {
621                // https://tools.ietf.org/html/rfc7230#page-32
622                // If the sender closes the connection or
623                // the recipient times out before the indicated number of octets are
624                // received, the recipient MUST consider the message to be
625                // incomplete and close the connection.
626                //
627                // https://tools.ietf.org/html/rfc7230#page-33
628                // A client that receives an incomplete response message, which can
629                // occur when a connection is closed prematurely or when decoding a
630                // supposedly chunked transfer coding fails, MUST record the message as
631                // incomplete.
632
633                trace!("Close because read body is not complete");
634                const EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
635                return Err(io::Error::new(EOF, "Partial body")).into();
636            }
637        }
638
639        let next_state = if self.request_allows_reuse
640            && self.response_allows_reuse
641            && self.limit.is_reusable()
642        {
643            trace!("Reuse connection");
644            Some(State::SendReq(SendReq))
645        } else {
646            trace!("Connection is not reusable");
647            None
648        };
649
650        Ok(next_state).into()
651    }
652}
653
654impl fmt::Debug for State {
655    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
656        match self {
657            State::SendReq(_) => write!(f, "SendReq"),
658            State::RecvRes(_) => write!(f, "RecvRes"),
659            State::RecvBody(_) => write!(f, "RecvBody"),
660        }
661    }
662}
663
664impl fmt::Debug for SendRequest {
665    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
666        write!(f, "SendRequest")
667    }
668}
669
670impl fmt::Debug for ResponseFuture {
671    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
672        write!(f, "ResponseFuture")
673    }
674}
675
676impl<S> fmt::Debug for Connection<S> {
677    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
678        write!(f, "Connection")
679    }
680}