// fastly 0.13.0
//
// Fastly Compute API
// Documentation
use super::{SendError, SendErrorCause};
use crate::convert::{ToHeaderName, ToHeaderValue};
use crate::{async_io, Backend, Request, Response};
use fastly_shared::MAX_PENDING_REQS;
use fastly_sys::fastly_http_req::PendingResponseKind;
use std::task::Poll;

pub mod handle;
mod state_machine;
pub use handle::{select_handles, PendingRequestHandle, PollHandleResult};
pub(crate) use state_machine::BackgroundRevalidation;
use state_machine::PendingRequestStateMachine;

/// A handle to a pending asynchronous request returned by [`Request::send_async()`] or
/// [`Request::send_async_streaming()`].
///
/// A handle can be evaluated using [`PendingRequest::poll()`], [`PendingRequest::wait()`], or
/// [`select`]. It can also be discarded if the request was sent for effects it might have, and the
/// response is unimportant.
///
/// Internally, pending requests may advance through multiple asynchronous stages. A readiness event
/// may only indicate that the request can advance to its next stage, not that a final [`Response`]
/// is already available.
pub struct PendingRequest {
    // State machine driving the request: it is polled, waited on, asked for its wait handle, and
    // receives any queued response-header edits.
    pending: PendingRequestStateMachine,
    // Backend and sent-request metadata used to turn the state machine's outcome into the final
    // `Response` or `SendError`.
    finalizer: Finalizer,
}

impl PendingRequest {
    /// Create a new pending request.
    ///
    /// Note that this constructor is *not* exposed in the public interface. Users should never
    /// directly invoke this constructor, and will receive a pending request by calling
    /// [`Request::send_async()`] or [`Request::send_async_streaming()`].
    pub(super) fn new(handle: PendingRequestHandle, backend: Backend, sent_req: Request) -> Self {
        Self {
            pending: PendingRequestStateMachine::new(handle),
            finalizer: Finalizer {
                apply_cache_headers: false,
                backend,
                sent_req,
            },
        }
    }

    /// Create a pending request whose completion is driven by the guest-side caching state machine.
    ///
    /// This is needed to support `after_send` hooks and similar guest-side
    /// caching features within the async `PendingRequest` model.
    pub(super) fn with_guest_caching(
        req: Request,
        backend: Backend,
        sent_req: Request,
    ) -> Result<Self, SendError> {
        let backend_name = backend.name().to_owned();
        let pending = match PendingRequestStateMachine::with_guest_caching(req, &backend_name) {
            Ok(pending) => pending,
            Err(err) => return Err(SendError::new(backend_name, sent_req, err)),
        };

        Ok(Self {
            pending,
            finalizer: Finalizer {
                apply_cache_headers: true,
                backend,
                sent_req,
            },
        })
    }

    fn wait_handle(&self) -> u32 {
        self.pending.wait_handle()
    }

    /// Append a header to the [Response] once it is ready.
    ///
    /// Note that when caching this is applied after the cache has been updated. If you
    /// want to change the headers that are inserted into the cache, then use
    /// [Request::set_after_send].
    pub fn append_response_header(
        &mut self,
        name: impl ToHeaderName,
        value: impl ToHeaderValue,
        target: PendingResponseKind,
    ) {
        self.pending.append_response_header(name, value, target);
    }

    /// Insert a header (replacing any existing values of the same name) on the [Response]
    /// once it is ready.
    ///
    /// Note that when caching this is applied after the cache has been updated. If you
    /// want to change the headers that are inserted into the cache, then use
    /// [Request::set_after_send].
    pub fn set_response_header(
        &mut self,
        name: impl ToHeaderName,
        value: impl ToHeaderValue,
        target: PendingResponseKind,
    ) {
        self.pending.set_response_header(name, value, target);
    }

    /// Remove a header from the [Response] once it is available.
    ///
    /// Note that when caching this is applied after the cache has been updated. If you
    /// want to change the headers that are inserted into the cache, then use
    /// [Request::set_after_send].
    pub fn remove_response_header(&mut self, name: impl ToHeaderName, target: PendingResponseKind) {
        self.pending.remove_response_header(name, target);
    }

    /// Builder-style variant of [PendingRequest::append_response_header].
    ///
    /// Note that this defaults to [PendingResponseKind::Any]. Use
    /// [PendingRequest::append_response_header] to change behaviour.
    pub fn with_response_header(
        mut self,
        name: impl ToHeaderName,
        value: impl ToHeaderValue,
    ) -> Self {
        self.append_response_header(name, value, PendingResponseKind::Any);
        self
    }

    /// Builder-style variant of [PendingRequest::set_response_header].
    ///
    /// Note that this defaults to [PendingResponseKind::Any]. Use
    /// [PendingRequest::set_response_header] to change behaviour.
    pub fn with_set_response_header(
        mut self,
        name: impl ToHeaderName,
        value: impl ToHeaderValue,
    ) -> Self {
        self.set_response_header(name, value, PendingResponseKind::Any);
        self
    }

    /// Builder-style variant of [PendingRequest::remove_response_header].
    ///
    /// Note that this defaults to [PendingResponseKind::Any]. Use
    /// [PendingRequest::remove_response_header] to change behaviour.
    pub fn without_response_header(mut self, name: impl ToHeaderName) -> Self {
        self.remove_response_header(name, PendingResponseKind::Any);
        self
    }

    /// Send the [Response] to the client as soon as any cache updates are resolved.
    ///
    /// If this is a direct pass or hit-for-pass request, and no cache insertions are
    /// necessary, then the [PendingRequest] will continue in the background allowing
    /// the sandbox to terminate early so that its resources may be freed up for other
    /// incoming requests.
    pub fn send_to_client(self) -> Result<(), SendError> {
        self.pending.send_to_client(self.finalizer)
    }

    /// Try to get the result of a pending request without blocking.
    ///
    /// This function returns immediately with a [`PollResult`]; if you want to block until a result
    /// is ready, use [`PendingRequest::wait()`].
    pub fn poll(mut self) -> PollResult {
        match self.pending.poll() {
            Poll::Pending => PollResult::Pending(self),
            Poll::Ready(Ok(resp)) => PollResult::Done(Ok(self.finalizer.into_response(resp))),
            Poll::Ready(Err(err)) => PollResult::Done(Err(self.finalizer.into_error(err))),
        }
    }

    /// Block until the result of a pending request is ready.
    ///
    /// If you want check whether the result is ready without blocking, use
    /// [`PendingRequest::poll()`].
    pub fn wait(mut self) -> Result<Response, SendError> {
        match self.pending.wait() {
            Ok(resp) => Ok(self.finalizer.into_response(resp)),
            Err(err) => Err(self.finalizer.into_error(err)),
        }
    }

    /// Get a reference to the original [`Request`] associated with this pending request.
    ///
    /// Note that the request's original body is already sending, so the returned request does not
    /// have a body.
    pub fn sent_req(&self) -> &Request {
        &self.finalizer.sent_req
    }
}

/// Completion-time data for a [`PendingRequest`]: what is needed to turn the raw outcome of the
/// request into the `Response` or `SendError` handed back to the caller.
struct Finalizer {
    // Guest-caching requests attach Fastly cache headers even if they later transition into a
    // plain direct backend request.
    apply_cache_headers: bool,
    // Metadata that will be attached to the [`Response`] once the handle is finished.
    pub(super) backend: Backend,
    // TODO 2024-07-31: this now forces request headers to be copied an extra
    // time, while it used to be ~free. Once async fn support is added and
    // pending request select is no longer present, we should be able to drop
    // this forced clone and let users clone if needed.
    pub(super) sent_req: Request,
}

impl Finalizer {
    /// Patch a `Response` received from the backend with the properties held by `self` before it
    /// is returned to the user.
    ///
    /// Fastly cache headers are applied first (only when `apply_cache_headers` is set), then the
    /// sent request and the backend are recorded on the response.
    fn into_response(self, mut resp: Response) -> Response {
        let Self {
            apply_cache_headers,
            backend,
            sent_req,
        } = self;

        if apply_cache_headers {
            resp = resp.with_fastly_cache_headers(&sent_req);
        }
        resp.sent_req = Some(sent_req);
        resp.metadata.backend = Some(backend);
        resp
    }

    /// Prepare a still-pending handle: when `apply_cache_headers` is set, the cache-miss headers
    /// derived from the sent request are attached; otherwise the handle is returned untouched.
    fn into_pending(self, handle: PendingRequestHandle) -> PendingRequestHandle {
        if !self.apply_cache_headers {
            return handle;
        }
        handle.with_fastly_cache_miss_headers(&self.sent_req)
    }

    /// Wrap a `SendErrorCause` in a `SendError` that carries the backend name and the request
    /// that was sent.
    fn into_error(self, cause: SendErrorCause) -> SendError {
        let Self {
            backend, sent_req, ..
        } = self;
        SendError::new(backend.name(), sent_req, cause)
    }
}

/// The result of a call to [`PendingRequest::poll()`].
///
/// `poll()` consumes the request, so the `Pending` variant hands it back to the caller.
// Ignoring this clippy lint: changing it would be a breaking change.
#[allow(clippy::large_enum_variant)]
pub enum PollResult {
    /// The request is still in progress, and can be polled again using the returned
    /// [`PendingRequest`].
    Pending(PendingRequest),
    /// The request has either completed with a [`Response`] or errored with a [`SendError`].
    Done(Result<Response, SendError>),
}

/// Given a collection of [`PendingRequest`]s, block until the result of one of the requests is
/// ready.
///
/// This function accepts any type which can become an iterator that yields requests; a common
/// choice is `Vec<PendingRequest>`.
///
/// Returns a tuple `(result, remaining)`, where:
///
/// - `result` is the result of the request that became ready.
///
/// - `remaining` is a vector containing all of the requests that did not become ready. The order of
///   the requests in this vector is not guaranteed to match the order of the requests in the
///   argument collection.
///
/// ### Examples
///
/// **Selecting using the request URI**
///
/// You can use [`Response::get_backend_request()`] to inspect the request that a response came
/// from. This example uses the URL to see which of the two requests finished first:
///
/// ```no_run
/// use fastly::{Error, Request};
/// # fn f() -> Result<(), Error> { // Wrap the example in a function, so we can propagate errors.
///
/// // Send two asynchronous requests, and store the pending requests in a vector.
/// let req1 = Request::get("http://www.origin.org/meow")
///     .send_async("TheOrigin")?;
/// let req2 = Request::get("http://www.origin.org/woof")
///     .send_async("TheOrigin")?;
/// let pending_reqs = vec![req1, req2];
///
/// // Wait for one of the requests to finish.
/// let (resp, _remaining) = fastly::http::request::select(pending_reqs);
///
/// // Return an error if the request was not successful.
/// let resp = resp?;
///
/// // Inspect the response metadata to see which backend this response came from.
/// match resp
///     .get_backend_request()
///     .unwrap()
///     .get_url()
///     .path()
/// {
///     "/meow" => println!("I love cats!"),
///     "/woof" => println!("I love dogs!"),
///     _ => panic!("unexpected result"),
/// }
///
/// # Ok(())
/// # }
/// ```
///
/// **Selecting using the backend name**
///
/// You can also use [`Response::get_backend_name()`] to identify which pending request in the given
/// collection finished. Consider this example, where two requests are sent asynchronously to two
/// different backends:
///
/// ```no_run
/// use fastly::{Error, Request};
/// # fn f() -> Result<(), Error> { // Wrap the example in a function, so we can propagate errors.
///
/// // Send two asynchronous requests, and store the pending requests in a vector.
/// let req1 = Request::get("http://www.origin-1.org/")
///     .send_async("origin1")?;
/// let req2 = Request::get("http://www.origin-2.org/")
///     .send_async("origin2")?;
/// let pending_reqs = vec![req1, req2];
///
/// // Wait for one of the requests to finish.
/// let (resp, _remaining) = fastly::http::request::select(pending_reqs);
///
/// // Return an error if the request was not successful.
/// let resp = resp?;
///
/// // Inspect the response to see which backend this response came from.
/// match resp.get_backend_name().unwrap() {
///     "origin1" => println!("origin 1 responded first!"),
///     "origin2" => println!("origin 2 responded first!"),
///     _ => panic!("unexpected result"),
/// }
///
/// # Ok(())
/// # }
/// ```
///
/// ### Panics
///
/// Panics if the argument collection is empty, or contains more than
/// [`fastly_shared::MAX_PENDING_REQS`] requests.
///
/// Also panics if the underlying host `select` call fails, or if it reports an index that is out
/// of range for the requests being waited on.
//
// This function may internally observe readiness for a request that still needs additional
// asynchronous steps before producing a terminal result. In that case, `select` continues waiting
// until one pending request reaches a terminal success or error.
pub fn select<I>(pending_reqs: I) -> (Result<Response, SendError>, Vec<PendingRequest>)
where
    I: IntoIterator<Item = PendingRequest>,
{
    let mut pending_reqs: Vec<PendingRequest> = pending_reqs.into_iter().collect();
    // The upper bound is inclusive: exactly `MAX_PENDING_REQS` requests is accepted, so the
    // message says "at most" rather than "less than".
    assert!(
        !pending_reqs.is_empty() && pending_reqs.len() <= MAX_PENDING_REQS as usize,
        "the number of selected pending requests must be at least 1, and at most {}",
        MAX_PENDING_REQS
    );

    loop {
        // Wait handles are re-collected on every pass because the vector is reshuffled below;
        // presumably a request's handle can also change as it advances — confirm in
        // `state_machine`.
        let handles = pending_reqs
            .iter()
            .map(PendingRequest::wait_handle)
            .collect::<Vec<_>>();

        // Readiness only guarantees that a request's next host-driven stage can advance. The
        // subsequent `poll()` may still return `Pending` after performing internal guest-side
        // transitions, so we keep looping until one request reaches a terminal state.
        let selected = async_io::select(&handles)
            .expect("fastly_async_io::select failed when waiting on pending requests");
        assert!(
            selected < pending_reqs.len(),
            "fastly_async_io::select returned an invalid index"
        );

        // `swap_remove` is O(1); the order of the remaining requests is documented as
        // unspecified, so the reshuffle is fine.
        let pending = pending_reqs.swap_remove(selected);
        match pending.poll() {
            PollResult::Done(res) => return (res, pending_reqs),
            // Still in flight: put it back and wait again.
            PollResult::Pending(pending) => pending_reqs.push(pending),
        }
    }
}