// fastly/http/serve.rs
1//! Support for reusable sandboxes.
2//!
3//! Normally, each incoming HTTP request spins up a new Compute instance.
4//! This provides good reproducibility properties... but it means that if an instance has expensive
5//! initialization, that initialization must be repeated for each request.
6//! If a service has an expensive initialization routine, reusable instances can provide an
7//! efficiency improvement (and a cost improvement!)
8//!
9//! ## Timeouts
10//!
11//! A Compute instance is not a general-purpose server; it does not and should not indefinitely
12//! wait for new requests. Therefore, the time spent waiting for a new request is bounded in
13//! three different ways.
14//!
15//! -   At the platform level, there is a maximum time over which a `RequestPromise` can be
16//!     unfulfilled. The timer starts when the `RequestPromise` is created.
17//!     After this timeout, `RequestPromise::wait` returns
18//!     `NextRequestError::EndOfSession`, and the promise is no longer valid.
19//!
20//!     This timeout is not publicly disclosed, and may change.
21//!
22//! -   When a `RequestPromise` is created, the promise-level timeout can be configured
23//!     in `NextRequestOptions::timeout`. The effective promise-level timeout is the minimum of the
24//!     provided value and the platform-level timeout.
25//!
26//! -   Separately from the promise-level timeout, `wait_timeout` can be used to time out a
27//!     particular `wait` call. If `wait_timeout` times out while the promise is still valid,
//!     `wait_timeout` will return `NextRequestError::Timeout`, indicating the promise can be retried.
29//!
30//! ## Billing
31//!
32//! When a WASM instance waits on a `RequestPromise`, it continues to consume memory and wall-clock
33//! time. The memory is only freed when the WASM instance exits, as usual.
34//!
35//! If your service is billed by WASM memory * wall-clock time, you will be billed for the usage while
36//! waiting for a `RequestPromise`, even if the `RequestPromise` does not resolve to a request
37//! (i.e. times out). You can limit this usage by timing out the `RequestPromise`, as described
38//! above.
39//!
40//! A WASM instance that is blocked waiting on a `RequestPromise` does not consume vCPU time.
41//!
42//! ## Examples
43//!
44//! ```no_run
45//! # use fastly::http::serve::Serve;
46//! # use fastly::{Error, Response, Request};
47//! fn handler(_req: Request) -> Result<Response, Error> {
48//!     Ok(Response::from_body("hello")
49//!         .with_header("hello", "world!")
50//!         .with_status(200))
51//! }
52//!
53//! fn main() -> Result<(), Error> {
54//!     Serve::new()
55//!         .with_max_requests(5)
56//!         .run(handler)
57//!         .into_result()
58//! }
59//! ```
60use std::time::{Duration, Instant};
61
62use fastly_shared::{FastlyStatus, INVALID_REQUEST_PROMISE_HANDLE};
63use fastly_sys::{fastly_http_downstream::NextRequestOptionsMask, BodyHandle, RequestHandle};
64
65use crate::compute_runtime::heap_memory_snapshot_mib;
66use crate::{Request, Response};
67
/// Trait for supporting more flexible return types for the handler passed to [Serve::run].
///
/// The types that this is implemented for by default are:
///
/// - `Response`, for a handler that always generates a response.
/// - `()`, for a handler that has already called either [Response::send_to_client] or
///   [Response::stream_to_client].
/// - `Result<(), E>`, for a handler that has already called either [Response::send_to_client] or
///   [Response::stream_to_client], but might fail with an error of type `E` along the way.
/// - `Result<Response, E>`, for a handler that might generate a response to send or an error.
///
/// For the `Result` implementations, a return value containing an error will attempt to send
/// a 500 response status with a body containing the [ToString::to_string] representation of the
/// error, in the same way that [fastly::main][crate::main] does.
pub trait HandlerResult {
    /// The error type generated by the handler.
    type Error;

    /// Perform any remaining work to send this response downstream.
    fn send(self) -> Result<(), Self::Error>;
}
89
impl HandlerResult for Response {
    // Delivering a plain `Response` cannot fail, so the error type is uninhabited.
    type Error = std::convert::Infallible;

    /// Deliver the response to the client.
    fn send(self) -> Result<(), Self::Error> {
        self.send_to_client();
        Ok(())
    }
}
98
impl HandlerResult for () {
    // The handler has already sent its response, so there is nothing left to fail.
    type Error = std::convert::Infallible;

    /// Nothing remains to be sent; always succeeds.
    fn send(self) -> Result<(), Self::Error> {
        Ok(())
    }
}
106
107impl<E> HandlerResult for Result<Response, E>
108where
109    E: std::fmt::Display,
110{
111    type Error = E;
112
113    fn send(self) -> Result<(), Self::Error> {
114        match self {
115            Ok(resp) => {
116                resp.send_to_client();
117                Ok(())
118            }
119            Err(e) => {
120                send_internal_server_err(&e);
121                Err(e)
122            }
123        }
124    }
125}
126
127impl<E> HandlerResult for Result<(), E>
128where
129    E: std::fmt::Display,
130{
131    type Error = E;
132
133    fn send(self) -> Result<(), Self::Error> {
134        self.inspect_err(send_internal_server_err)
135    }
136}
137
138fn send_internal_server_err<E: std::fmt::Display>(e: &E) {
139    Response::from_body(e.to_string())
140        .with_status(crate::http::StatusCode::INTERNAL_SERVER_ERROR)
141        .send_to_client();
142}
143
/// A summary of information from running [Serve].
pub struct ServeSummary<E> {
    /// The handler error (if any) that stopped the serving loop.
    error: Option<E>,
    /// Number of requests processed.
    requests: usize,
    /// Total time spent waiting for subsequent requests.
    time_wait: Duration,
    /// Total time spent inside the handler callback.
    time_handler: Duration,
}
151
152impl<E> ServeSummary<E> {
153    fn new() -> Self {
154        Self {
155            error: None,
156            requests: 0,
157            time_wait: Duration::ZERO,
158            time_handler: Duration::ZERO,
159        }
160    }
161
162    /// Returns how many requests were processed by [Serve].
163    pub fn requests(&self) -> usize {
164        self.requests
165    }
166
167    /// Returns the total amount of time that [Serve] spent running the handler callback.
168    pub fn time_handler(&self) -> Duration {
169        self.time_handler
170    }
171
172    /// Returns the total amount of time that [Serve] spent waiting for another request.
173    pub fn time_waited(&self) -> Duration {
174        self.time_wait
175    }
176
177    /// Inspect the error returned by the handler that caused [Serve] to stop.
178    pub fn error(&self) -> Option<&E> {
179        self.error.as_ref()
180    }
181
182    /// Convert this summary into a [Result] containing the handler error that stopped [Serve].
183    pub fn into_result(self) -> Result<(), E> {
184        self.error.map_or(Ok(()), Err)
185    }
186
187    fn record_handler(&mut self, started: Instant) {
188        self.time_handler += started.elapsed();
189    }
190
191    fn record_wait(&mut self, started: Instant) {
192        self.time_wait += started.elapsed();
193    }
194}
195
/// Support for processing multiple requests from a single sandbox.
pub struct Serve {
    /// When this `Serve` was constructed; the epoch for the `max_lifetime` check.
    created: Instant,
    /// Stop accepting new requests once this much time has passed since `created`.
    max_lifetime: Duration,
    /// Memory ceiling in mebibytes; `0` means unlimited.
    max_memory: u32,
    /// Maximum number of requests to serve; `0` means unlimited.
    max_requests: usize,
    /// Promise-level timeout applied when waiting for each subsequent request.
    timeout: Duration,
}
204
impl Serve {
    /// Prepare a new [Serve] instance for handling requests in this sandbox.
    ///
    /// All limits start out "unlimited": no request cap, no memory cap, and
    /// [Duration::MAX] for both the lifetime and the wait timeout.
    pub fn new() -> Self {
        Self {
            created: Instant::now(),
            max_lifetime: Duration::MAX,
            max_memory: 0,
            max_requests: 0,
            timeout: Duration::MAX,
        }
    }

    /// Configure when to stop accepting additional requests.
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_max_lifetime(mut self, limit: Duration) -> Self {
        self.max_lifetime = limit;
        self
    }

    /// Configure the maximum amount of memory (in mebibytes) to allow this instance
    /// to use before stopping, as reported by [heap_memory_snapshot_mib].
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_memory(mut self, mib: u32) -> Self {
        self.max_memory = mib;
        self
    }

    /// Configure the maximum number of requests to serve before stopping.
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_requests(mut self, limit: usize) -> Self {
        self.max_requests = limit;
        self
    }

    /// Configure how long to wait for another request before stopping.
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Begin serving requests in this sandbox.
    pub fn run<F, MaybeResult>(self, mut handler: F) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        // Delegate to the context-taking variant with a unit context.
        self.run_with_context(move |r, ()| handler(r), &mut ())
    }

    /// Begin serving requests in this sandbox, but with a contextual argument passed
    /// to every invocation of the handler callback.
    ///
    /// The callback should return a type that implements [HandlerResult], such as:
    ///
    /// - `Response`, if it always returns a response.
    /// - `Result<Response, E>`, if it might fail with an error of type `E` instead of generating a response
    pub fn run_with_context<F, C, MaybeResult>(
        self,
        mut handler: F,
        cx: &mut C,
    ) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request, &mut C) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        // The same promise-level timeout is reused for every subsequent request.
        let options = NextRequestOptions::default().with_timeout(self.timeout);
        let mut summary = ServeSummary::new();
        // The first request is the one that started this instance.
        let mut req = Request::from_client();

        loop {
            // Count the request we are about to handle, so the max_requests
            // check below includes it.
            summary.requests += 1;

            let start_handler = Instant::now();
            let resp = handler(req, cx);
            summary.record_handler(start_handler);

            // A handler error stops the loop; the error is surfaced through the
            // returned summary (see ServeSummary::into_result).
            if let Err(e) = resp.send() {
                summary.error = Some(e);
                break;
            }

            // Check if we've hit the user-defined request limit, and stop if so:
            if self.max_requests > 0 && summary.requests >= self.max_requests {
                break;
            }

            // Check if we've hit the user-defined lifetime limit, and stop if so:
            if self.created.elapsed() >= self.max_lifetime {
                break;
            }

            // Check if we've hit the user-defined memory limit, and stop if so. If
            // we are unable to read our memory usage, error on the side of caution
            // and assume that it's too large for us to continue.
            if self.max_memory > 0 {
                let usage = heap_memory_snapshot_mib().unwrap_or(u32::MAX);

                if usage > self.max_memory {
                    break;
                }
            }

            let start_wait = Instant::now();

            let next = match RequestPromise::new(&options) {
                Ok(p) => p,
                Err(e) => {
                    // Failed to register a promise, which should never happen unless
                    // there is already an existing handle! Something has clearly gone
                    // wrong in the program, so panic.
                    panic!("failed to register promise for receiving another request: {e}");
                }
            };

            let res = next.wait();
            summary.record_wait(start_wait);

            // Any wait error (timeout, end of session, hostcall failure) ends the
            // loop; it is not recorded as a handler error in the summary.
            if let Ok(r) = res {
                req = r;
            } else {
                break;
            }
        }

        summary
    }
}
337
338impl Default for Serve {
339    fn default() -> Self {
340        Self::new()
341    }
342}
343
/// Owned wrapper around the raw host promise handle.
///
/// The sentinel [INVALID_REQUEST_PROMISE_HANDLE] marks a handle that has been
/// consumed or was never registered.
#[derive(Debug)]
struct RequestPromiseHandle(fastly_sys::RequestPromiseHandle);
346
347impl Default for RequestPromiseHandle {
348    fn default() -> Self {
349        RequestPromiseHandle(INVALID_REQUEST_PROMISE_HANDLE)
350    }
351}
352
353impl From<fastly_sys::RequestPromiseHandle> for RequestPromiseHandle {
354    fn from(value: fastly_sys::RequestPromiseHandle) -> Self {
355        RequestPromiseHandle(value)
356    }
357}
358
359impl Drop for RequestPromiseHandle {
360    fn drop(&mut self) {
361        if self.0 == INVALID_REQUEST_PROMISE_HANDLE {
362            // Already invalidated.
363            return;
364        }
365        let status = unsafe { fastly_sys::fastly_http_downstream::next_request_abandon(self.0) };
366        if status != FastlyStatus::OK {
367            panic!("failed to abandon request promise: {status:?}");
368        }
369        self.0 = INVALID_REQUEST_PROMISE_HANDLE;
370    }
371}
372
/// A promise of a future Request from a customer.
///
/// When .wait() is called, this will resolve to either:
/// - a `Request`, if an additional user request has been assigned to this instance
/// - an `Error`, if:
///   - The current `Request` has not been completed
///   - A limit has been reached (number of requests per instance, timeout waiting for the next
///     request)
#[derive(Debug)]
pub struct RequestPromise {
    // Invariant: always holds a valid (non-sentinel) handle; enforced by TryFrom.
    handle: RequestPromiseHandle,
}
385
386impl TryFrom<RequestPromiseHandle> for RequestPromise {
387    type Error = FastlyStatus;
388
389    fn try_from(handle: RequestPromiseHandle) -> Result<Self, Self::Error> {
390        if handle.0 == INVALID_REQUEST_PROMISE_HANDLE {
391            Err(FastlyStatus::BADF)
392        } else {
393            Ok(RequestPromise { handle })
394        }
395    }
396}
397
/// Settings to use when waiting for a second (third, fourth, ...) request.
#[derive(Default)]
pub struct NextRequestOptions {
    /// How long the promise will wait for a new request.
    ///
    /// This timer starts at the creation of the RequestPromise.
    /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
    ///
    /// `None` (the default) sets no promise-level timeout here; the
    /// platform-level limit still applies.
    timeout: Option<Duration>,
}
407
impl NextRequestOptions {
    /// Set a bound on how long a RequestPromise can be outstanding.
    ///
    /// This timer starts at the creation of the RequestPromise.
    /// After the timeout, if no request is ready, the RequestPromise will report
    /// [NextRequestError::EndOfSession].
    ///
    /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Shorthand for `default().with_timeout(timeout)`;
    /// see [NextRequestOptions::with_timeout].
    pub fn from_timeout(timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
        }
    }
}
429
430impl From<&NextRequestOptions>
431    for (
432        fastly_sys::fastly_http_downstream::NextRequestOptionsMask,
433        fastly_sys::fastly_http_downstream::NextRequestOptions,
434    )
435{
436    fn from(value: &NextRequestOptions) -> Self {
437        let mut options = fastly_sys::fastly_http_downstream::NextRequestOptions::default();
438        let mut mask = fastly_sys::fastly_http_downstream::NextRequestOptionsMask::default();
439
440        if let Some(timeout) = value.timeout {
441            options.timeout_ms = timeout.as_millis().try_into().unwrap_or(u64::MAX);
442            mask |= NextRequestOptionsMask::TIMEOUT;
443        }
444
445        (mask, options)
446    }
447}
448
449/// Errors that can occur when asking for or resolving a RequestPromise.
450#[derive(Debug, thiserror::Error)]
451pub enum NextRequestError {
452    /// No request was available, but one may become available in the future.
453    ///
454    /// Note that the RequestPromise is still valid, and may provide a request in the
455    /// future. If you no longer intend to accept an additional request, drop the RequestPromise
456    /// promptly after receiving the timeout.
457    #[error("no request available (yet)")]
458    Timeout(RequestPromise),
459
460    /// No request is available because the response for the current request has not completed.
461    ///
462    /// The response for the current request must be completed before a promise can be fulfilled.
463    // How can you have any pudding if you don't eat your meat?
464    #[error("current response is not yet finished")]
465    OutstandingResponse,
466
467    /// No more requests will be delivered to this sandbox.
468    ///
469    /// This can occur if the RequestPromise has exceeded its overall timeout or if another limit
470    /// has been reached.
471    #[error("no future requests available for this instance")]
472    EndOfSession,
473
474    /// Any other error.
475    #[error("error while waiting for next request: {0:?}")]
476    Other(FastlyStatus),
477}
478
impl RequestPromise {
    /// Create a new promise for receiving a subsequent downstream request.
    pub fn new(reuse_settings: &NextRequestOptions) -> Result<RequestPromise, NextRequestError> {
        // Lower the high-level options into the ABI-level (mask, options) pair.
        let (options_mask, options) = reuse_settings.into();
        let mut handle = RequestPromiseHandle::default();
        // SAFETY: `options` lives for the duration of the call and `handle.0`
        // is a valid out-pointer for the host to write the new handle into.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request(
                options_mask,
                &options as *const fastly_sys::fastly_http_downstream::NextRequestOptions,
                &mut handle.0,
            )
        };
        if status == FastlyStatus::OK {
            // On OK the host wrote a valid handle, so TryFrom cannot fail here.
            Ok(handle.try_into().unwrap())
        } else {
            Err(NextRequestError::Other(status))
        }
    }

    /// Wait for the next request, up to the timeout provided here.
    ///
    /// The timeout here represents how long this method waits, and not whether the handle
    /// has become invalid. If a timeout of zero is provided, this acts as a poll-once, and
    /// returns immediately with [NextRequestError::Timeout] if there is not a request ready.
    ///
    /// Note that if the timeout is reached, this instance is still registered to receive a future
    /// request; the RequestPromise can be recovered from the [NextRequestError::Timeout]
    /// and another `wait_timeout` or `wait` call may be attempted on it.
    ///
    /// If this instance needs to shut down, dropping the [RequestPromise] or
    /// [NextRequestError] will abandon the promise and avoid queueing future requests for
    /// the session.
    ///
    /// Note that the timeout has an unspecified minimum resolution.
    /// If a nonzero timeout smaller than the minimum resolution is provided, the time will be
    /// rounded up to this resolution.
    pub fn wait_timeout(self, timeout: Duration) -> Result<Request, NextRequestError> {
        // "borrow" the handle without deconstructing it;
        // we are just checking its readiness.
        let raw = self.handle.0;

        // async_io::select uses "timeout of zero" to mean "wait indefinitely".
        // To provide "timeout at 0" behavior to the callers here,
        // we use `is_ready` instead of `select` if the timeout is zero.
        if timeout.is_zero() {
            let mut ready = 0u32;
            // SAFETY: `raw` is the live handle held by `self`, and `ready` is a
            // valid out-pointer for the readiness flag.
            let status = unsafe { fastly_sys::fastly_async_io::is_ready(raw, &mut ready) };

            match (status, ready != 0) {
                // No errors, reported ready
                (e, true) if e.is_ok() => (),
                // No errors, reported nonready: it's OK to try again with this handle
                (e, false) if e.is_ok() => return Err(NextRequestError::Timeout(self)),
                // Some other error: pass it through.
                // Note that this will drop the RequestPromise. The types reflect this, preventing
                // re-use.
                (e, _) => return Err(NextRequestError::Other(e)),
            }
        } else {
            // If asked for a sub-millisecond time, round up to 1ms for the _select_ call.
            let millis = timeout.as_millis().try_into().unwrap_or(u32::MAX).max(1);

            let raw_handles = [raw];
            let mut done_index = 0u32;

            // SAFETY: `raw_handles` outlives the call and contains exactly
            // `raw_handles.len()` handles; `done_index` is a valid out-pointer.
            let status = unsafe {
                fastly_sys::fastly_async_io::select(
                    &raw_handles as *const u32,
                    raw_handles.len(),
                    millis,
                    &mut done_index as *mut u32,
                )
            };

            match status {
                // done_index == 0 means our single handle became ready.
                FastlyStatus::OK if done_index == 0 => (),
                // OK with any other index: nothing ready, the select timed out.
                FastlyStatus::OK => return Err(NextRequestError::Timeout(self)),
                e => return Err(NextRequestError::Other(e)),
            }
        };

        // We have an indicator that the request is ready, so this won't block.
        self.wait()
    }

    /// Wait for the next request.
    ///
    /// This returns if a next request is available; if an error occurs; or if the RequestPromise
    /// exceeds its deadline.
    pub fn wait(self) -> Result<Request, NextRequestError> {
        // Invalidate the wrapper, since next_request_wait consumes the handle on success.
        // NOTE(review): the wrapper is invalidated even on error paths, so the raw
        // handle is never abandoned after a failed wait — presumably the host
        // consumes it in all cases; confirm against the hostcall ABI.
        let RequestPromise { mut handle } = self;
        let raw = handle.0;
        handle.0 = INVALID_REQUEST_PROMISE_HANDLE;

        let mut req_handle = RequestHandle::default();
        let mut body_handle = BodyHandle::default();
        // SAFETY: `raw` was a valid handle, and both out-pointers are valid for
        // the host to write the resulting request/body handles into.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request_wait(
                raw,
                &mut req_handle,
                &mut body_handle,
            )
        };
        match status {
            FastlyStatus::OK => Ok(Request::from_client_handles(req_handle, body_handle)),
            // UNSUPPORTED from the host maps to "current response not finished".
            FastlyStatus::UNSUPPORTED => Err(NextRequestError::OutstandingResponse),
            // NONE from the host maps to "session over, no more requests".
            FastlyStatus::NONE => Err(NextRequestError::EndOfSession),
            e => Err(NextRequestError::Other(e)),
        }
    }
}
590}