use std::time::{Duration, Instant};
use fastly_shared::{FastlyStatus, INVALID_REQUEST_PROMISE_HANDLE};
use fastly_sys::{fastly_http_downstream::NextRequestOptionsMask, BodyHandle, RequestHandle};
use crate::compute_runtime::heap_memory_snapshot_mib;
use crate::{Request, Response};
/// A value returned by a request handler that knows how to deliver itself.
///
/// This file implements it for `Response`, `()`, `Result<Response, E>` and
/// `Result<(), E>`.
pub trait HandlerResult {
/// Error produced by [`HandlerResult::send`].
type Error;
/// Consumes the handler's return value and performs whatever sending it requires.
fn send(self) -> Result<(), Self::Error>;
}
impl HandlerResult for Response {
type Error = std::convert::Infallible;
fn send(self) -> Result<(), Self::Error> {
self.send_to_client();
Ok(())
}
}
/// Handlers returning `()` have nothing for this layer to send.
// NOTE(review): presumably such a handler has already sent its own response — confirm.
impl HandlerResult for () {
/// Uninhabited: this implementation never reports an error.
type Error = std::convert::Infallible;
/// Does nothing and returns `Ok(())`.
fn send(self) -> Result<(), Self::Error> {
Ok(())
}
}
impl<E> HandlerResult for Result<Response, E>
where
E: std::fmt::Display,
{
type Error = E;
fn send(self) -> Result<(), Self::Error> {
match self {
Ok(resp) => {
resp.send_to_client();
Ok(())
}
Err(e) => {
send_internal_server_err(&e);
Err(e)
}
}
}
}
impl<E> HandlerResult for Result<(), E>
where
E: std::fmt::Display,
{
type Error = E;
fn send(self) -> Result<(), Self::Error> {
self.inspect_err(send_internal_server_err)
}
}
/// Sends a response with status `INTERNAL_SERVER_ERROR` whose body is the
/// `Display` rendering of `e`.
fn send_internal_server_err<E: std::fmt::Display>(e: &E) {
    let status = crate::http::StatusCode::INTERNAL_SERVER_ERROR;
    let body = e.to_string();
    Response::from_body(body)
        .with_status(status)
        .send_to_client();
}
/// Statistics collected while serving requests, plus the error (if any) that
/// stopped the serving loop.
///
/// `E` is the error type of the handler's [`HandlerResult`].
#[derive(Debug)]
pub struct ServeSummary<E> {
    /// Error returned by `HandlerResult::send` that ended the loop, if any.
    error: Option<E>,
    /// Number of requests handed to the handler.
    requests: usize,
    /// Accumulated time spent obtaining a next request.
    time_wait: Duration,
    /// Accumulated time spent inside the handler.
    time_handler: Duration,
}

impl<E> ServeSummary<E> {
    /// Creates an empty summary: no error, zero requests, zero durations.
    fn new() -> Self {
        Self {
            error: None,
            requests: 0,
            time_wait: Duration::ZERO,
            time_handler: Duration::ZERO,
        }
    }

    /// Number of requests that were passed to the handler.
    pub fn requests(&self) -> usize {
        self.requests
    }

    /// Total time spent running the handler.
    pub fn time_handler(&self) -> Duration {
        self.time_handler
    }

    /// Total time spent waiting for further requests.
    pub fn time_waited(&self) -> Duration {
        self.time_wait
    }

    /// The error that ended serving, if there was one.
    pub fn error(&self) -> Option<&E> {
        self.error.as_ref()
    }

    /// Converts the summary into `Err(error)` when an error was recorded and
    /// `Ok(())` otherwise, discarding the statistics.
    pub fn into_result(self) -> Result<(), E> {
        match self.error {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Adds the time elapsed since `started` to the handler total.
    fn record_handler(&mut self, started: Instant) {
        self.time_handler += started.elapsed();
    }

    /// Adds the time elapsed since `started` to the wait total.
    fn record_wait(&mut self, started: Instant) {
        self.time_wait += started.elapsed();
    }
}
/// Builder-style configuration for serving several requests in one instance.
///
/// Limits are checked after each handled request, before waiting for the
/// next one.
#[derive(Debug)]
pub struct Serve {
    /// When this configuration was constructed; the lifetime limit is measured from here.
    created: Instant,
    /// Once this much time has passed since `created`, no further request is awaited.
    max_lifetime: Duration,
    /// Heap usage limit; `0` disables the memory check.
    max_memory: u32,
    /// Maximum number of requests to handle; `0` means no request limit.
    max_requests: usize,
    /// Timeout passed along when asking for a next request.
    timeout: Duration,
}
impl Serve {
pub fn new() -> Self {
Self {
created: Instant::now(),
max_lifetime: Duration::MAX,
max_memory: 0,
max_requests: 0,
timeout: Duration::MAX,
}
}
pub fn with_max_lifetime(mut self, limit: Duration) -> Self {
self.max_lifetime = limit;
self
}
pub fn with_max_memory(mut self, mib: u32) -> Self {
self.max_memory = mib;
self
}
pub fn with_max_requests(mut self, limit: usize) -> Self {
self.max_requests = limit;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn run<F, MaybeResult>(self, mut handler: F) -> ServeSummary<MaybeResult::Error>
where
F: FnMut(Request) -> MaybeResult,
MaybeResult: HandlerResult,
{
self.run_with_context(move |r, ()| handler(r), &mut ())
}
pub fn run_with_context<F, C, MaybeResult>(
self,
mut handler: F,
cx: &mut C,
) -> ServeSummary<MaybeResult::Error>
where
F: FnMut(Request, &mut C) -> MaybeResult,
MaybeResult: HandlerResult,
{
let options = NextRequestOptions::default().with_timeout(self.timeout);
let mut summary = ServeSummary::new();
let mut req = Request::from_client();
loop {
summary.requests += 1;
let start_handler = Instant::now();
let resp = handler(req, cx);
summary.record_handler(start_handler);
if let Err(e) = resp.send() {
summary.error = Some(e);
break;
}
if self.max_requests > 0 && summary.requests >= self.max_requests {
break;
}
if self.created.elapsed() >= self.max_lifetime {
break;
}
if self.max_memory > 0 {
let usage = heap_memory_snapshot_mib().unwrap_or(u32::MAX);
if usage > self.max_memory {
break;
}
}
let start_wait = Instant::now();
let next = match RequestPromise::new(&options) {
Ok(p) => p,
Err(e) => {
panic!("failed to register promise for receiving another request: {e}");
}
};
let res = next.wait();
summary.record_wait(start_wait);
if let Ok(r) = res {
req = r;
} else {
break;
}
}
summary
}
}
impl Default for Serve {
fn default() -> Self {
Self::new()
}
}
/// Wrapper around a raw `fastly_sys::RequestPromiseHandle`.
///
/// The value `INVALID_REQUEST_PROMISE_HANDLE` marks a wrapper that holds no
/// promise; this type's `Drop` impl skips such wrappers.
#[derive(Debug)]
struct RequestPromiseHandle(fastly_sys::RequestPromiseHandle);
impl Default for RequestPromiseHandle {
fn default() -> Self {
RequestPromiseHandle(INVALID_REQUEST_PROMISE_HANDLE)
}
}
impl From<fastly_sys::RequestPromiseHandle> for RequestPromiseHandle {
fn from(value: fastly_sys::RequestPromiseHandle) -> Self {
RequestPromiseHandle(value)
}
}
impl Drop for RequestPromiseHandle {
    /// Abandons the wrapped promise via `next_request_abandon`, unless the
    /// wrapper holds `INVALID_REQUEST_PROMISE_HANDLE`.
    ///
    /// # Panics
    /// Panics if the host reports a status other than `FastlyStatus::OK`,
    /// except while the thread is already panicking. A second panic during
    /// unwinding would abort the process and hide the original failure, so in
    /// that case the failed abandon is ignored.
    fn drop(&mut self) {
        let raw = self.0;
        if raw == INVALID_REQUEST_PROMISE_HANDLE {
            return;
        }
        // Mark the wrapper as empty before calling out, so the raw value is
        // never offered for abandonment twice.
        self.0 = INVALID_REQUEST_PROMISE_HANDLE;

        // SAFETY: the call takes the handle by value and no pointers are
        // passed; `raw` was checked above not to be the invalid sentinel.
        let status = unsafe { fastly_sys::fastly_http_downstream::next_request_abandon(raw) };
        if status != FastlyStatus::OK && !std::thread::panicking() {
            panic!("failed to abandon request promise: {status:?}");
        }
    }
}
/// A pending promise for a further downstream request.
///
/// Obtained from `RequestPromise::new` and resolved with `wait` or
/// `wait_timeout`, both of which consume it.
#[derive(Debug)]
pub struct RequestPromise {
// Wrapped handle; dropping it abandons the promise unless it was reset to the invalid sentinel.
handle: RequestPromiseHandle,
}
impl TryFrom<RequestPromiseHandle> for RequestPromise {
    type Error = FastlyStatus;

    /// Wraps `handle` in a [`RequestPromise`].
    ///
    /// # Errors
    /// Returns `FastlyStatus::BADF` when `handle` holds
    /// `INVALID_REQUEST_PROMISE_HANDLE`.
    fn try_from(handle: RequestPromiseHandle) -> Result<Self, Self::Error> {
        if handle.0 != INVALID_REQUEST_PROMISE_HANDLE {
            return Ok(Self { handle });
        }
        Err(FastlyStatus::BADF)
    }
}
/// Options applied when asking for a next downstream request.
///
/// The default value sets no timeout option.
#[derive(Debug, Clone, Default)]
pub struct NextRequestOptions {
    /// Timeout to request; `None` leaves the timeout option unset.
    timeout: Option<Duration>,
}

impl NextRequestOptions {
    /// Returns these options with the timeout set to `timeout`.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Creates options whose only setting is the given `timeout`.
    pub fn from_timeout(timeout: Duration) -> Self {
        Self::default().with_timeout(timeout)
    }
}
/// Converts the SDK-level options into the mask/struct pair passed to the
/// host call.
impl From<&NextRequestOptions>
    for (
        fastly_sys::fastly_http_downstream::NextRequestOptionsMask,
        fastly_sys::fastly_http_downstream::NextRequestOptions,
    )
{
    /// Sets the `TIMEOUT` mask bit and `timeout_ms` only when a timeout is
    /// configured. Millisecond counts that do not fit in `u64` become
    /// `u64::MAX`.
    fn from(value: &NextRequestOptions) -> Self {
        let mut mask = NextRequestOptionsMask::default();
        let mut sys_options = fastly_sys::fastly_http_downstream::NextRequestOptions::default();

        if let Some(limit) = value.timeout {
            mask |= NextRequestOptionsMask::TIMEOUT;
            sys_options.timeout_ms = u64::try_from(limit.as_millis()).unwrap_or(u64::MAX);
        }

        (mask, sys_options)
    }
}
/// Reasons why asking or waiting for a next downstream request did not yield one.
#[derive(Debug, thiserror::Error)]
pub enum NextRequestError {
/// The promise was not ready within the time allowed by `wait_timeout`;
/// the still-pending promise is handed back to the caller.
#[error("no request available (yet)")]
Timeout(RequestPromise),
/// Returned by `RequestPromise::wait` when the host reports `FastlyStatus::UNSUPPORTED`.
#[error("current response is not yet finished")]
OutstandingResponse,
/// Returned by `RequestPromise::wait` when the host reports `FastlyStatus::NONE`.
#[error("no future requests available for this instance")]
EndOfSession,
/// Any other non-OK host status, carried as-is.
#[error("error while waiting for next request: {0:?}")]
Other(FastlyStatus),
}
impl RequestPromise {
pub fn new(reuse_settings: &NextRequestOptions) -> Result<RequestPromise, NextRequestError> {
let (options_mask, options) = reuse_settings.into();
let mut handle = RequestPromiseHandle::default();
let status = unsafe {
fastly_sys::fastly_http_downstream::next_request(
options_mask,
&options as *const fastly_sys::fastly_http_downstream::NextRequestOptions,
&mut handle.0,
)
};
if status == FastlyStatus::OK {
Ok(handle.try_into().unwrap())
} else {
Err(NextRequestError::Other(status))
}
}
pub fn wait_timeout(self, timeout: Duration) -> Result<Request, NextRequestError> {
let raw = self.handle.0;
if timeout.is_zero() {
let mut ready = 0u32;
let status = unsafe { fastly_sys::fastly_async_io::is_ready(raw, &mut ready) };
match (status, ready != 0) {
(e, true) if e.is_ok() => (),
(e, false) if e.is_ok() => return Err(NextRequestError::Timeout(self)),
(e, _) => return Err(NextRequestError::Other(e)),
}
} else {
let millis = timeout.as_millis().try_into().unwrap_or(u32::MAX).max(1);
let raw_handles = [raw];
let mut done_index = 0u32;
let status = unsafe {
fastly_sys::fastly_async_io::select(
&raw_handles as *const u32,
raw_handles.len(),
millis,
&mut done_index as *mut u32,
)
};
match status {
FastlyStatus::OK if done_index == 0 => (),
FastlyStatus::OK => return Err(NextRequestError::Timeout(self)),
e => return Err(NextRequestError::Other(e)),
}
};
self.wait()
}
pub fn wait(self) -> Result<Request, NextRequestError> {
let RequestPromise { mut handle } = self;
let raw = handle.0;
handle.0 = INVALID_REQUEST_PROMISE_HANDLE;
let mut req_handle = RequestHandle::default();
let mut body_handle = BodyHandle::default();
let status = unsafe {
fastly_sys::fastly_http_downstream::next_request_wait(
raw,
&mut req_handle,
&mut body_handle,
)
};
match status {
FastlyStatus::OK => Ok(Request::from_client_handles(req_handle, body_handle)),
FastlyStatus::UNSUPPORTED => Err(NextRequestError::OutstandingResponse),
FastlyStatus::NONE => Err(NextRequestError::EndOfSession),
e => Err(NextRequestError::Other(e)),
}
}
}