fastly/http/serve.rs
1//! Support for reusable sandboxes.
2//!
3//! Normally, each incoming HTTP request spins up a new Compute instance.
4//! This provides good reproducibility properties... but it means that if an instance has expensive
5//! initialization, that initialization must be repeated for each request.
6//! If a service has an expensive initialization routine, reusable instances can provide an
7//! efficiency improvement (and a cost improvement!)
8//!
9//! ## Timeouts
10//!
11//! A Compute instance is not a general-purpose server; it does not and should not indefinitely
12//! wait for new requests. Therefore, the time spent waiting for a new request is bounded in
13//! three different ways.
14//!
15//! - At the platform level, there is a maximum time over which a `RequestPromise` can be
16//! unfulfilled. The timer starts when the `RequestPromise` is created.
17//! After this timeout, `RequestPromise::wait` returns
18//! `NextRequestError::EndOfSession`, and the promise is no longer valid.
19//!
20//! This timeout is not publicly disclosed, and may change.
21//!
22//! - When a `RequestPromise` is created, the promise-level timeout can be configured
23//! in `NextRequestOptions::timeout`. The effective promise-level timeout is the minimum of the
24//! provided value and the platform-level timeout.
25//!
//! - Separately from the promise-level timeout, `wait_timeout` can be used to time out a
//! particular `wait` call. If `wait_timeout` times out while the promise is still valid,
//! `wait_timeout` will return `NextRequestError::Timeout`, indicating the promise can be retried.
29//!
30//! ## Billing
31//!
32//! When a WASM instance waits on a `RequestPromise`, it continues to consume memory and wall-clock
33//! time. The memory is only freed when the WASM instance exits, as usual.
34//!
35//! If your service is billed by WASM memory * wall-clock time, you will be billed for the usage while
36//! waiting for a `RequestPromise`, even if the `RequestPromise` does not resolve to a request
37//! (i.e. times out). You can limit this usage by timing out the `RequestPromise`, as described
38//! above.
39//!
40//! A WASM instance that is blocked waiting on a `RequestPromise` does not consume vCPU time.
41//!
42//! ## Examples
43//!
44//! ```no_run
45//! # use fastly::http::serve::Serve;
46//! # use fastly::{Error, Response, Request};
47//! fn handler(_req: Request) -> Result<Response, Error> {
48//! Ok(Response::from_body("hello")
49//! .with_header("hello", "world!")
50//! .with_status(200))
51//! }
52//!
53//! fn main() -> Result<(), Error> {
54//! Serve::new()
55//! .with_max_requests(5)
56//! .run(handler)
57//! .into_result()
58//! }
59//! ```
60use std::time::{Duration, Instant};
61
62use fastly_shared::{FastlyStatus, INVALID_REQUEST_PROMISE_HANDLE};
63use fastly_sys::{fastly_http_downstream::NextRequestOptionsMask, BodyHandle, RequestHandle};
64
65use crate::compute_runtime::heap_memory_snapshot_mib;
66use crate::{Request, Response};
67
/// Trait for supporting more flexible return types for the handler passed to [Serve::run].
///
/// The types that this is implemented for by default are:
///
/// - `Response`, for a handler that always generates a response.
/// - `()`, for a handler that has already called either [Response::send_to_client] or
/// [Response::stream_to_client].
/// - `Result<(), E>`, for a handler that has already called either [Response::send_to_client] or
/// [Response::stream_to_client], but might fail with an error of type `E` along the way.
/// - `Result<Response, E>`, for a handler that might generate a response to send or an error.
///
/// For the `Result` implementations, a return value containing an error will attempt to send
/// a 500 response status with a body containing the [ToString::to_string] representation of the
/// error, in the same way that [fastly::main][crate::main] does.
pub trait HandlerResult {
    /// The error type generated by the handler.
    type Error;

    /// Perform any remaining work to send this response downstream.
    ///
    /// Returns `Ok(())` once any pending response has been dispatched to the
    /// client, or the handler's error if one occurred along the way.
    fn send(self) -> Result<(), Self::Error>;
}
89
90impl HandlerResult for Response {
91 type Error = std::convert::Infallible;
92
93 fn send(self) -> Result<(), Self::Error> {
94 self.send_to_client();
95 Ok(())
96 }
97}
98
99impl HandlerResult for () {
100 type Error = std::convert::Infallible;
101
102 fn send(self) -> Result<(), Self::Error> {
103 Ok(())
104 }
105}
106
107impl<E> HandlerResult for Result<Response, E>
108where
109 E: std::fmt::Display,
110{
111 type Error = E;
112
113 fn send(self) -> Result<(), Self::Error> {
114 match self {
115 Ok(resp) => {
116 resp.send_to_client();
117 Ok(())
118 }
119 Err(e) => {
120 send_internal_server_err(&e);
121 Err(e)
122 }
123 }
124 }
125}
126
127impl<E> HandlerResult for Result<(), E>
128where
129 E: std::fmt::Display,
130{
131 type Error = E;
132
133 fn send(self) -> Result<(), Self::Error> {
134 self.inspect_err(send_internal_server_err)
135 }
136}
137
138fn send_internal_server_err<E: std::fmt::Display>(e: &E) {
139 Response::from_body(e.to_string())
140 .with_status(crate::http::StatusCode::INTERNAL_SERVER_ERROR)
141 .send_to_client();
142}
143
/// A summary of information from running [Serve].
pub struct ServeSummary<E> {
    /// The handler error (if any) that caused the serving loop to stop.
    error: Option<E>,
    /// How many requests were dispatched to the handler.
    requests: usize,
    /// Total time spent waiting for additional requests.
    time_wait: Duration,
    /// Total time spent inside the handler callback.
    time_handler: Duration,
}
151
152impl<E> ServeSummary<E> {
153 fn new() -> Self {
154 Self {
155 error: None,
156 requests: 0,
157 time_wait: Duration::ZERO,
158 time_handler: Duration::ZERO,
159 }
160 }
161
162 /// Returns how many requests were processed by [Serve].
163 pub fn requests(&self) -> usize {
164 self.requests
165 }
166
167 /// Returns the total amount of time that [Serve] spent running the handler callback.
168 pub fn time_handler(&self) -> Duration {
169 self.time_handler
170 }
171
172 /// Returns the total amount of time that [Serve] spent waiting for another request.
173 pub fn time_waited(&self) -> Duration {
174 self.time_wait
175 }
176
177 /// Inspect the error returned by the handler that caused [Serve] to stop.
178 pub fn error(&self) -> Option<&E> {
179 self.error.as_ref()
180 }
181
182 /// Convert this summary into a [Result] containing the handler error that stopped [Serve].
183 pub fn into_result(self) -> Result<(), E> {
184 self.error.map_or(Ok(()), Err)
185 }
186
187 fn record_handler(&mut self, started: Instant) {
188 self.time_handler += started.elapsed();
189 }
190
191 fn record_wait(&mut self, started: Instant) {
192 self.time_wait += started.elapsed();
193 }
194}
195
196/// Support for processing multiple requests from a single sandbox.
/// Support for processing multiple requests from a single sandbox.
pub struct Serve {
    /// When this `Serve` was constructed; the baseline for `max_lifetime`.
    created: Instant,
    /// Stop accepting new requests once this much time has passed since `created`.
    max_lifetime: Duration,
    /// Stop once heap usage (in MiB) exceeds this value; `0` means unlimited.
    max_memory: u32,
    /// Stop after serving this many requests; `0` means unlimited.
    max_requests: usize,
    /// Promise-level timeout used when waiting for each subsequent request.
    timeout: Duration,
}
204
impl Serve {
    /// Prepare a new [Serve] instance for handling requests in this sandbox.
    ///
    /// All limits start out effectively unlimited: lifetime and wait timeout
    /// default to [Duration::MAX], and the request/memory caps default to `0`
    /// (which is treated as "no limit").
    pub fn new() -> Self {
        Self {
            // The lifetime clock starts at construction, not when `run` begins.
            created: Instant::now(),
            max_lifetime: Duration::MAX,
            max_memory: 0,
            max_requests: 0,
            timeout: Duration::MAX,
        }
    }

    /// Configure when to stop accepting additional requests.
    ///
    /// The limit is measured from when this [Serve] was created.
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_max_lifetime(mut self, limit: Duration) -> Self {
        self.max_lifetime = limit;
        self
    }

    /// Configure the maximum amount of memory (in mebibytes) to allow this instance
    /// to use before stopping, as reported by [heap_memory_snapshot_mib].
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_memory(mut self, mib: u32) -> Self {
        self.max_memory = mib;
        self
    }

    /// Configure the maximum number of requests to serve before stopping.
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_requests(mut self, limit: usize) -> Self {
        self.max_requests = limit;
        self
    }

    /// Configure how long to wait for another request before stopping.
    ///
    /// This value becomes the promise-level timeout ([NextRequestOptions::with_timeout])
    /// used when waiting for each subsequent request.
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Begin serving requests in this sandbox.
    ///
    /// Convenience wrapper around [Serve::run_with_context] for handlers that
    /// need no per-invocation context.
    pub fn run<F, MaybeResult>(self, mut handler: F) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        self.run_with_context(move |r, ()| handler(r), &mut ())
    }

    /// Begin serving requests in this sandbox, but with a contextual argument passed
    /// to every invocation of the handler callback.
    ///
    /// The callback should return a type that implements [HandlerResult], such as:
    ///
    /// - `Response`, if it always returns a response.
    /// - `Result<Response, E>`, if it might fail with an error of type `E` instead of generating a response
    ///
    /// The returned [ServeSummary] records how many requests were handled, how
    /// time was split between the handler and waiting, and the handler error
    /// (if any) that stopped the loop.
    pub fn run_with_context<F, C, MaybeResult>(
        self,
        mut handler: F,
        cx: &mut C,
    ) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request, &mut C) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        // The same promise-level timeout is reused for every subsequent request.
        let options = NextRequestOptions::default().with_timeout(self.timeout);
        let mut summary = ServeSummary::new();
        // The first request is the downstream request that started this instance.
        let mut req = Request::from_client();

        loop {
            // Counted up front, so `requests` includes the request currently
            // being handled (including the initial one).
            summary.requests += 1;

            let start_handler = Instant::now();
            let resp = handler(req, cx);
            summary.record_handler(start_handler);

            // Sending may surface a handler error; that ends the session and
            // is recorded in the summary.
            if let Err(e) = resp.send() {
                summary.error = Some(e);
                break;
            }

            // Check if we've hit the user-defined request limit, and stop if so:
            if self.max_requests > 0 && summary.requests >= self.max_requests {
                break;
            }

            // Check if we've hit the user-defined lifetime limit, and stop if so:
            if self.created.elapsed() >= self.max_lifetime {
                break;
            }

            // Check if we've hit the user-defined memory limit, and stop if so. If
            // we are unable to read our memory usage, error on the side of caution
            // and assume that it's too large for us to continue.
            if self.max_memory > 0 {
                let usage = heap_memory_snapshot_mib().unwrap_or(u32::MAX);

                if usage > self.max_memory {
                    break;
                }
            }

            let start_wait = Instant::now();

            let next = match RequestPromise::new(&options) {
                Ok(p) => p,
                Err(e) => {
                    // Failed to register a promise, which should never happen unless
                    // there is already an existing handle! Something has clearly gone
                    // wrong in the program, so panic.
                    panic!("failed to register promise for receiving another request: {e}");
                }
            };

            let res = next.wait();
            summary.record_wait(start_wait);

            if let Ok(r) = res {
                req = r;
            } else {
                // Timeout, end of session, or another error: stop serving.
                break;
            }
        }

        summary
    }
}
337
338impl Default for Serve {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
/// Owning wrapper around a raw `fastly_sys::RequestPromiseHandle`.
///
/// The wrapped handle is abandoned when this value is dropped, unless it has
/// been set to `INVALID_REQUEST_PROMISE_HANDLE` first (see the `Drop` impl).
#[derive(Debug)]
struct RequestPromiseHandle(fastly_sys::RequestPromiseHandle);
346
347impl Default for RequestPromiseHandle {
348 fn default() -> Self {
349 RequestPromiseHandle(INVALID_REQUEST_PROMISE_HANDLE)
350 }
351}
352
353impl From<fastly_sys::RequestPromiseHandle> for RequestPromiseHandle {
354 fn from(value: fastly_sys::RequestPromiseHandle) -> Self {
355 RequestPromiseHandle(value)
356 }
357}
358
359impl Drop for RequestPromiseHandle {
360 fn drop(&mut self) {
361 if self.0 == INVALID_REQUEST_PROMISE_HANDLE {
362 // Already invalidated.
363 return;
364 }
365 let status = unsafe { fastly_sys::fastly_http_downstream::next_request_abandon(self.0) };
366 if status != FastlyStatus::OK {
367 panic!("failed to abandon request promise: {status:?}");
368 }
369 self.0 = INVALID_REQUEST_PROMISE_HANDLE;
370 }
371}
372
/// A promise of a future Request from a customer.
///
/// When `.wait()` is called, this will resolve to either:
/// - a `Request`, if an additional user request has been assigned to this instance
/// - an `Error`, if:
///   - The current `Request` has not been completed
///   - A limit has been reached (number of requests per instance, timeout waiting for the next
///     request)
#[derive(Debug)]
pub struct RequestPromise {
    // Always a valid (non-sentinel) handle; enforced by the TryFrom constructor.
    handle: RequestPromiseHandle,
}
385
386impl TryFrom<RequestPromiseHandle> for RequestPromise {
387 type Error = FastlyStatus;
388
389 fn try_from(handle: RequestPromiseHandle) -> Result<Self, Self::Error> {
390 if handle.0 == INVALID_REQUEST_PROMISE_HANDLE {
391 Err(FastlyStatus::BADF)
392 } else {
393 Ok(RequestPromise { handle })
394 }
395 }
396}
397
/// Settings to use when waiting for a second (third, fourth, ...) request.
#[derive(Default)]
pub struct NextRequestOptions {
    /// How long the promise will wait for a new request.
    ///
    /// This timer starts at the creation of the RequestPromise.
    /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
    ///
    /// `None` (the default) means no promise-level timeout is passed to the host.
    timeout: Option<Duration>,
}
407
408impl NextRequestOptions {
409 /// Set a bound on how long a RequestPromise can be outstanding.
410 ///
411 /// This timer starts at the creation of the RequestPromise.
412 /// After the timeout, if not request is ready, the RequestPromise will report
413 /// [NextRequestError::EndOfSession].
414 ///
415 /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
416 pub fn with_timeout(mut self, timeout: Duration) -> Self {
417 self.timeout = Some(timeout);
418 self
419 }
420
421 /// Shorthand for `default().with_timeout(timeout)`;
422 /// see [NextRequestOptions::with_timeout].
423 pub fn from_timeout(timeout: Duration) -> Self {
424 Self {
425 timeout: Some(timeout),
426 }
427 }
428}
429
430impl From<&NextRequestOptions>
431 for (
432 fastly_sys::fastly_http_downstream::NextRequestOptionsMask,
433 fastly_sys::fastly_http_downstream::NextRequestOptions,
434 )
435{
436 fn from(value: &NextRequestOptions) -> Self {
437 let mut options = fastly_sys::fastly_http_downstream::NextRequestOptions::default();
438 let mut mask = fastly_sys::fastly_http_downstream::NextRequestOptionsMask::default();
439
440 if let Some(timeout) = value.timeout {
441 options.timeout_ms = timeout.as_millis().try_into().unwrap_or(u64::MAX);
442 mask |= NextRequestOptionsMask::TIMEOUT;
443 }
444
445 (mask, options)
446 }
447}
448
/// Errors that can occur when asking for or resolving a RequestPromise.
#[derive(Debug, thiserror::Error)]
pub enum NextRequestError {
    /// No request was available, but one may become available in the future.
    ///
    /// Only produced by [RequestPromise::wait_timeout].
    ///
    /// Note that the RequestPromise is still valid, and may provide a request in the
    /// future. If you no longer intend to accept an additional request, drop the RequestPromise
    /// promptly after receiving the timeout.
    #[error("no request available (yet)")]
    Timeout(RequestPromise),

    /// No request is available because the response for the current request has not completed.
    ///
    /// The response for the current request must be completed before a promise can be fulfilled.
    // How can you have any pudding if you don't eat your meat?
    #[error("current response is not yet finished")]
    OutstandingResponse,

    /// No more requests will be delivered to this sandbox.
    ///
    /// This can occur if the RequestPromise has exceeded its overall timeout or if another limit
    /// has been reached.
    #[error("no future requests available for this instance")]
    EndOfSession,

    /// Any other error, carrying the raw hostcall status.
    #[error("error while waiting for next request: {0:?}")]
    Other(FastlyStatus),
}
478
impl RequestPromise {
    /// Create a new promise for receiving a subsequent downstream request.
    ///
    /// On failure, returns [NextRequestError::Other] carrying the raw hostcall
    /// status.
    pub fn new(reuse_settings: &NextRequestOptions) -> Result<RequestPromise, NextRequestError> {
        // Lower the high-level options into the raw ABI (mask, options) pair.
        let (options_mask, options) = reuse_settings.into();
        let mut handle = RequestPromiseHandle::default();
        // SAFETY: `options` outlives the call, and `handle.0` is a valid
        // out-pointer for the hostcall to fill in.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request(
                options_mask,
                &options as *const fastly_sys::fastly_http_downstream::NextRequestOptions,
                &mut handle.0,
            )
        };
        if status == FastlyStatus::OK {
            // The conversion only rejects the invalid sentinel; after a
            // successful hostcall the handle is expected to be valid, so a
            // failure here is a bug worth panicking on.
            Ok(handle.try_into().unwrap())
        } else {
            Err(NextRequestError::Other(status))
        }
    }

    /// Wait for the next request, up to the timeout provided here.
    ///
    /// The timeout here represents how long this method waits, and not whether the handle
    /// has become invalid. If a timeout of zero is provided, this acts as a poll-once, and
    /// returns immediately with [NextRequestError::Timeout] if there is not a request ready.
    ///
    /// Note that if the timeout is reached, this instance is still registered to receive a future
    /// request; the RequestPromise can be recovered from the [NextRequestError::Timeout]
    /// and another `wait_timeout` or `wait` call may be attempted on it.
    ///
    /// If this instance needs to shut down, dropping the [RequestPromise] or
    /// [NextRequestError] will abandon the promise and avoid queueing future requests for
    /// the session.
    ///
    /// Note that the timeout has an unspecified minimum resolution.
    /// If a nonzero timeout smaller than the minimum resolution is provided, the time will be
    /// rounded up to this resolution.
    pub fn wait_timeout(self, timeout: Duration) -> Result<Request, NextRequestError> {
        // "borrow" the handle without deconstructing it;
        // we are just checking its readiness.
        let raw = self.handle.0;

        // async_io::select uses "timeout of zero" to mean "wait indefinitely".
        // To provide "timeout at 0" behavior to the callers here,
        // we use `is_ready` instead of `select` if the timeout is zero.
        if timeout.is_zero() {
            let mut ready = 0u32;
            // SAFETY: `raw` is the promise's live handle and `ready` is a valid
            // out-pointer.
            let status = unsafe { fastly_sys::fastly_async_io::is_ready(raw, &mut ready) };

            match (status, ready != 0) {
                // No errors, reported ready
                (e, true) if e.is_ok() => (),
                // No errors, reported nonready: it's OK to try again with this handle
                (e, false) if e.is_ok() => return Err(NextRequestError::Timeout(self)),
                // Some other error: pass it through.
                // Note that this will drop the RequestPromise. The types reflect this, preventing
                // re-use.
                (e, _) => return Err(NextRequestError::Other(e)),
            }
        } else {
            // If asked for a sub-millisecond time, round up to 1ms for the _select_ call.
            let millis = timeout.as_millis().try_into().unwrap_or(u32::MAX).max(1);

            let raw_handles = [raw];
            let mut done_index = 0u32;

            // SAFETY: `raw_handles` points to `len` valid handles for the
            // duration of the call, and `done_index` is a valid out-pointer.
            let status = unsafe {
                fastly_sys::fastly_async_io::select(
                    &raw_handles as *const u32,
                    raw_handles.len(),
                    millis,
                    &mut done_index as *mut u32,
                )
            };

            match status {
                // Our single handle (index 0) signaled readiness.
                FastlyStatus::OK if done_index == 0 => (),
                // OK but no handle ready: the select call timed out; the
                // promise survives inside the Timeout variant for retry.
                FastlyStatus::OK => return Err(NextRequestError::Timeout(self)),
                e => return Err(NextRequestError::Other(e)),
            }
        };

        // We have an indicator that the request is ready, so this won't block.
        self.wait()
    }

    /// Wait for the next request.
    ///
    /// This returns if a next request is available; if an error occurs; or if the RequestPromise
    /// exceeds its deadline.
    pub fn wait(self) -> Result<Request, NextRequestError> {
        // Invalidate the wrapper, since next_request_wait consumes the handle on success.
        // (This also prevents Drop from abandoning a handle the host has consumed.)
        let RequestPromise { mut handle } = self;
        let raw = handle.0;
        handle.0 = INVALID_REQUEST_PROMISE_HANDLE;

        let mut req_handle = RequestHandle::default();
        let mut body_handle = BodyHandle::default();
        // SAFETY: `raw` was a live promise handle, and both out-pointers are
        // valid for the hostcall to fill in.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request_wait(
                raw,
                &mut req_handle,
                &mut body_handle,
            )
        };
        match status {
            FastlyStatus::OK => Ok(Request::from_client_handles(req_handle, body_handle)),
            // UNSUPPORTED: the current response hasn't finished being sent.
            FastlyStatus::UNSUPPORTED => Err(NextRequestError::OutstandingResponse),
            // NONE: the session is over; no further requests will arrive.
            FastlyStatus::NONE => Err(NextRequestError::EndOfSession),
            e => Err(NextRequestError::Other(e)),
        }
    }
}
590}