fastly 0.12.0

Fastly Compute API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
//! Support for reusable sandboxes.
//!
//! Normally, each incoming HTTP request spins up a new Compute instance.
//! This provides good reproducibility properties... but it means that if an instance has expensive
//! initialization, that initialization must be repeated for each request.
//! If a service has an expensive initialization routine, reusable instances can provide an
//! efficiency improvement (and a cost improvement!).
//!
//! ## Timeouts
//!
//! A Compute instance is not a general-purpose server; it does not and should not indefinitely
//! wait for new requests. Therefore, the time spent waiting for a new request is bounded in
//! three different ways.
//!
//! -   At the platform level, there is a maximum time over which a `RequestPromise` can be
//!     unfulfilled. The timer starts when the `RequestPromise` is created.
//!     After this timeout, `RequestPromise::wait` returns
//!     `NextRequestError::EndOfSession`, and the promise is no longer valid.
//!
//!     This timeout is not publicly disclosed, and may change.
//!
//! -   When a `RequestPromise` is created, the promise-level timeout can be configured
//!     in `NextRequestOptions::timeout`. The effective promise-level timeout is the minimum of the
//!     provided value and the platform-level timeout.
//!
//! -   Separately from the promise-level timeout, `wait_timeout` can be used to time out a
//!     particular `wait` call. If `wait_timeout` times out while the promise is still valid,
//!     `wait_timeout` will return `NextRequestError::Timeout`, indicating the promise can be retried.
//!
//! ## Billing
//!
//! When a WASM instance waits on a `RequestPromise`, it continues to consume memory and wall-clock
//! time. The memory is only freed when the WASM instance exits, as usual.
//!
//! If your service is billed by WASM memory * wall-clock time, you will be billed for the usage while
//! waiting for a `RequestPromise`, even if the `RequestPromise` does not resolve to a request
//! (i.e. times out). You can limit this usage by timing out the `RequestPromise`, as described
//! above.
//!
//! A WASM instance that is blocked waiting on a `RequestPromise` does not consume vCPU time.
//!
//! ## Examples
//!
//! ```no_run
//! # use fastly::http::serve::Serve;
//! # use fastly::{Error, Response, Request};
//! fn handler(_req: Request) -> Result<Response, Error> {
//!     Ok(Response::from_body("hello")
//!         .with_header("hello", "world!")
//!         .with_status(200))
//! }
//!
//! fn main() -> Result<(), Error> {
//!     Serve::new()
//!         .with_max_requests(5)
//!         .run(handler)
//!         .into_result()
//! }
//! ```
use std::time::{Duration, Instant};

use fastly_shared::{FastlyStatus, INVALID_REQUEST_PROMISE_HANDLE};
use fastly_sys::{fastly_http_downstream::NextRequestOptionsMask, BodyHandle, RequestHandle};

use crate::compute_runtime::heap_memory_snapshot_mib;
use crate::{Request, Response};

/// Trait for supporting more flexible return types for the handler passed to [Serve::run].
///
/// The types that this is implemented for by default are:
///
/// - `Response`, for a handler that always generates a response.
/// - `()`, for a handler that has already called either [Response::send_to_client] or
///   [Response::stream_to_client].
/// - `Result<(), E>`, for a handler that has already called either [Response::send_to_client] or
///   [Response::stream_to_client], but might fail with an error of type `E` along the way.
/// - `Result<Response, E>`, for a handler that might generate a response to send or an error.
///
/// For the `Result` implementations, a return value containing an error will attempt to send
/// a 500 response status with a body containing the [ToString::to_string] representation of the
/// error, in the same way that [fastly::main][crate::main] does.
pub trait HandlerResult {
    /// The error type generated by the handler.
    type Error;

    /// Perform any remaining work to send this response downstream.
    fn send(self) -> Result<(), Self::Error>;
}

impl HandlerResult for Response {
    // Delivering a fully-formed `Response` cannot fail at this stage.
    type Error = std::convert::Infallible;

    /// Send the response downstream; this is infallible.
    fn send(self) -> Result<(), Self::Error> {
        self.send_to_client();
        Ok(())
    }
}

impl HandlerResult for () {
    // The handler already sent its response, so there is nothing left to fail.
    type Error = std::convert::Infallible;

    /// No-op: the handler is responsible for having sent the response already.
    fn send(self) -> Result<(), Self::Error> {
        Ok(())
    }
}

impl<E> HandlerResult for Result<Response, E>
where
    E: std::fmt::Display,
{
    type Error = E;

    fn send(self) -> Result<(), Self::Error> {
        match self {
            Ok(resp) => {
                resp.send_to_client();
                Ok(())
            }
            Err(e) => {
                send_internal_server_err(&e);
                Err(e)
            }
        }
    }
}

impl<E> HandlerResult for Result<(), E>
where
    E: std::fmt::Display,
{
    type Error = E;

    fn send(self) -> Result<(), Self::Error> {
        self.inspect_err(send_internal_server_err)
    }
}

/// Report a handler error to the client as a 500 whose body is the error's
/// `Display` representation.
fn send_internal_server_err<E: std::fmt::Display>(e: &E) {
    let body = e.to_string();
    Response::from_body(body)
        .with_status(crate::http::StatusCode::INTERNAL_SERVER_ERROR)
        .send_to_client();
}

/// A summary of information from running [Serve].
pub struct ServeSummary<E> {
    /// The handler error (if any) that stopped the serving loop.
    error: Option<E>,
    /// Number of requests dispatched to the handler.
    requests: usize,
    /// Cumulative time spent waiting for additional requests.
    time_wait: Duration,
    /// Cumulative time spent inside the handler callback.
    time_handler: Duration,
}

impl<E> ServeSummary<E> {
    /// Start an empty summary: no error, zero requests, zero accumulated time.
    fn new() -> Self {
        ServeSummary {
            error: None,
            requests: 0,
            time_wait: Duration::ZERO,
            time_handler: Duration::ZERO,
        }
    }

    /// Returns how many requests were processed by [Serve].
    pub fn requests(&self) -> usize {
        self.requests
    }

    /// Returns the total amount of time that [Serve] spent running the handler callback.
    pub fn time_handler(&self) -> Duration {
        self.time_handler
    }

    /// Returns the total amount of time that [Serve] spent waiting for another request.
    pub fn time_waited(&self) -> Duration {
        self.time_wait
    }

    /// Inspect the error returned by the handler that caused [Serve] to stop.
    pub fn error(&self) -> Option<&E> {
        self.error.as_ref()
    }

    /// Convert this summary into a [Result] containing the handler error that stopped [Serve].
    pub fn into_result(self) -> Result<(), E> {
        match self.error {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    /// Accumulate handler time elapsed since `started`.
    fn record_handler(&mut self, started: Instant) {
        let elapsed = started.elapsed();
        self.time_handler += elapsed;
    }

    /// Accumulate wait time elapsed since `started`.
    fn record_wait(&mut self, started: Instant) {
        let elapsed = started.elapsed();
        self.time_wait += elapsed;
    }
}

/// Support for processing multiple requests from a single sandbox.
pub struct Serve {
    /// Creation time; the baseline for the `max_lifetime` check.
    created: Instant,
    /// Stop accepting additional requests once this much time has elapsed since `created`.
    max_lifetime: Duration,
    /// Heap-usage ceiling in mebibytes; `0` means unlimited.
    max_memory: u32,
    /// Maximum number of requests to serve; `0` means unlimited.
    max_requests: usize,
    /// Promise-level timeout applied via [NextRequestOptions::with_timeout].
    timeout: Duration,
}

impl Serve {
    /// Prepare a new [Serve] instance for handling requests in this sandbox.
    ///
    /// By default no limits are set: the lifetime and wait timeout are
    /// [Duration::MAX], and the memory and request-count limits are `0` (unlimited).
    pub fn new() -> Self {
        Self {
            created: Instant::now(),
            max_lifetime: Duration::MAX,
            max_memory: 0,
            max_requests: 0,
            timeout: Duration::MAX,
        }
    }

    /// Configure when to stop accepting additional requests.
    ///
    /// Measured from when this [Serve] was created.
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_max_lifetime(mut self, limit: Duration) -> Self {
        self.max_lifetime = limit;
        self
    }

    /// Configure the maximum amount of memory (in mebibytes) to allow this instance
    /// to use before stopping, as reported by [heap_memory_snapshot_mib].
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_memory(mut self, mib: u32) -> Self {
        self.max_memory = mib;
        self
    }

    /// Configure the maximum number of requests to serve before stopping.
    ///
    /// A `limit` of `0` (the default) will be treated as unlimited.
    pub fn with_max_requests(mut self, limit: usize) -> Self {
        self.max_requests = limit;
        self
    }

    /// Configure how long to wait for another request before stopping.
    ///
    /// This becomes the promise-level timeout ([NextRequestOptions::with_timeout]).
    ///
    /// Defaults to [Duration::MAX].
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Begin serving requests in this sandbox.
    ///
    /// Shorthand for [Serve::run_with_context] with a unit context.
    pub fn run<F, MaybeResult>(self, mut handler: F) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        self.run_with_context(move |r, ()| handler(r), &mut ())
    }

    /// Begin serving requests in this sandbox, but with a contextual argument passed
    /// to every invocation of the handler callback.
    ///
    /// The first request handled is the one that started this instance
    /// ([Request::from_client]); subsequent requests are obtained by waiting on a
    /// [RequestPromise], until a handler or wait error occurs or a configured limit
    /// is reached.
    ///
    /// The callback should return a type that implements [HandlerResult], such as:
    ///
    /// - `Response`, if it always returns a response.
    /// - `Result<Response, E>`, if it might fail with an error of type `E` instead of generating a response
    pub fn run_with_context<F, C, MaybeResult>(
        self,
        mut handler: F,
        cx: &mut C,
    ) -> ServeSummary<MaybeResult::Error>
    where
        F: FnMut(Request, &mut C) -> MaybeResult,
        MaybeResult: HandlerResult,
    {
        // The promise-level timeout is fixed up front and reused for every promise.
        let options = NextRequestOptions::default().with_timeout(self.timeout);
        let mut summary = ServeSummary::new();
        let mut req = Request::from_client();

        loop {
            summary.requests += 1;

            // Time the handler invocation separately from the wait time below.
            let start_handler = Instant::now();
            let resp = handler(req, cx);
            summary.record_handler(start_handler);

            // A handler error ends the loop; the error is surfaced in the summary.
            if let Err(e) = resp.send() {
                summary.error = Some(e);
                break;
            }

            // Check if we've hit the user-defined request limit, and stop if so:
            if self.max_requests > 0 && summary.requests >= self.max_requests {
                break;
            }

            // Check if we've hit the user-defined lifetime limit, and stop if so:
            if self.created.elapsed() >= self.max_lifetime {
                break;
            }

            // Check if we've hit the user-defined memory limit, and stop if so. If
            // we are unable to read our memory usage, error on the side of caution
            // and assume that it's too large for us to continue.
            if self.max_memory > 0 {
                let usage = heap_memory_snapshot_mib().unwrap_or(u32::MAX);

                if usage > self.max_memory {
                    break;
                }
            }

            let start_wait = Instant::now();

            let next = match RequestPromise::new(&options) {
                Ok(p) => p,
                Err(e) => {
                    // Failed to register a promise, which should never happen unless
                    // there is already an existing handle! Something has clearly gone
                    // wrong in the program, so panic.
                    panic!("failed to register promise for receiving another request: {e}");
                }
            };

            let res = next.wait();
            summary.record_wait(start_wait);

            // Any wait failure (timeout, end of session, ...) cleanly ends the loop.
            if let Ok(r) = res {
                req = r;
            } else {
                break;
            }
        }

        summary
    }
}

impl Default for Serve {
    fn default() -> Self {
        Self::new()
    }
}

/// Owning wrapper for the raw `fastly_sys` promise handle.
///
/// The sentinel `INVALID_REQUEST_PROMISE_HANDLE` marks a handle that is absent
/// or already consumed; see the `Drop` impl below for cleanup behavior.
#[derive(Debug)]
struct RequestPromiseHandle(fastly_sys::RequestPromiseHandle);

impl Default for RequestPromiseHandle {
    fn default() -> Self {
        RequestPromiseHandle(INVALID_REQUEST_PROMISE_HANDLE)
    }
}

impl From<fastly_sys::RequestPromiseHandle> for RequestPromiseHandle {
    fn from(value: fastly_sys::RequestPromiseHandle) -> Self {
        RequestPromiseHandle(value)
    }
}

impl Drop for RequestPromiseHandle {
    /// Abandon a still-valid promise so the platform stops queueing requests for it.
    fn drop(&mut self) {
        if self.0 == INVALID_REQUEST_PROMISE_HANDLE {
            // Already invalidated.
            return;
        }
        // SAFETY: `self.0` is a live handle obtained from the host (any consumed or
        // absent handle was replaced with the invalid sentinel and returned above).
        let status = unsafe { fastly_sys::fastly_http_downstream::next_request_abandon(self.0) };
        if status != FastlyStatus::OK {
            panic!("failed to abandon request promise: {status:?}");
        }
        // Mark invalidated so any further use of the raw value is detectable.
        self.0 = INVALID_REQUEST_PROMISE_HANDLE;
    }
}

/// A promise of a future Request from a customer.
///
/// When .wait() is called, this will resolve to either:
/// - a `Request`, if an additional user request has been assigned to this instance
/// - an `Error`, if:
///   - the response for the current `Request` has not been completed
///   - a limit has been reached (number of requests per instance, timeout waiting
///     for the next request)
#[derive(Debug)]
pub struct RequestPromise {
    // Invariant: always a valid (non-sentinel) handle; enforced by `TryFrom` below.
    handle: RequestPromiseHandle,
}

impl TryFrom<RequestPromiseHandle> for RequestPromise {
    type Error = FastlyStatus;

    /// Wrap a handle, rejecting the invalid sentinel with `BADF`.
    fn try_from(handle: RequestPromiseHandle) -> Result<Self, Self::Error> {
        if handle.0 != INVALID_REQUEST_PROMISE_HANDLE {
            Ok(RequestPromise { handle })
        } else {
            Err(FastlyStatus::BADF)
        }
    }
}

/// Settings to use when waiting for a second (third, fourth, ...) request.
#[derive(Default)]
pub struct NextRequestOptions {
    /// How long the promise will wait for a new request.
    ///
    /// This timer starts at the creation of the RequestPromise.
    /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
    timeout: Option<Duration>,
}

impl NextRequestOptions {
    /// Set a bound on how long a RequestPromise can be outstanding.
    ///
    /// This timer starts at the creation of the RequestPromise.
    /// After the timeout, if no request is ready, the RequestPromise will report
    /// [NextRequestError::EndOfSession].
    ///
    /// If you want to bound how long a given .wait call lasts, use .wait_timeout.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
        }
    }

    /// Shorthand for `default().with_timeout(timeout)`;
    /// see [NextRequestOptions::with_timeout].
    pub fn from_timeout(timeout: Duration) -> Self {
        Self::default().with_timeout(timeout)
    }
}

impl From<&NextRequestOptions>
    for (
        fastly_sys::fastly_http_downstream::NextRequestOptionsMask,
        fastly_sys::fastly_http_downstream::NextRequestOptions,
    )
{
    /// Lower the high-level options into the raw ABI (mask, options) pair.
    fn from(value: &NextRequestOptions) -> Self {
        let mut raw = fastly_sys::fastly_http_downstream::NextRequestOptions::default();
        let mut mask = NextRequestOptionsMask::default();

        if let Some(timeout) = value.timeout {
            // Saturate instead of failing if the Duration exceeds u64 milliseconds.
            raw.timeout_ms = timeout.as_millis().try_into().unwrap_or(u64::MAX);
            mask |= NextRequestOptionsMask::TIMEOUT;
        }

        (mask, raw)
    }
}

/// Errors that can occur when asking for or resolving a RequestPromise.
#[derive(Debug, thiserror::Error)]
pub enum NextRequestError {
    /// No request was available, but one may become available in the future.
    ///
    /// Note that the RequestPromise is still valid, and may provide a request in the
    /// future. If you no longer intend to accept an additional request, drop the RequestPromise
    /// promptly after receiving the timeout.
    ///
    /// The contained [RequestPromise] can be recovered from this variant and waited on again.
    #[error("no request available (yet)")]
    Timeout(RequestPromise),

    /// No request is available because the response for the current request has not completed.
    ///
    /// The response for the current request must be completed before a promise can be fulfilled.
    // How can you have any pudding if you don't eat your meat?
    #[error("current response is not yet finished")]
    OutstandingResponse,

    /// No more requests will be delivered to this sandbox.
    ///
    /// This can occur if the RequestPromise has exceeded its overall timeout or if another limit
    /// has been reached.
    #[error("no future requests available for this instance")]
    EndOfSession,

    /// Any other error.
    #[error("error while waiting for next request: {0:?}")]
    Other(FastlyStatus),
}

impl RequestPromise {
    /// Create a new promise for receiving a subsequent downstream request.
    ///
    /// Any non-OK status from the hostcall is surfaced as [NextRequestError::Other].
    pub fn new(reuse_settings: &NextRequestOptions) -> Result<RequestPromise, NextRequestError> {
        let (options_mask, options) = reuse_settings.into();
        let mut handle = RequestPromiseHandle::default();
        // SAFETY: `options` outlives the call, and `handle.0` is a valid out-pointer
        // for the handle written by the host.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request(
                options_mask,
                &options as *const fastly_sys::fastly_http_downstream::NextRequestOptions,
                &mut handle.0,
            )
        };
        if status == FastlyStatus::OK {
            // try_into only fails on the invalid sentinel, which is not expected
            // when the host reports OK.
            Ok(handle.try_into().unwrap())
        } else {
            Err(NextRequestError::Other(status))
        }
    }

    /// Wait for the next request, up to the timeout provided here.
    ///
    /// The timeout here represents how long this method waits, and not whether the handle
    /// has become invalid. If a timeout of zero is provided, this acts as a poll-once, and
    /// returns immediately with [NextRequestError::Timeout] if there is not a request ready.
    ///
    /// Note that if the timeout is reached, this instance is still registered to receive a future
    /// request; the RequestPromise can be recovered from the [NextRequestError::Timeout]
    /// and another `wait_timeout` or `wait` call may be attempted on it.
    ///
    /// If this instance needs to shut down, dropping the [RequestPromise] or
    /// [NextRequestError] will abandon the promise and avoid queueing future requests for
    /// the session.
    ///
    /// Note that the timeout has an unspecified minimum resolution.
    /// If a nonzero timeout smaller than the minimum resolution is provided, the time will be
    /// rounded up to this resolution.
    pub fn wait_timeout(self, timeout: Duration) -> Result<Request, NextRequestError> {
        // "borrow" the handle without deconstructing it;
        // we are just checking its readiness.
        let raw = self.handle.0;

        // async_io::select uses "timeout of zero" to mean "wait indefinitely".
        // To provide "timeout at 0" behavior to the callers here,
        // we use `is_ready` instead of `select` if the timeout is zero.
        if timeout.is_zero() {
            let mut ready = 0u32;
            // SAFETY: `raw` is a live handle and `ready` is a valid out-pointer.
            let status = unsafe { fastly_sys::fastly_async_io::is_ready(raw, &mut ready) };

            match (status, ready != 0) {
                // No errors, reported ready
                (e, true) if e.is_ok() => (),
                // No errors, reported nonready: it's OK to try again with this handle
                (e, false) if e.is_ok() => return Err(NextRequestError::Timeout(self)),
                // Some other error: pass it through.
                // Note that this will drop the RequestPromise. The types reflect this, preventing
                // re-use.
                (e, _) => return Err(NextRequestError::Other(e)),
            }
        } else {
            // If asked for a sub-millisecond time, round up to 1ms for the _select_ call.
            let millis = timeout.as_millis().try_into().unwrap_or(u32::MAX).max(1);

            let raw_handles = [raw];
            let mut done_index = 0u32;

            // SAFETY: `raw_handles` points to `raw_handles.len()` live handles, and
            // `done_index` is a valid out-pointer for the completed index.
            let status = unsafe {
                fastly_sys::fastly_async_io::select(
                    &raw_handles as *const u32,
                    raw_handles.len(),
                    millis,
                    &mut done_index as *mut u32,
                )
            };

            match status {
                // Index 0 is our single handle: it became ready within the timeout.
                FastlyStatus::OK if done_index == 0 => (),
                // OK with any other index means the select timed out; the promise survives.
                FastlyStatus::OK => return Err(NextRequestError::Timeout(self)),
                e => return Err(NextRequestError::Other(e)),
            }
        };

        // We have an indicator that the request is ready, so this won't block.
        self.wait()
    }

    /// Wait for the next request.
    ///
    /// This returns if a next request is available; if an error occurs; or if the RequestPromise
    /// exceeds its deadline.
    pub fn wait(self) -> Result<Request, NextRequestError> {
        // Invalidate the wrapper, since next_request_wait consumes the handle on success.
        let RequestPromise { mut handle } = self;
        let raw = handle.0;
        handle.0 = INVALID_REQUEST_PROMISE_HANDLE;

        let mut req_handle = RequestHandle::default();
        let mut body_handle = BodyHandle::default();
        // SAFETY: `raw` was a valid promise handle (now detached from the wrapper so
        // Drop won't abandon it), and both out-pointers are valid for writes.
        let status = unsafe {
            fastly_sys::fastly_http_downstream::next_request_wait(
                raw,
                &mut req_handle,
                &mut body_handle,
            )
        };
        // Map the hostcall status onto the public error vocabulary.
        match status {
            FastlyStatus::OK => Ok(Request::from_client_handles(req_handle, body_handle)),
            FastlyStatus::UNSUPPORTED => Err(NextRequestError::OutstandingResponse),
            FastlyStatus::NONE => Err(NextRequestError::EndOfSession),
            e => Err(NextRequestError::Other(e)),
        }
    }
}