hreq-h1 0.3.10

Asynchronous HTTP/1.1 (and 1.0) implementation
Documentation
//! Server implementation of the HTTP/1.1 protocol.
//!
//! # Example
//!
//! # Example
//!
//! ```rust, no_run
//! use hreq_h1::server;
//! use std::error::Error;
//! use async_std::net::TcpListener;
//! use http::{Response, StatusCode};
//!
//! #[async_std::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//!     let mut listener = TcpListener::bind("127.0.0.1:3000").await?;
//!
//!     // Accept all incoming TCP connections.
//!     loop {
//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
//!
//!             // Spawn a new task to process each connection individually
//!             async_std::task::spawn(async move {
//!                 let mut h1 = server::handshake(socket);
//!
//!                 // Handle incoming requests from this socket, one by one.
//!                 while let Some(request) = h1.accept().await {
//!                     let (req, mut respond) = request.unwrap();
//!
//!                     println!("Receive request: {:?}", req);
//!
//!                     // Build a response with no body, since
//!                     // that is sent later.
//!                     let response = Response::builder()
//!                         .status(StatusCode::OK)
//!                         .body(())
//!                         .unwrap();
//!
//!                     // Send the response back to the client
//!                     let mut send_body = respond
//!                         .send_response(response, false).await.unwrap();
//!
//!                     send_body.send_data(b"Hello world!", true)
//!                         .await.unwrap();
//!                 }
//!             });
//!         }
//!     }
//!
//!    Ok(())
//! }
//!
//!

use crate::buf_reader::BufIo;
use crate::fast_buf::FastBuf;
use crate::http11::{poll_for_crlfcrlf, try_parse_req, write_http1x_res, READ_BUF_INIT_SIZE};
use crate::limit::allow_reuse;
use crate::limit::{LimitRead, LimitWrite};
use crate::mpsc::{Receiver, Sender};
use crate::share::is_closed_kind;
use crate::Error;
use crate::RecvStream;
use crate::SendStream;
use crate::{AsyncRead, AsyncWrite};
use futures_util::future::poll_fn;
use futures_util::ready;
use std::fmt;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

/// Buffer size when writing a request.
const MAX_RESPONSE_SIZE: usize = 8192;

/// Max buffer size when reading a body.
const MAX_BODY_READ_SIZE: u64 = 8 * 1024 * 1024;

// The state and I/O of the connection is driven by the async calls from the various entities
// involved in accepting and responding to requests.
//
// 1. Connection::accept() drives while there is no current request.
// 2. RecvStream::poll_read() and SendResponse::send_response() while reading a request body and
//    responding to a request.
// 3. SendStream::send_data() while a response body is being sent.

/// "handshake" to create a connection.
///
/// See [module level doc](index.html) for an example.
pub fn handshake<S>(io: S) -> Connection<S>
where
    S: AsyncRead + AsyncWrite + Unpin + 'static,
{
    let inner = Arc::new(Mutex::new(Codec::new(io)));
    let (send, recv) = Receiver::new(1);
    let drive = SyncDriveExternal(Arc::new(Box::new(inner.clone())), send);

    Connection(inner, drive, recv)
}

/// Server connection for accepting incoming requests.
///
/// See [module level doc](index.html) for an example.
pub struct Connection<S>(Arc<Mutex<Codec<S>>>, SyncDriveExternal, Receiver<()>);

impl<S> Connection<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Accept a new incoming request to handle.
    pub async fn accept(
        &mut self,
    ) -> Option<Result<(http::Request<RecvStream>, SendResponse), Error>> {
        poll_fn(|cx| Pin::new(&mut *self).poll_accept(cx))
            .await
            .map(|v| v.map_err(|x| x.into()))
    }

    /// Wait until the connection has sent/flushed all data and is ok to drop.
    pub async fn close(mut self) {
        poll_fn(|cx| Pin::new(&mut self).poll_close(cx)).await;
    }

    fn poll_accept(
        self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
        let this = self.get_mut();

        // This will register on previous SyncDriveExternal being dropped.
        ready!(this.1.poll_pending_external(cx, &this.2));

        let drive_external = this.1.clone();

        let mut lock = this.0.lock().unwrap();

        lock.poll_server(cx, Some(drive_external), true)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
        let mut lock = self.0.lock().unwrap();

        // It doesn't matter what the return value is, we just need it to not be pending.
        ready!(lock.poll_server(cx, None, true));

        ().into()
    }
}

/// Handle to send a response and body back for a single request.
///
/// See [module level doc](index.html) for an example.
pub struct SendResponse {
    drive_external: SyncDriveExternal,
    tx_res: Sender<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>,
    req_expects_no_body: bool,
}

impl SendResponse {
    /// Send a response to a request. Notice that the body is sent separately afterwards.
    ///
    /// The lib will infer that there will be no response body if there is a `content-length: 0`
    /// header or a status code that should not have a body (1xx, 204, 304).
    ///
    /// `no_body` is an alternative way, in addition to headers and status, to inform the library
    /// there will be no body to send.
    ///
    /// It's an error to send a body when the status or headers indicate there should not be one.
    pub async fn send_response(
        self,
        response: http::Response<()>,
        no_body: bool,
    ) -> Result<SendStream, Error> {
        trace!("Send response: {:?}", response);

        // bounded to get back pressure
        let (tx_body, rx_body) = Receiver::new(1);

        let limit = LimitWrite::from_headers(response.headers());

        let status = response.status();

        // https://tools.ietf.org/html/rfc7230#page-31
        // any response with a 1xx (Informational), 204 (No Content), or
        // 304 (Not Modified) status code is always terminated by the first
        // empty line after the header fields, regardless of the header fields
        // present in the message, and thus cannot contain a message body.
        let ended = no_body
            || self.req_expects_no_body
            || limit.is_no_body()
            || status.is_informational()
            || status == http::StatusCode::NO_CONTENT
            || status == http::StatusCode::NOT_MODIFIED;

        let drive_external = Some(self.drive_external.clone());

        let send = SendStream::new(tx_body, limit, ended, drive_external);

        if !self.tx_res.send((response, ended, rx_body)) {
            return Err(io::Error::new(io::ErrorKind::Other, "Connection closed").into());
        }

        poll_fn(|cx| self.drive_external.poll_drive_external(cx)).await?;

        Ok(send)
    }
}
pub(crate) struct Codec<S> {
    io: BufIo<S>,
    state: State,
}

impl<S> Codec<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn new(io: S) -> Self {
        Codec {
            io: BufIo::with_capacity(READ_BUF_INIT_SIZE, io),
            state: State::RecvReq(RecvReq),
        }
    }
}

impl<S> Codec<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn poll_server(
        &mut self,
        cx: &mut Context,
        want_next_req: Option<SyncDriveExternal>,
        register_on_user_input: bool,
    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
        // Any error bubbling up closes the connection.
        match self.drive(cx, want_next_req, register_on_user_input) {
            Poll::Ready(Some(Err(e))) => {
                debug!("Close on error: {:?}", e);

                trace!("{:?} => Closed", self.state);
                self.state = State::Closed;

                Some(Err(e)).into()
            }
            r => r,
        }
    }

    fn drive(
        &mut self,
        cx: &mut Context,
        want_next_req: Option<SyncDriveExternal>,
        register_on_user_input: bool,
    ) -> Poll<Option<Result<(http::Request<RecvStream>, SendResponse), io::Error>>> {
        loop {
            ready!(Pin::new(&mut self.io).poll_finish_pending_write(cx))?;

            match &mut self.state {
                State::RecvReq(h) => {
                    if let Some(want_next_req) = want_next_req {
                        let (next_req, next_state) =
                            ready!(h.poll_next_req(cx, &mut self.io, want_next_req))?;

                        trace!("RecvReq => {:?}", next_state);
                        self.state = next_state;

                        if let Some(next_req) = next_req {
                            return Some(Ok(next_req)).into();
                        } else {
                            return None.into();
                        }
                    } else {
                        // poll_drive() called with the intention of just driving server state
                        // and not to handle the next read request.
                        return None.into();
                    }
                }
                State::SendRes(h) => {
                    let next_state =
                        ready!(h.poll_bidirect(cx, &mut self.io, register_on_user_input))?;

                    trace!("SendRes => {:?}", next_state);
                    self.state = next_state;
                }
                State::SendBody(h) => {
                    let next_state =
                        ready!(h.poll_send_body(cx, &mut self.io, register_on_user_input))?;

                    trace!("SendBody => {:?}", next_state);
                    self.state = next_state;
                }
                State::Closed => {
                    // Nothing to do
                    return None.into();
                }
            }
        }
    }
}

enum State {
    /// Receive next request.
    RecvReq(RecvReq),
    /// Send response, and (if appropriate) receive request body.
    SendRes(Bidirect),
    /// Send response body.
    SendBody(BodySender),
    /// Closed, error or cleanly.
    Closed,
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            State::RecvReq(_) => write!(f, "RecvReq"),
            State::SendRes(_) => write!(f, "SendRes"),
            State::SendBody(_) => write!(f, "SendBody"),
            State::Closed => write!(f, "Closed"),
        }
    }
}

/// Waiting for the next request to arrive.
///
/// Reads a buffer for 2 x crlf to know we got an entire request header.
struct RecvReq;

impl RecvReq {
    fn poll_next_req<S>(
        &mut self,
        cx: &mut Context,
        io: &mut BufIo<S>,
        drive_external: SyncDriveExternal,
    ) -> Poll<Result<(Option<(http::Request<RecvStream>, SendResponse)>, State), io::Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let req = match ready!(poll_for_crlfcrlf(cx, io, try_parse_req)).and_then(|x| x) {
            Ok(v) => v,
            Err(e) => {
                if is_closed_kind(e.kind()) {
                    // remote just hung up before sending request, that's ok.
                    return Ok((None, State::Closed)).into();
                } else {
                    return Err(e).into();
                }
            }
        };

        if req.is_none() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Failed to parse request",
            ))
            .into();
        }
        let req = req.expect("Didn't read full request");

        // Limiter to read the correct body amount from the socket.
        let limit = LimitRead::from_headers(req.headers(), false);

        let request_allows_reuse = allow_reuse(req.headers(), req.version());

        let no_req_body = limit.is_no_body();

        // https://tools.ietf.org/html/rfc7230#page-31
        // Any response to a HEAD request ... is always terminated by the first
        // empty line after the header fields, regardless of the header fields
        // present in the message, and thus cannot contain a message body.
        let req_expects_no_body = req.method() == http::Method::HEAD;

        // bound channel to get backpressure
        let (tx_body, rx_body) = Receiver::new(1);

        let (tx_res, rx_res) = Receiver::new(1);

        // Prepare the new "package" to be delivered out of the poll loop.
        let package = {
            //
            let recv = RecvStream::new(rx_body, no_req_body, Some(drive_external.clone()));

            let (parts, _) = req.into_parts();
            let req = http::Request::from_parts(parts, recv);

            let send = SendResponse {
                drive_external,
                tx_res,
                req_expects_no_body,
            };

            (req, send)
        };

        // Drop tx_body straight away if headers indicate we are not expecting any request body.
        let tx_body = if limit.is_no_body() {
            None
        } else {
            Some(tx_body)
        };

        let cur_read_size = limit.body_size().unwrap_or(8192).min(MAX_BODY_READ_SIZE) as usize;

        let bidirect = Bidirect {
            limit,
            request_allows_reuse,
            tx_body,
            rx_res: Some(rx_res),
            holder: None,
            cur_read_size,
        };

        Ok((Some(package), State::SendRes(bidirect))).into()
    }
}

/// Both receive a request body (if headers indicate it), and
/// send a response which is obtained from the library user.
struct Bidirect {
    // limiter/dechunker for reading incoming request body.
    limit: LimitRead,
    // remember this for when we are to go back into state RecvReq
    request_allows_reuse: bool,
    // send body chunks from socket to this sender.
    tx_body: Option<Sender<io::Result<Vec<u8>>>>,
    // receive a response (once), from this to pass to socket.
    rx_res: Option<Receiver<(http::Response<()>, bool, Receiver<(Vec<u8>, bool)>)>>,
    // Holder of data from rx_res used to receive/write a response body.
    holder: Option<(bool, LimitWrite, Receiver<(Vec<u8>, bool)>)>,
    // The current read buffer size for receving the request body.
    cur_read_size: usize,
}

impl Bidirect {
    fn poll_bidirect<S>(
        &mut self,
        cx: &mut Context,
        io: &mut BufIo<S>,
        register_on_user_input: bool,
    ) -> Poll<Result<State, io::Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // Alternate between attempting to send a user response and receving more body chunks.
        loop {
            // We keep on looping until both these are None which signals
            // the bidirect state is done.
            if self.rx_res.is_none() && self.tx_body.is_none() {
                break;
            }

            let mut send_resp_pending = false;

            // Handle user sending a response.
            if self.rx_res.is_some() {
                // register_on_user_input means we should register a Waker when polling for a response
                // from the user. We should not register two wakers for the same Context, which means
                // if we get Pending while register_on_user_input is false, we can proceed to also drive IO.
                match self.poll_send_resp(cx, io, register_on_user_input) {
                    Poll::Pending => {
                        send_resp_pending = true;
                    }
                    Poll::Ready(v) => v?,
                }
            }

            if send_resp_pending && (register_on_user_input || self.tx_body.is_none()) {
                // If register_on_user_input:
                // A Waker is registered in mpsc::Receiver::poll_recv.
                // We cannot continue with IO since that would risk
                // registering wakers in multiple places.
                //
                // If self.tx_body.is_none() we can't make progress on
                // IO, and send_resp will not make progress by anything less
                // than user input.

                return Poll::Pending;
            }

            // Read request body from socket and propagate to user.
            if self.tx_body.is_some() {
                ready!(self.poll_read_body(cx, io))?;
            }
        }

        // invariant: we must have the details required in holder.
        let (no_body, limit, rx_body) = self.holder.take().expect("Holder of rx_body");

        let next_state = if no_body || limit.is_no_body() {
            if self.request_allows_reuse {
                trace!("No body to send");
                State::RecvReq(RecvReq)
            } else {
                trace!("Request does not allow reuse");
                State::Closed
            }
        } else {
            State::SendBody(BodySender {
                request_allows_reuse: self.request_allows_reuse,
                rx_body,
            })
        };

        Ok(next_state).into()
    }

    fn poll_send_resp<S>(
        &mut self,
        cx: &mut Context,
        io: &mut BufIo<S>,
        register_on_user_input: bool,
    ) -> Poll<Result<(), io::Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // We shouldn't be here unless we have rx_res.
        let rx_res = self.rx_res.as_ref().unwrap();

        if let Some((res, end, rx_body)) =
            ready!(Pin::new(rx_res).poll_recv(cx, register_on_user_input))
        {
            // We got a response from the user.

            // Remember things for the next state, SendBody
            let limit = LimitWrite::from_headers(res.headers());
            self.holder = Some((end, limit, rx_body));

            let mut buf = FastBuf::with_capacity(MAX_RESPONSE_SIZE);

            let mut write_to = buf.borrow();

            let amount = write_http1x_res(&res, &mut write_to[..])?;

            // If write_http1x_res reports the correct number of bytes written to
            // the buffer, then this extend is safe.
            unsafe {
                write_to.extend(amount);
            }

            let mut to_send = Some(&buf[..]);

            // invariant: poll_drive deals with pending outgoing io before anything
            //            else. at this point we should not have any pending write io.
            assert!(io.can_poll_write());

            match Pin::new(io).poll_write_all(cx, &mut to_send, true) {
                Poll::Pending => {
                    // invariant: Pending without "taking" all to_send bytes is a fault in BufIo
                    assert!(to_send.is_none());
                }
                Poll::Ready(v) => v?,
            }

            // Remove rx_res since we don't need anything more from it. This makes
            // poll_bidirect() not go into poll_send_resp anymore.
            self.rx_res.take();
        } else {
            // The user dropped the SendResponse instance before sending a response.
            // This is a user fault.
            return Err(Error::User(
                "SendResponse dropped before sending any response".to_string(),
            )
            .into_io())
            .into();
        }

        Ok(()).into()
    }

    fn poll_read_body<S>(
        &mut self,
        cx: &mut Context,
        io: &mut BufIo<S>,
    ) -> Poll<Result<(), io::Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // We shouldn't be here unless we have tx_body.
        let tx_body = self.tx_body.as_mut().unwrap();

        // Ensure we can send off any incoming read chunk to the user. This makes for flow control.
        if !ready!(Pin::new(&*tx_body).poll_ready(cx, true)) {
            // User has dropped the RecvStream. That's ok, we will just discard
            // the entire incoming body.
        }

        io.ensure_read_capacity(self.cur_read_size);

        let buf = ready!(Pin::new(&mut *io).poll_fill_buf(cx, false))?;

        if buf.is_empty() {
            // End of incoming data before we have fulfilled the LimitRead.
            // configured by the headers.
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "EOF before complete body received",
            ))
            .into();
        }

        let available_bytes = buf.len();

        let chunk = if self.limit.can_read_entire_vec() && io.can_take_read_buf() {
            // This is an optimization. If we're using a content-length and not
            // chunked, we can sometimes take the entire input buffer and therefore
            // avoiding some data copying.

            let chunk = io.take_read_buf();

            // To keep counting the size of the chunks
            self.limit.accept_entire_vec(&chunk);

            chunk
        } else {
            let mut chunk = FastBuf::with_capacity(available_bytes);

            let mut read_into = chunk.borrow();

            let amount = ready!(self.limit.poll_read(cx, io, &mut read_into[..]))?;

            // If poll_read is reporting the correct amount of bytes read into buf,
            // then this extend is safe.
            unsafe {
                read_into.extend(amount);
            }

            chunk.into_vec()
        };

        trace!("Received body chunk len={}", chunk.len());

        if !chunk.is_empty() {
            tx_body.send(Ok(chunk));
        } else if !self.limit.is_complete() {
            // https://tools.ietf.org/html/rfc7230#page-32
            // If the sender closes the connection or
            // the recipient times out before the indicated number of octets are
            // received, the recipient MUST consider the message to be
            // incomplete and close the connection.

            trace!("Close because read body is not complete");
            const EOF: io::ErrorKind = io::ErrorKind::UnexpectedEof;
            return Err(io::Error::new(EOF, "Partial body")).into();
        }

        if self.limit.is_complete() {
            // Remove tx_body Sender which indicates to the RecvStream that there is
            // no more body chunks to come.
            self.tx_body.take();
        }

        Ok(()).into()
    }
}

/// Sender of a response body.
struct BodySender {
    request_allows_reuse: bool,
    rx_body: Receiver<(Vec<u8>, bool)>,
}

impl BodySender {
    fn poll_send_body<S>(
        &mut self,
        cx: &mut Context,
        io: &mut BufIo<S>,
        register_on_user_input: bool,
    ) -> Poll<Result<State, io::Error>>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // Keep try to send body chunks until we got no more to send or Pending.
        loop {
            // Always abort on Pending, but register_on_user_input controls whether this resulted in
            // any Waker being registered. This makes for flow control.
            let next = ready!(Pin::new(&self.rx_body).poll_recv(cx, register_on_user_input));

            // Pending writes must have been dealt with already at the beginning of poll_drive().
            assert!(io.can_poll_write());

            if let Some((chunk, end)) = next {
                let mut buf = Some(&chunk[..]);

                match Pin::new(&mut *io).poll_write_all(cx, &mut buf, end) {
                    Poll::Pending => {
                        // invariant: The buffer must still been taken by poll_write.
                        assert!(buf.is_none());
                        return Poll::Pending;
                    }
                    Poll::Ready(v) => v?,
                }

                if end {
                    let next_state = if self.request_allows_reuse {
                        trace!("Finished sending body");
                        State::RecvReq(RecvReq)
                    } else {
                        trace!("Request does not allow reuse");
                        State::Closed
                    };

                    return Ok(next_state).into();
                }
            } else {
                // This is a fault, we are expecting more body chunks and
                // the SendStream was dropped.
                warn!("SendStream dropped before sending end_of_body");

                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Unexpected end of body",
                ))
                .into();
            }
        }
    }
}

impl<S> std::fmt::Debug for Connection<S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Connection")
    }
}

impl fmt::Debug for SendResponse {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SendResponse")
    }
}

// These unsafe require some explanation. We want to be able to call Codec<S>::poll_drive
// from both RecvStream and SendStream, however we don't want those two to be
// generic over S. That leads us down the path of dynamic dispatch and "hiding"
// S behind a Box<dyn DriveExternal>. So we implement that trait for Arc<Mutex<Codec<S>>>,
// sorted... but oh not.
//
// If we put Box<dyn DriveExternal> as a property in SendStream, rust will later "discover"
// this when it in an async context like async_std::spawn. Rust will say that DriveExternal
// is not Sync/Send and if we try to constrain it, that will in turn propagate to S, and
// we _don't_ want S to require Sync/Send.
//
// However. We always put S behind Arc<Mutex<Codec<S>>> and our treatment of S is
// absolutely Sync/Send because of that mutex. That leads us to wrapping
// Box<dyn DriveExternal> in some struct we can "unsafe impl Sync" for, and that's
// SyncDriveExternal.
unsafe impl Send for SyncDriveExternal {}
unsafe impl Sync for SyncDriveExternal {}

#[derive(Clone)]
pub(crate) struct SyncDriveExternal(Arc<Box<dyn DriveExternal>>, Sender<()>);

impl SyncDriveExternal {
    // count_external() tells us how many Arc<Box<dyn DriveExternal>> exists.
    // When we are not actively handling a request, this is 1.
    //
    // When a Sender is dropped (inside the cloned Box<dyn DriveExternal> in RecvStream
    // and SendStream), it wakes the Receiver, and we use this as a mechanism to "monitor"
    // when SyncDriveExternal instances are being dropped.
    fn poll_pending_external(&mut self, cx: &mut Context, recv: &Receiver<()>) -> Poll<()> {
        let external = self.count_external();
        trace!("poll_pending_external: {}", external);
        if self.count_external() == 1 {
            trace!("poll_pending_external: Ready");
            ().into()
        } else {
            match Pin::new(recv).poll_recv(cx, true) {
                Poll::Pending => {
                    trace!("poll_pending_external Pending");
                    Poll::Pending
                }
                Poll::Ready(_) => {
                    // invariant: there is always a Sender in MakeDriveExternal, and they
                    // never send anything.
                    unreachable!()
                }
            }
        }
    }

    fn count_external(&self) -> usize {
        Arc::weak_count(&self.0) + Arc::strong_count(&self.0)
    }
}

impl DriveExternal for SyncDriveExternal {
    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
        self.0.poll_drive_external(cx)
    }
}

pub(crate) trait DriveExternal {
    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>>;
}

impl<S> DriveExternal for Arc<Mutex<Codec<S>>>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    fn poll_drive_external(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
        let mut lock = self.lock().unwrap();

        match lock.poll_server(cx, None, false) {
            Poll::Pending => {
                let pending_io = lock.io.pending_rx() || lock.io.pending_tx();

                trace!("pending_io: {}", pending_io);

                // Only propagate Pending if it was due to io. We send register_on_user_input
                // false, which means that reading user input from SendResponse and SendStream
                // will not have registered a Waker. Pending due to IO most propagate as Pending.
                if pending_io {
                    Poll::Pending
                } else {
                    Ok(()).into()
                }
            }
            Poll::Ready(Some(Ok(_))) => {
                // invariant: want_next_req is false, this should not happend.
                unreachable!("Got next request in poll_drive_external");
            }
            // Propagate error
            Poll::Ready(Some(Err(e))) => Err(e).into(),
            //
            Poll::Ready(None) => Ok(()).into(),
        }
    }
}