use super::{SendError, SendErrorCause};
use crate::convert::{ToHeaderName, ToHeaderValue};
use crate::{async_io, Backend, Request, Response};
use fastly_shared::MAX_PENDING_REQS;
use fastly_sys::fastly_http_req::PendingResponseKind;
use std::task::Poll;
pub mod handle;
mod state_machine;
pub use handle::{select_handles, PendingRequestHandle, PollHandleResult};
pub(crate) use state_machine::BackgroundRevalidation;
use state_machine::PendingRequestStateMachine;
pub struct PendingRequest {
pending: PendingRequestStateMachine,
finalizer: Finalizer,
}
impl PendingRequest {
pub(super) fn new(handle: PendingRequestHandle, backend: Backend, sent_req: Request) -> Self {
Self {
pending: PendingRequestStateMachine::new(handle),
finalizer: Finalizer {
apply_cache_headers: false,
backend,
sent_req,
},
}
}
pub(super) fn with_guest_caching(
req: Request,
backend: Backend,
sent_req: Request,
) -> Result<Self, SendError> {
let backend_name = backend.name().to_owned();
let pending = match PendingRequestStateMachine::with_guest_caching(req, &backend_name) {
Ok(pending) => pending,
Err(err) => return Err(SendError::new(backend_name, sent_req, err)),
};
Ok(Self {
pending,
finalizer: Finalizer {
apply_cache_headers: true,
backend,
sent_req,
},
})
}
fn wait_handle(&self) -> u32 {
self.pending.wait_handle()
}
pub fn append_response_header(
&mut self,
name: impl ToHeaderName,
value: impl ToHeaderValue,
target: PendingResponseKind,
) {
self.pending.append_response_header(name, value, target);
}
pub fn set_response_header(
&mut self,
name: impl ToHeaderName,
value: impl ToHeaderValue,
target: PendingResponseKind,
) {
self.pending.set_response_header(name, value, target);
}
pub fn remove_response_header(&mut self, name: impl ToHeaderName, target: PendingResponseKind) {
self.pending.remove_response_header(name, target);
}
pub fn with_response_header(
mut self,
name: impl ToHeaderName,
value: impl ToHeaderValue,
) -> Self {
self.append_response_header(name, value, PendingResponseKind::Any);
self
}
pub fn with_set_response_header(
mut self,
name: impl ToHeaderName,
value: impl ToHeaderValue,
) -> Self {
self.set_response_header(name, value, PendingResponseKind::Any);
self
}
pub fn without_response_header(mut self, name: impl ToHeaderName) -> Self {
self.remove_response_header(name, PendingResponseKind::Any);
self
}
pub fn send_to_client(self) -> Result<(), SendError> {
self.pending.send_to_client(self.finalizer)
}
pub fn poll(mut self) -> PollResult {
match self.pending.poll() {
Poll::Pending => PollResult::Pending(self),
Poll::Ready(Ok(resp)) => PollResult::Done(Ok(self.finalizer.into_response(resp))),
Poll::Ready(Err(err)) => PollResult::Done(Err(self.finalizer.into_error(err))),
}
}
pub fn wait(mut self) -> Result<Response, SendError> {
match self.pending.wait() {
Ok(resp) => Ok(self.finalizer.into_response(resp)),
Err(err) => Err(self.finalizer.into_error(err)),
}
}
pub fn sent_req(&self) -> &Request {
&self.finalizer.sent_req
}
}
struct Finalizer {
apply_cache_headers: bool,
pub(super) backend: Backend,
pub(super) sent_req: Request,
}
impl Finalizer {
fn into_response(self, mut resp: Response) -> Response {
if self.apply_cache_headers {
resp = resp.with_fastly_cache_headers(&self.sent_req);
}
resp.sent_req = Some(self.sent_req);
resp.metadata.backend = Some(self.backend);
resp
}
fn into_pending(self, handle: PendingRequestHandle) -> PendingRequestHandle {
if self.apply_cache_headers {
handle.with_fastly_cache_miss_headers(&self.sent_req)
} else {
handle
}
}
fn into_error(self, cause: SendErrorCause) -> SendError {
SendError::new(self.backend.name(), self.sent_req, cause)
}
}
#[allow(clippy::large_enum_variant)]
pub enum PollResult {
Pending(PendingRequest),
Done(Result<Response, SendError>),
}
pub fn select<I>(pending_reqs: I) -> (Result<Response, SendError>, Vec<PendingRequest>)
where
I: IntoIterator<Item = PendingRequest>,
{
let mut pending_reqs: Vec<PendingRequest> = pending_reqs.into_iter().collect();
if pending_reqs.is_empty() || pending_reqs.len() > MAX_PENDING_REQS as usize {
panic!(
"the number of selected pending requests must be at least 1, and less than {}",
MAX_PENDING_REQS
);
}
loop {
let handles = pending_reqs
.iter()
.map(PendingRequest::wait_handle)
.collect::<Vec<_>>();
let selected = async_io::select(&handles)
.expect("fastly_async_io::select failed when waiting on pending requests");
if selected >= pending_reqs.len() {
panic!("fastly_async_io::select returned an invalid index");
}
let pending = pending_reqs.swap_remove(selected);
match pending.poll() {
PollResult::Done(res) => return (res, pending_reqs),
PollResult::Pending(pending) => pending_reqs.push(pending),
}
}
}