fastly 0.3.2

Fastly Compute@Edge API
Documentation
pub use fastly_shared::{CacheOverride, FastlyRequestMetadata};

use crate::abi;
use crate::backend::validate_backend;
use crate::body::{Body, BodyHandle};
use crate::error::{anyhow, ensure, Error};
use crate::response::{handles_to_response, ResponseHandle};
use bytes::{BufMut, BytesMut};
use http::header::{HeaderName, HeaderValue};
use http::{Extensions, Method, Request, Response, Uri, Version};
use lazy_static::lazy_static;
use std::convert::{TryFrom, TryInto};
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Mutex;

/// A low-level handle to an HTTP request, identified by a raw `u32`.
///
/// The type is `#[repr(transparent)]`, so it has the same layout as its single `u32` field;
/// the methods in this file pass it directly to the `abi::xqd_req_*` hostcalls.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct RequestHandle {
    // Raw handle value; `fastly_shared::INVALID_REQUEST_HANDLE` marks an invalid handle.
    handle: u32,
}

lazy_static! {
    /// Whether a handle to the downstream request has already been handed out.
    ///
    /// Checked and set under the mutex by the functions in this file that retrieve the
    /// downstream request, so that a second retrieval can be refused.
    pub(crate) static ref GOT_DOWNSTREAM: Mutex<bool> = Mutex::new(false);
}

impl RequestHandle {
    /// The sentinel request handle, wrapping `fastly_shared::INVALID_REQUEST_HANDLE`.
    ///
    /// Used in this file as the initial value of out-parameters passed to hostcalls, and
    /// compared against by [`is_invalid()`](#method.is_invalid).
    pub const INVALID: Self = RequestHandle {
        handle: fastly_shared::INVALID_REQUEST_HANDLE,
    };

    /// Returns `true` if this handle equals the `INVALID` sentinel.
    pub fn is_invalid(&self) -> bool {
        self.handle == Self::INVALID.handle
    }

    /// Get an owned `RequestHandle` from a borrowed one.
    ///
    /// This should only be used when calling the raw ABI directly.
    pub(crate) unsafe fn handle(&self) -> Self {
        Self {
            handle: self.handle,
        }
    }

    pub fn downstream() -> Result<Self, Error> {
        let mut got = GOT_DOWNSTREAM.lock().unwrap();
        if *got {
            return Err(Error::msg(
                "cannot get more than one handle to the downstream request per execution",
            ));
        }

        let mut handle = RequestHandle::INVALID;
        let status = unsafe { abi::xqd_req_body_downstream_get(&mut handle, std::ptr::null_mut()) };
        if status.is_err() || handle.is_invalid() {
            Err(Error::msg("xqd_req_body_downstream_get failed"))
        } else {
            *got = true;
            Ok(handle)
        }
    }

    pub fn new() -> Result<Self, Error> {
        let mut handle = RequestHandle::INVALID;
        let status = unsafe { abi::xqd_req_new(&mut handle) };
        if status.is_err() || handle.is_invalid() {
            Err(Error::msg("xqd_req_new failed"))
        } else {
            Ok(handle)
        }
    }

    /// Iterate over this request's header names, reading them through a buffer of `buf_size`
    /// bytes.
    ///
    /// A header name longer than the buffer produces an error; callers may retry with a larger
    /// `buf_size`. Names that are not valid `HeaderName`s are also reported as errors.
    pub fn get_header_names<'a>(
        &'a self,
        buf_size: usize,
    ) -> impl Iterator<Item = Result<HeaderName, Error>> + 'a {
        let raw_names = abi::MultiValueHostcall::new(
            b'\0',
            buf_size,
            move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
                abi::xqd_req_header_names_get(
                    self.handle(),
                    buf,
                    buf_size,
                    cursor,
                    ending_cursor,
                    nwritten,
                )
            },
        );

        raw_names.map(|item| match item {
            Ok(bytes) => {
                HeaderName::from_bytes(&bytes).map_err(|e| anyhow!("invalid header name: {}", e))
            }
            Err(e) => Err(e),
        })
    }

    /// Iterate over the values of the header `name`, reading them through a buffer of
    /// `buf_size` bytes.
    ///
    /// Each item is an error if the underlying multi-value hostcall reports one.
    pub fn get_header_values<'a>(
        &'a self,
        name: &'a HeaderName,
        buf_size: usize,
    ) -> impl Iterator<Item = Result<HeaderValue, Error>> + 'a {
        let name_bytes: &'a [u8] = name.as_ref();
        let raw_values = abi::MultiValueHostcall::new(
            b'\0',
            buf_size,
            move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
                abi::xqd_req_header_values_get(
                    self.handle(),
                    name_bytes.as_ptr(),
                    name_bytes.len(),
                    buf,
                    buf_size,
                    cursor,
                    ending_cursor,
                    nwritten,
                )
            },
        );

        raw_values.map(|item| match item {
            // we trust that the hostcall is giving us valid header bytes, so no validation
            Ok(bytes) => Ok(unsafe { HeaderValue::from_maybe_shared_unchecked(bytes) }),
            Err(e) => Err(e),
        })
    }

    pub fn set_header_values<'a, I>(&mut self, name: &HeaderName, values: I) -> Result<(), Error>
    where
        I: IntoIterator<Item = &'a HeaderValue>,
    {
        // build a buffer of all the values, each terminated by a nul byte
        let mut buf = vec![];
        for value in values {
            buf.put(value.as_bytes());
            buf.put_u8(b'\0');
        }

        let name: &[u8] = name.as_ref();
        let status = unsafe {
            abi::xqd_req_header_values_set(
                self.handle(),
                name.as_ptr(),
                name.len(),
                buf.as_ptr(),
                buf.len(),
            )
        };

        if status.is_err() {
            Err(Error::msg("xqd_req_header_values_set failed"))
        } else {
            Ok(())
        }
    }

    pub fn insert_header(&mut self, name: &HeaderName, value: &HeaderValue) -> Result<(), Error> {
        let name_bytes: &[u8] = name.as_ref();
        let value_bytes: &[u8] = value.as_ref();
        let status = unsafe {
            abi::xqd_req_header_insert(
                self.handle(),
                name_bytes.as_ptr(),
                name_bytes.len(),
                value_bytes.as_ptr(),
                value_bytes.len(),
            )
        };
        if status.is_err() {
            Err(Error::msg("xqd_req_header_insert returned error"))
        } else {
            Ok(())
        }
    }

    pub fn append_header(&mut self, name: &HeaderName, value: &HeaderValue) -> Result<(), Error> {
        let name_bytes: &[u8] = name.as_ref();
        let value_bytes: &[u8] = value.as_ref();
        let status = unsafe {
            abi::xqd_req_header_append(
                self.handle(),
                name_bytes.as_ptr(),
                name_bytes.len(),
                value_bytes.as_ptr(),
                value_bytes.len(),
            )
        };
        if status.is_err() {
            Err(Error::msg("xqd_req_header_append returned error"))
        } else {
            Ok(())
        }
    }

    pub fn get_version(&self) -> Result<Version, Error> {
        let mut version = 0;
        let status = unsafe { abi::xqd_req_version_get(self.handle(), &mut version) };
        if status.is_err() {
            Err(Error::msg("xqd_req_version_get failed"))
        } else {
            abi::HttpVersion::try_from(version)
                .map(Into::into)
                .map_err(Error::msg)
        }
    }

    pub fn set_version(&mut self, v: Version) -> Result<(), Error> {
        let status =
            unsafe { abi::xqd_req_version_set(self.handle(), abi::HttpVersion::from(v) as u32) };
        if status.is_err() {
            Err(Error::msg("xqd_req_version_get failed"))
        } else {
            Ok(())
        }
    }

    /// Get the HTTP method of this request, reading it through a buffer of `max_length` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the hostcall fails or the bytes it wrote are not a valid method.
    ///
    /// # Panics
    ///
    /// Panics if the hostcall reports writing more bytes than the buffer can hold.
    pub fn get_method(&self, max_length: usize) -> Result<Method, Error> {
        let mut buf: Vec<u8> = Vec::with_capacity(max_length);
        let mut written = 0;
        let status = unsafe {
            abi::xqd_req_method_get(
                self.handle(),
                buf.as_mut_ptr(),
                buf.capacity(),
                &mut written,
            )
        };
        if status.is_err() {
            return Err(Error::msg("xqd_req_method_get failed"));
        }
        // Guard the `set_len` below: the reported length must fit in the allocation.
        assert!(
            written <= buf.capacity(),
            "xqd_req_method_get wrote too many bytes"
        );
        unsafe {
            buf.set_len(written);
        }
        match Method::from_bytes(&buf) {
            Ok(method) => Ok(method),
            Err(_) => Err(anyhow!("invalid method: {}", String::from_utf8_lossy(&buf))),
        }
    }

    pub fn set_method(&self, method: &Method) -> Result<(), Error> {
        let method_bytes = method.as_str().as_bytes();
        let status = unsafe {
            abi::xqd_req_method_set(self.handle(), method_bytes.as_ptr(), method_bytes.len())
        };
        if status.is_err() {
            Err(Error::msg("xqd_req_method_set failed"))
        } else {
            Ok(())
        }
    }

    /// Get the URI of this request, reading it through a buffer of `max_length` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the hostcall fails or the bytes it wrote do not parse as a `Uri`.
    ///
    /// # Panics
    ///
    /// Panics if the hostcall reports writing more bytes than the buffer can hold.
    pub fn get_uri(&self, max_length: usize) -> Result<Uri, Error> {
        let mut buf = BytesMut::with_capacity(max_length);
        let mut written = 0;
        let status = unsafe {
            abi::xqd_req_uri_get(
                self.handle(),
                buf.as_mut_ptr(),
                buf.capacity(),
                &mut written,
            )
        };
        if status.is_err() {
            return Err(Error::msg("xqd_req_uri_get failed"));
        }
        // Guard the `set_len` below: the reported length must fit in the allocation.
        assert!(
            written <= buf.capacity(),
            "xqd_req_uri_get wrote too many bytes"
        );
        unsafe {
            buf.set_len(written);
        }
        let parsed = Uri::from_maybe_shared(buf.freeze());
        parsed.map_err(|e| anyhow!("invalid URI: {}", e))
    }

    pub fn set_uri(&mut self, uri: &Uri) -> Result<(), Error> {
        let uri_bytes = uri.to_string().into_bytes();
        let status =
            unsafe { abi::xqd_req_uri_set(self.handle(), uri_bytes.as_ptr(), uri_bytes.len()) };
        if status.is_err() {
            Err(Error::msg("xqd_req_uri_set failed"))
        } else {
            Ok(())
        }
    }

    pub fn send(
        self,
        body: BodyHandle,
        backend: &str,
    ) -> Result<(ResponseHandle, BodyHandle), Error> {
        let mut resp_handle = ResponseHandle::INVALID;
        let mut resp_body_handle = BodyHandle::INVALID;
        let status = unsafe {
            abi::xqd_req_send(
                self.handle(),
                body.handle(),
                backend.as_ptr(),
                backend.len(),
                &mut resp_handle,
                &mut resp_body_handle,
            )
        };
        if status.is_err() || resp_handle.is_invalid() || resp_body_handle.is_invalid() {
            Err(Error::msg("xqd_req_send failed"))
        } else {
            Ok((resp_handle, resp_body_handle))
        }
    }

    /// Send a request asynchronously via the given backend, returning as soon as the request has
    /// begun sending.
    ///
    /// The resulting [`PendingRequestHandle`](struct.PendingRequestHandle.html) can be evaluated
    /// using [`PendingRequestHandle::poll()`](struct.PendingRequestHandle.html#method.poll),
    /// [`PendingRequestHandle::wait()`](struct.PendingRequestHandle.html#method.wait), or
    /// [`select_handles`](fn.select_handles.html). It can also be discarded if the request was sent
    /// for effects it might have, and the response is unimportant.
    pub fn send_async(
        self,
        body: BodyHandle,
        backend: &str,
    ) -> Result<PendingRequestHandle, Error> {
        let mut pending_req_handle = PendingRequestHandle::INVALID;
        let status = unsafe {
            abi::xqd_req_send_async(
                self.handle(),
                body.handle(),
                backend.as_ptr(),
                backend.len(),
                &mut pending_req_handle,
            )
        };
        if status.is_err() || pending_req_handle.is_invalid() {
            Err(Error::msg("xqd_req_send_async failed"))
        } else {
            Ok(pending_req_handle)
        }
    }

    /// Set the cache override behavior for this request.
    ///
    /// This setting will override any cache directive headers returned in response to this
    /// request.
    ///
    /// # Panics
    ///
    /// Panics if the `xqd_req_cache_override_set` hostcall fails.
    pub fn set_cache_override(&mut self, cache_override: CacheOverride) {
        let (tag, ttl, swr) = cache_override.to_abi();
        let failed =
            unsafe { abi::xqd_req_cache_override_set(self.handle(), tag, ttl, swr) }.is_err();
        if failed {
            panic!("xqd_req_cache_override_set failed");
        }
    }
}

/// Get handles to the downstream request parts and body at the same time.
///
/// This will return an error if either the parts of the body have already been given.
//
// TODO ACF 2020-01-15: try to think up a better name for this
pub fn downstream_request_and_body_handles() -> Result<(RequestHandle, BodyHandle), Error> {
    let mut got_req = crate::request::GOT_DOWNSTREAM.lock().unwrap();
    let mut got_body = crate::body::GOT_DOWNSTREAM.lock().unwrap();
    if *got_req || *got_body {
        return Err(Error::msg(
            "cannot get more than one handle to the downstream request per execution",
        ));
    }

    let mut req_handle = RequestHandle::INVALID;
    let mut body_handle = BodyHandle::INVALID;
    let status = unsafe { abi::xqd_req_body_downstream_get(&mut req_handle, &mut body_handle) };
    if status.is_err() || req_handle.is_invalid() || body_handle.is_invalid() {
        Err(Error::msg("xqd_req_body_downstream_get failed"))
    } else {
        *got_req = true;
        *got_body = true;
        Ok((req_handle, body_handle))
    }
}

/// Get the downstream request as an `http::Request<Body>`.
///
/// This may be called at most once per execution; further calls return an error.
///
/// Note, this currently copies the entire body of the request into the guest. Future revisions
/// of this API will allow streaming through this interface.
//
// ACF 2019-11-27: this is a standalone function rather than a method on `RequestExt` because it has
// no receiver to help resolve `T: AsRef<[u8]>`
pub fn downstream_request() -> Result<Request<Body>, Error> {
    let (req_handle, body_handle) = downstream_request_and_body_handles()?;

    // Read the request line pieces first, then copy the headers across one name at a time.
    let version = req_handle.get_version()?;
    let method = req_handle.get_method(crate::METHOD_MAX_LEN)?;
    let uri = req_handle.get_uri(crate::URI_MAX_LEN)?;
    let mut builder = Request::builder().version(version).method(method).uri(uri);

    for name in req_handle.get_header_names(crate::HEADER_NAME_MAX_LEN) {
        let name = name?;
        for value in req_handle.get_header_values(&name, crate::HEADER_VALUE_MAX_LEN) {
            let value = value?;
            builder = builder.header(&name, value);
        }
    }

    let body: Body = body_handle.into();
    let request = builder.body(body)?;
    Ok(request)
}

/// Get the IP address of the downstream client, if it is known.
///
/// The hostcall fills a 16-byte buffer and reports how many bytes it wrote: 0 means the
/// address is unknown, 4 means IPv4, and 16 means IPv6.
///
/// # Panics
///
/// Panics if the hostcall fails or reports any other byte count.
pub fn downstream_client_ip_addr() -> Option<IpAddr> {
    let mut raw = [0u8; 16];
    let mut written = 0;

    let status = unsafe { abi::xqd_req_downstream_client_ip_addr(raw.as_mut_ptr(), &mut written) };
    if status.is_err() {
        panic!("downstream_client_ip_addr failed");
    }

    if written == 0 {
        return None;
    }
    let addr = match written {
        4 => {
            let v4: [u8; 4] = raw[..4]
                .try_into()
                .expect("octets is at least 4 bytes long");
            IpAddr::V4(Ipv4Addr::from(v4))
        }
        16 => IpAddr::V6(Ipv6Addr::from(raw)),
        _ => panic!("downstream_client_ip_addr wrote an unexpected number of bytes"),
    };
    Some(addr)
}

/// Get the raw bytes sent by the client in the ClientHello message
///
/// https://tools.ietf.org/html/rfc5246#section-7.4.1.2
pub fn downstream_tls_client_hello() -> Result<Vec<u8>, Error> {
    let mut ch_size = 0;
    let status =
        unsafe { abi::xqd_req_downstream_tls_client_hello(std::ptr::null_mut(), 0, &mut ch_size) };
    if status.is_err() && ch_size == 0 {
        panic!("couldn't get the downstream TLS client hello");
    }
    let mut buf = vec![0; ch_size];
    let status = unsafe {
        abi::xqd_req_downstream_tls_client_hello(buf.as_mut_ptr(), buf.len(), &mut ch_size)
    };
    if status.is_err() {
        panic!("couldn't get the downstream TLS cipher protocol");
    }
    Ok(buf)
}

/// Get the cipher suite used to secure the downstream client TLS connection.
///
/// The returned name is consistent with the [OpenSSL name][openssl-mapping] for the cipher
/// suite, for example:
///
/// ```ignore
/// assert_eq!(downstream_tls_cipher_openssl_name(), "ECDHE-RSA-AES128-GCM-SHA256");
/// ```
///
/// The hostcall is made on first use and the result is cached in a lazily-initialized static.
///
/// # Panics
///
/// Panics if the hostcall fails or the returned name is not valid UTF-8.
///
/// [openssl-mapping]: https://testssl.sh/openssl-iana.mapping.html
pub fn downstream_tls_cipher_openssl_name() -> &'static str {
    lazy_static! {
        static ref OPENSSL_NAME: String = {
            // 3x bigger than any of the cipher suite names in OpenSSL today
            let mut name_buf = vec![0; 128];
            let mut name_len = 0;
            let status = unsafe {
                abi::xqd_req_downstream_tls_cipher_openssl_name(
                    name_buf.as_mut_ptr(),
                    name_buf.len(),
                    &mut name_len,
                )
            };
            if status.is_err() {
                panic!("couldn't get the downstream TLS cipher's OpenSSL name");
            }
            // drop the unused tail of the buffer before converting
            name_buf.truncate(name_len);
            String::from_utf8(name_buf).expect("TLS cipher OpenSSL name must be valid UTF-8")
        };
    }

    let cached: &'static String = &OPENSSL_NAME;
    cached.as_str()
}

/// Get the TLS protocol version used to secure the downstream client TLS connection.
///
/// ```ignore
/// assert_eq!(downstream_tls_protocol(), "TLSv1.2");
/// ```
///
/// The hostcall is made on first use and the result is cached in a lazily-initialized static.
///
/// # Panics
///
/// Panics if the hostcall fails or the returned name is not valid UTF-8.
pub fn downstream_tls_protocol() -> &'static str {
    lazy_static! {
        static ref PROTOCOL: String = {
            let mut proto_buf = vec![0; 32];
            let mut proto_len = 0;
            let status = unsafe {
                abi::xqd_req_downstream_tls_protocol(
                    proto_buf.as_mut_ptr(),
                    proto_buf.len(),
                    &mut proto_len,
                )
            };
            if status.is_err() {
                panic!("couldn't get the downstream TLS cipher protocol");
            }
            // drop the unused tail of the buffer before converting
            proto_buf.truncate(proto_len);
            String::from_utf8(proto_buf).expect("TLS protocol must be valid UTF-8")
        };
    }

    let cached: &'static String = &PROTOCOL;
    cached.as_str()
}

/// Iterate over the downstream request's header names as originally received, in the order
/// they were received.
///
/// Names are read through a buffer of `buf_size` bytes. A header name longer than the buffer
/// produces an error; callers may retry with a larger buffer. Names that are not valid UTF-8
/// are also reported as errors.
pub fn downstream_original_header_names_with_len(
    buf_size: usize,
) -> impl Iterator<Item = Result<String, Error>> {
    let raw_names = abi::MultiValueHostcall::new(
        b'\0',
        buf_size,
        move |buf, buf_size, cursor, ending_cursor, nwritten| unsafe {
            abi::xqd_req_original_header_names_get(buf, buf_size, cursor, ending_cursor, nwritten)
        },
    );

    raw_names.map(|item| match item {
        Ok(bytes) => {
            String::from_utf8(bytes.to_vec()).map_err(|e| anyhow!("invalid header name: {}", e))
        }
        Err(e) => Err(e),
    })
}

/// Iterate over the downstream request's original header names, using
/// `crate::HEADER_NAME_MAX_LEN` as the buffer size.
///
/// See `downstream_original_header_names_with_len` for the error behavior.
pub fn downstream_original_header_names() -> impl Iterator<Item = Result<String, Error>> {
    let buf_size = crate::HEADER_NAME_MAX_LEN;
    downstream_original_header_names_with_len(buf_size)
}

/// Fastly-specific extension methods for requests: sending to a backend, converting the body,
/// and reading or adjusting the cache override metadata.
pub trait RequestExt: Sized {
    /// Send a request via the given backend, returning as soon as the headers are available.
    ///
    /// By default the request is first converted with `inner_to_body()` and then sent.
    fn send(self, backend: &str) -> Result<Response<Body>, Error> {
        let with_body = self.inner_to_body()?;
        with_body.send(backend)
    }

    /// Send a request asynchronously via the given backend, returning as soon as the request has
    /// begun sending.
    ///
    /// The resulting [`PendingRequest`](struct.PendingRequest.html) can be evaluated using
    /// [`PendingRequest::poll()`](struct.PendingRequest.html#method.poll),
    /// [`PendingRequest::wait()`](struct.PendingRequest.html#method.wait), or
    /// [`select`](fn.select.html). It can also be discarded if the request was sent for effects it
    /// might have, and the response is unimportant.
    ///
    /// By default the request is first converted with `inner_to_body()` and then sent.
    fn send_async(self, backend: &str) -> Result<PendingRequest, Error> {
        let with_body = self.inner_to_body()?;
        with_body.send_async(backend)
    }

    /// Convert this request into one whose body is a `Body` with the same contents.
    //
    // TODO ACF 2020-01-15: better name?
    fn inner_to_body(self) -> Result<Request<Body>, Error>;

    /// Convert this request into one whose body is the remaining contents of its body, as bytes.
    ///
    /// Note that this will involve copying and buffering the body, and so should only be used for
    /// convenience on small request bodies.
    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error>;

    /// Borrow the Fastly-specific metadata associated with this request.
    fn fastly_metadata(&self) -> &FastlyRequestMetadata;

    /// Mutably borrow the Fastly-specific metadata associated with this request.
    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata;

    /// Set the cache override behavior for this request.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous override calls on this request (e.g., `set_pass()`, `set_ttl()`).
    fn set_cache_override(&mut self, cache_override: CacheOverride) {
        let md = self.fastly_metadata_mut();
        md.cache_override = cache_override;
    }

    /// Get the cache override behavior currently recorded for this request.
    fn get_cache_override(&self) -> CacheOverride {
        let md = self.fastly_metadata();
        md.cache_override
    }

    /// Set the cache override behavior for this request to Pass.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this request to set the TTL or `stale-while-revalidate` time.
    fn set_pass(&mut self) {
        let md = self.fastly_metadata_mut();
        md.cache_override.set_pass();
    }

    /// Set the cache override behavior for this request to use the given TTL.
    ///
    /// This option can be combined with a `stale-while-revalidate` override.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this request to set Pass.
    fn set_ttl(&mut self, ttl: u32) {
        let md = self.fastly_metadata_mut();
        md.cache_override.set_ttl(ttl);
    }

    /// Set the cache override behavior for this request to use the given
    /// `stale-while-revalidate` time.
    ///
    /// This option can be combined with a TTL override.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this request to set Pass.
    fn set_stale_while_revalidate(&mut self, swr: u32) {
        let md = self.fastly_metadata_mut();
        md.cache_override.set_stale_while_revalidate(swr);
    }
}

/// Turn a `Request<Body>` into a freshly created request handle plus the handle of its body.
///
/// The version, method, URI, every header, and the cache override are copied onto the new
/// handle, in that order.
fn request_to_handles(req: Request<Body>) -> Result<(RequestHandle, BodyHandle), Error> {
    let mut handle = RequestHandle::new()?;

    handle.set_version(req.version())?;
    handle.set_method(req.method())?;
    handle.set_uri(req.uri())?;

    // Each distinct header name is set once, together with all of its values.
    let headers = req.headers();
    for name in headers.keys() {
        let values = headers.get_all(name);
        handle.set_header_values(name, values)?;
    }

    let cache_override = req.fastly_metadata().cache_override;
    handle.set_cache_override(cache_override);

    let body_handle = req.into_body().into_handle()?;
    Ok((handle, body_handle))
}

/// Check whether a request looks suitable for sending to a backend.
///
/// The backend name is validated first, then the URI must carry both a scheme and an authority.
///
/// Note that this is *not* meant to be a filter for things that could cause security issues, it is
/// only meant to catch errors before the hostcalls do in order to yield friendlier error messages.
fn validate_request(req: &Request<Body>, backend: &str) -> Result<(), Error> {
    validate_backend(backend)?;

    let uri = req.uri();
    let has_scheme = uri.scheme().is_some();
    let has_authority = uri.authority().is_some();
    ensure!(
        has_scheme && has_authority,
        "request URIs must have a scheme (http/https) and an authority (host)"
    );
    Ok(())
}

/// Look up the `FastlyRequestMetadata` stored in `exts`, falling back to a shared default
/// value when none is present.
fn get_or_default_metadata(exts: &Extensions) -> &FastlyRequestMetadata {
    const DEFAULT: FastlyRequestMetadata = FastlyRequestMetadata::default();

    match exts.get::<FastlyRequestMetadata>() {
        Some(found) => found,
        None => &DEFAULT,
    }
}

/// Mutably borrow the `FastlyRequestMetadata` stored in `exts`, inserting a default value
/// first when none is present.
fn get_or_insert_metadata(exts: &mut Extensions) -> &mut FastlyRequestMetadata {
    let missing = exts.get::<FastlyRequestMetadata>().is_none();
    if missing {
        exts.insert(FastlyRequestMetadata::default());
    }
    // The entry is guaranteed to exist at this point, so the `unwrap` cannot fail.
    exts.get_mut::<FastlyRequestMetadata>().unwrap()
}

/// Fastly-specific extensions to [`http::Request`][Request], for requests that already carry a
/// `Body`.
///
/// [Request]: https://docs.rs/http/latest/http/request/struct.Request.html
impl RequestExt for Request<Body> {
    /// Validate the request, convert it into handles, and send it via `backend`.
    fn send(self, backend: &str) -> Result<Response<Body>, Error> {
        validate_request(&self, backend)?;

        let (req, req_body) = request_to_handles(self)?;
        let (resp, resp_body) = req.send(req_body, backend)?;

        handles_to_response(resp, resp_body)
    }

    /// Validate the request, convert it into handles, and begin sending it via `backend`.
    fn send_async(self, backend: &str) -> Result<PendingRequest, Error> {
        validate_request(&self, backend)?;

        let (req, req_body) = request_to_handles(self)?;
        let pending = req.send_async(req_body, backend)?;

        Ok(pending.into())
    }

    /// The body is already a `Body`, so the request is returned unchanged.
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        Ok(self)
    }

    /// Read the body into a byte vector, keeping the other parts of the request.
    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        let (parts, body) = self.into_parts();
        let bytes = body.into_bytes()?;
        Ok(Request::from_parts(parts, bytes))
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        let exts = self.extensions();
        get_or_default_metadata(exts)
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        let exts = self.extensions_mut();
        get_or_insert_metadata(exts)
    }
}

impl RequestExt for Request<&[u8]> {
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        let mut body = Body::new()?;
        body.write_all(self.body())?;
        Ok(self.map(|_| body))
    }

    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        Ok(self.map(|b| b.to_vec()))
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        get_or_default_metadata(self.extensions())
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        get_or_insert_metadata(self.extensions_mut())
    }
}

impl RequestExt for Request<Vec<u8>> {
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        let mut body = Body::new()?;
        body.write_all(self.body())?;
        Ok(self.map(|_| body))
    }

    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        Ok(self)
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        get_or_default_metadata(self.extensions())
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        get_or_insert_metadata(self.extensions_mut())
    }
}

impl RequestExt for Request<&str> {
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        let mut body = Body::new()?;
        body.write_all(self.body().as_bytes())?;
        Ok(self.map(|_| body))
    }

    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        Ok(self.map(|b| b.as_bytes().to_vec()))
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        get_or_default_metadata(self.extensions())
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        get_or_insert_metadata(self.extensions_mut())
    }
}

impl RequestExt for Request<String> {
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        let mut body = Body::new()?;
        body.write_all(self.body().as_bytes())?;
        Ok(self.map(|_| body))
    }

    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        Ok(self.map(|b| b.into_bytes()))
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        get_or_default_metadata(self.extensions())
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        get_or_insert_metadata(self.extensions_mut())
    }
}

impl RequestExt for Request<()> {
    fn inner_to_body(self) -> Result<Request<Body>, Error> {
        let body = Body::new()?;
        Ok(self.map(|_| body))
    }

    fn inner_to_bytes(self) -> Result<Request<Vec<u8>>, Error> {
        Ok(self.map(|_| vec![]))
    }

    fn fastly_metadata(&self) -> &FastlyRequestMetadata {
        get_or_default_metadata(self.extensions())
    }

    fn fastly_metadata_mut(&mut self) -> &mut FastlyRequestMetadata {
        get_or_insert_metadata(self.extensions_mut())
    }
}

/// Fastly-specific extensions to [`http::request::Builder`][Builder].
///
/// The builder-style methods leave the builder untouched when its metadata is unavailable
/// (that is, when the metadata accessors return `None`).
///
/// [Builder]: https://docs.rs/http/latest/http/request/struct.Builder.html
pub trait RequestBuilderExt: Sized {
    /// Borrow the Fastly-specific metadata associated with this request.
    ///
    /// Returns `None` if this builder has an error.
    fn fastly_metadata_ref(&self) -> Option<&FastlyRequestMetadata>;

    /// Mutably borrow the Fastly-specific metadata associated with this request.
    ///
    /// Returns `None` if this builder has an error.
    fn fastly_metadata_mut(&mut self) -> Option<&mut FastlyRequestMetadata>;

    /// Set the cache override behavior for this request.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous override calls on this builder (e.g., `pass()`, `ttl()`).
    fn cache_override(mut self, cache_override: CacheOverride) -> Self {
        if let Some(md) = self.fastly_metadata_mut() {
            md.cache_override = cache_override;
        }
        self
    }

    /// Borrow the cache override behavior for this request.
    ///
    /// Returns `None` if this builder has an error.
    fn cache_override_ref(&self) -> Option<&CacheOverride> {
        let md = self.fastly_metadata_ref()?;
        Some(&md.cache_override)
    }

    /// Mutably borrow the cache override behavior for this request.
    ///
    /// Returns `None` if this builder has an error.
    fn cache_override_mut(&mut self) -> Option<&mut CacheOverride> {
        let md = self.fastly_metadata_mut()?;
        Some(&mut md.cache_override)
    }

    /// Set the cache override behavior for this request to Pass.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this builder to set the TTL or `stale-while-revalidate` time.
    fn pass(mut self) -> Self {
        if let Some(co) = self.cache_override_mut() {
            co.set_pass();
        }
        self
    }

    /// Set the cache override behavior for this request to use the given TTL.
    ///
    /// This option can be combined with a `stale-while-revalidate` override.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this builder to set Pass.
    fn ttl(mut self, ttl: u32) -> Self {
        if let Some(co) = self.cache_override_mut() {
            co.set_ttl(ttl);
        }
        self
    }

    /// Set the cache override behavior for this request to use the given
    /// `stale-while-revalidate` time.
    ///
    /// This option can be combined with a TTL override.
    ///
    /// This will override the behavior defined by the backend response headers, as well as any
    /// previous calls on this builder to set Pass.
    fn stale_while_revalidate(mut self, swr: u32) -> Self {
        if let Some(co) = self.cache_override_mut() {
            co.set_stale_while_revalidate(swr);
        }
        self
    }
}

impl RequestBuilderExt for http::request::Builder {
    fn fastly_metadata_ref(&self) -> Option<&FastlyRequestMetadata> {
        self.extensions_ref().map(get_or_default_metadata)
    }

    fn fastly_metadata_mut(&mut self) -> Option<&mut FastlyRequestMetadata> {
        self.extensions_mut().map(get_or_insert_metadata)
    }
}

/// A handle to a pending asynchronous request returned by
/// [`RequestHandle::send_async()`](struct.RequestHandle.html#method.send_async).
///
/// A handle can be evaluated using
/// [`PendingRequestHandle::poll()`](struct.PendingRequestHandle.html#method.poll),
/// [`PendingRequestHandle::wait()`](struct.PendingRequestHandle.html#method.wait), or
/// [`select_handles`](fn.select_handles.html). It can also be discarded if the request was sent for
/// effects it might have, and the response is unimportant.
///
/// The type is `#[repr(transparent)]`, so it has the same layout as its single `u32` field.
#[derive(Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct PendingRequestHandle {
    // Raw handle value; `fastly_shared::INVALID_PENDING_REQUEST_HANDLE` marks an invalid handle.
    handle: u32,
}

impl From<PendingRequest> for PendingRequestHandle {
    fn from(pr: PendingRequest) -> Self {
        pr.handle
    }
}

impl PendingRequestHandle {
    pub(crate) const INVALID: Self = Self {
        handle: fastly_shared::INVALID_PENDING_REQUEST_HANDLE,
    };

    pub(crate) fn is_invalid(&self) -> bool {
        self == &Self::INVALID
    }

    /// Get an owned `PendingRequestHandle` from a borrowed one.
    ///
    /// This should only be used when calling the raw ABI directly.
    pub(crate) unsafe fn handle(&self) -> Self {
        Self {
            handle: self.handle,
        }
    }

    /// Try to get the result of a pending request without blocking.
    ///
    /// This function returns immediately with a [`PollHandleResult`](enum.PollHandleResult.html);
    /// if you want to block until a result is ready, use
    /// [`PendingRequestHandle::wait()`](struct.PendingRequestHandle.html#method.wait).
    pub fn poll(self) -> PollHandleResult {
        let mut is_done = -1;
        let mut resp_handle = ResponseHandle::INVALID;
        let mut body_handle = BodyHandle::INVALID;
        let status = unsafe {
            abi::xqd_pending_req_poll(
                self.handle(),
                &mut is_done,
                &mut resp_handle,
                &mut body_handle,
            )
        };
        if status.is_err() || is_done < 0 || is_done > 1 {
            // since we are providing the out-pointers, and an owned `PendingRequestHandle` in the
            // guest can only exist if it's present in the host, any error returns from the hostcall
            // would indicate an internal (host) bug
            panic!("xqd_pending_req_poll internal error");
        }
        let is_done = if is_done == 0 { false } else { true };
        if !is_done {
            return PollHandleResult::Pending(self);
        }
        if is_done && (resp_handle.is_invalid() || body_handle.is_invalid()) {
            PollHandleResult::Done(Err(Error::msg("asynchronous request failed")))
        } else {
            PollHandleResult::Done(Ok((resp_handle, body_handle)))
        }
    }

    /// Block until the result of a pending request is ready.
    ///
    /// If you want check whether the result is ready without blocking, use
    /// [`PendingRequestHandle::poll()`](struct.PendingRequestHandle.html#method.poll).
    pub fn wait(self) -> Result<(ResponseHandle, BodyHandle), Error> {
        let mut resp_handle = ResponseHandle::INVALID;
        let mut body_handle = BodyHandle::INVALID;
        let status =
            unsafe { abi::xqd_pending_req_wait(self.handle(), &mut resp_handle, &mut body_handle) };
        if status.is_err() || resp_handle.is_invalid() || body_handle.is_invalid() {
            // TODO ACF 2020-01-24: differentiate between propagating a failed request error, and an
            // internal error?
            return Err(Error::msg("xqd_pending_req_poll failed"));
        }
        Ok((resp_handle, body_handle))
    }
}

/// The result of a call to
/// [`PendingRequestHandle::poll()`](struct.PendingRequestHandle.html#method.poll).
pub enum PollHandleResult {
    /// The request is still in progress, and can be polled again using the given handle.
    Pending(PendingRequestHandle),
    /// The request has either completed or errored.
    ///
    /// On success this carries the response handle and the response body handle; `poll()` produces
    /// an `Err` when either of those handles comes back invalid.
    Done(Result<(ResponseHandle, BodyHandle), Error>),
}

/// Given a collection of [`PendingRequestHandle`](struct.PendingRequestHandle.html)s, block until
/// the result of one of the handles is ready.
///
/// This function accepts any type which can become an iterator that yields handles; a common choice
/// is `Vec<PendingRequestHandle>`.
///
/// Returns a tuple `(result, index, remaining)`, where:
///
/// - `result` is the result of the handle that became ready.
///
/// - `index` is the index of the handle in the argument collection (e.g., the index of the handle
/// in a vector) that became ready.
///
/// - `remaining` is a vector containing all of the handles that did not become ready. The order of
/// the handles in this vector is not guaranteed to match the order of the handles in the argument
/// collection.
///
/// ### Panics
///
/// Panics if the argument collection is empty, or contains more than
/// `fastly_shared::MAX_PENDING_REQS` handles. Also panics if the hostcall reports an error or
/// returns an index that does not refer to one of the given handles.
pub fn select_handles<I>(
    pending_reqs: I,
) -> (
    Result<(ResponseHandle, BodyHandle), Error>,
    usize,
    Vec<PendingRequestHandle>,
)
where
    I: IntoIterator<Item = PendingRequestHandle>,
{
    let mut prs = pending_reqs
        .into_iter()
        .collect::<Vec<PendingRequestHandle>>();
    if prs.is_empty() || prs.len() > fastly_shared::MAX_PENDING_REQS as usize {
        // exactly `MAX_PENDING_REQS` handles is accepted, so the upper bound is inclusive
        panic!(
            "the number of selected handles must be at least 1, and at most {}",
            fastly_shared::MAX_PENDING_REQS
        );
    }
    // `-1` can never be a valid index, so a hostcall that fails to write one is detected below.
    let mut done_index = -1;
    let mut resp_handle = ResponseHandle::INVALID;
    let mut body_handle = BodyHandle::INVALID;

    let status = unsafe {
        abi::xqd_pending_req_select(
            prs.as_ptr(),
            prs.len(),
            &mut done_index,
            &mut resp_handle,
            &mut body_handle,
        )
    };

    if status.is_err() || done_index < 0 {
        // since we are providing the out-pointers, and an owned `PendingRequestHandle` in the guest
        // can only exist if it's present in the host, any error returns from the hostcall would
        // indicate an internal (host) bug
        panic!("xqd_pending_req_select internal error");
    }

    let done_index: usize = done_index
        .try_into()
        .expect("xqd_pending_req_select returned an invalid index");
    // Fail with a hostcall-specific message rather than a generic `swap_remove` bounds panic.
    assert!(
        done_index < prs.len(),
        "xqd_pending_req_select returned an invalid index"
    );

    // quickly remove the completed handle from the set to return; this moves the last handle into
    // the vacated slot, which is why the order of the remaining handles is not preserved
    prs.swap_remove(done_index);

    let res = if resp_handle.is_invalid() || body_handle.is_invalid() {
        Err(Error::msg("selected request failed"))
    } else {
        Ok((resp_handle, body_handle))
    };

    (res, done_index, prs)
}

/// A handle to a pending asynchronous request returned by
/// [`RequestExt::send_async()`](trait.RequestExt.html#tymethod.send_async).
///
/// A handle can be evaluated using
/// [`PendingRequest::poll()`](struct.PendingRequest.html#method.poll),
/// [`PendingRequest::wait()`](struct.PendingRequest.html#method.wait), or
/// [`select`](fn.select.html). It can also be discarded if the request was sent for effects it
/// might have, and the response is unimportant.
///
/// This type wraps a [`PendingRequestHandle`](struct.PendingRequestHandle.html) and converts to
/// and from it via `From`.
pub struct PendingRequest {
    /// The underlying low-level handle that `poll()` and `wait()` delegate to.
    handle: PendingRequestHandle,
}

impl From<PendingRequestHandle> for PendingRequest {
    fn from(handle: PendingRequestHandle) -> Self {
        Self { handle }
    }
}

impl PendingRequest {
    /// Try to get the result of a pending request without blocking.
    ///
    /// This function returns immediately with a [`PollResult`](enum.PollResult.html); if you want
    /// to block until a result is ready, use
    /// [`PendingRequest::wait()`](struct.PendingRequest.html#method.wait).
    pub fn poll(self) -> PollResult {
        match self.handle.poll() {
            PollHandleResult::Pending(prh) => PollResult::Pending(prh.into()),
            PollHandleResult::Done(Ok((resp_handle, resp_body_handle))) => {
                PollResult::Done(handles_to_response(resp_handle, resp_body_handle))
            }
            PollHandleResult::Done(Err(e)) => PollResult::Done(Err(e)),
        }
    }

    /// Block until the result of a pending request is ready.
    ///
    /// If you want check whether the result is ready without blocking, use
    /// [`PendingRequest::poll()`](struct.PendingRequest.html#method.poll).
    pub fn wait(self) -> Result<Response<Body>, Error> {
        let (resp_handle, resp_body_handle) = self.handle.wait()?;
        handles_to_response(resp_handle, resp_body_handle)
    }
}

/// The result of a call to [`PendingRequest::poll()`](struct.PendingRequest.html#method.poll).
pub enum PollResult {
    /// The request is still in progress, and can be polled again.
    Pending(PendingRequest),
    /// The request has either completed or errored.
    ///
    /// On success this carries the response built from the handles returned by the underlying
    /// [`PendingRequestHandle::poll()`](struct.PendingRequestHandle.html#method.poll).
    Done(Result<Response<Body>, Error>),
}

/// Given a collection of [`PendingRequest`](struct.PendingRequest.html)s, block until the result of
/// one of the requests is ready.
///
/// This function accepts any type which can become an iterator that yields requests; a common
/// choice is `Vec<PendingRequest>`.
///
/// Returns a tuple `(result, index, remaining)`, where:
///
/// - `result` is the result of the request that became ready.
///
/// - `index` is the index of the request in the argument collection (e.g., the index of the request
/// in a vector) that became ready.
///   
/// - `remaining` is a vector containing all of the requests that did not become ready. The order of
/// the requests in this vector is not guaranteed to match the order of the requests in the argument
/// collection.
///
/// ### Panics
///
/// Panics if the argument collection is empty, or contains more than
/// `fastly_shared::MAX_PENDING_REQS` requests.
pub fn select<I>(pending_reqs: I) -> (Result<Response<Body>, Error>, usize, Vec<PendingRequest>)
where
    I: IntoIterator<Item = PendingRequest>,
{
    let (res, done_index, rest) = select_handles(pending_reqs.into_iter().map(Into::into));
    let res = match res {
        Ok((resp_handle, resp_body_handle)) => handles_to_response(resp_handle, resp_body_handle),
        Err(e) => Err(e),
    };
    let rest = rest.into_iter().map(Into::into).collect();
    (res, done_index, rest)
}