// fastly 0.12.0
//
// Fastly Compute API
// Documentation
use crate::{
    cache::core::{
        handle, handle::ReplaceOptions as HandleReplaceOptions, CacheError, CacheKey, Found,
        FoundInner, WriteOptions,
    },
    convert::{ToHeaderName, ToHeaderValue},
    handle::RequestHandle,
    http::body::StreamingBody,
};
use bytes::Bytes;
use fastly_sys::fastly_cache::CacheLookupState;
use http::{HeaderName, HeaderValue};
use std::{sync::Arc, time::Duration};

/// Strategy to use to replace the existing object.
///
/// The strategy determines whether a replace waits behind other pending requests for the same
/// object, and whether the existing object remains accessible while the replacement is being
/// provided.
#[derive(Clone, Debug, Default)]
pub enum ReplaceStrategy {
    /// Immediately start the replace and do not wait for any other pending requests for the same
    /// object, including insert requests.
    ///
    /// With this strategy a replace will race all other pending requests to update the object.
    ///
    /// The existing object will be accessible until this replace finishes providing the
    /// replacement object.
    ///
    /// This is the default replace strategy.
    #[default]
    Immediate,
    /// Like [`ReplaceStrategy::Immediate`], but remove the existing object immediately.
    ///
    /// Requests for the same object that arrive after this replace starts will wait until this
    /// replace starts providing the replacement object.
    ImmediateForceMiss,
    /// Join the wait list behind other pending requests before starting this request.
    ///
    /// With this strategy this replace request will wait for an in-progress replace or insert
    /// request before starting.
    ///
    /// This strategy allows implementing a counter, but may cause timeouts if too many requests
    /// are waiting for in-progress and waiting updates to complete.
    Wait,
}

/// An owned variant of `HandleReplaceOptions`.
///
/// Holds the lookup-side configuration collected by [`ReplaceBuilder`]. A borrowed
/// `HandleReplaceOptions` view is derived from it when the replace begins.
#[derive(Debug, Default)]
struct ReplaceOptions {
    /// Request headers used when looking up the existing object; `None` until a header is set.
    request_headers: Option<RequestHandle>,
    /// Strategy for performing the replace; see [`ReplaceStrategy`].
    replace_strategy: ReplaceStrategy,
    /// NOTE(review): not assigned anywhere in the visible code, so it keeps its `None` default
    /// here — confirm the intended meaning against `HandleReplaceOptions`.
    service: Option<String>,
    /// Set to `true` by [`ReplaceBuilder::always_use_requested_range()`]; `false` by default.
    always_use_requested_range: bool,
}

impl ReplaceOptions {
    /// Builds the `HandleReplaceOptions` view of these options.
    ///
    /// The request headers are borrowed from `self`; the strategy and service are cloned and the
    /// range flag is copied.
    fn as_handle_options(&self) -> HandleReplaceOptions<'_> {
        // Destructure exhaustively so that a newly added field cannot be silently left out of
        // the conversion.
        let Self {
            request_headers,
            replace_strategy,
            service,
            always_use_requested_range,
        } = self;

        HandleReplaceOptions {
            request_headers: request_headers.as_ref(),
            replace_strategy: replace_strategy.clone(),
            service: service.clone(),
            always_use_requested_range: *always_use_requested_range,
        }
    }
}

/// A builder-style API for configuring a non-transactional replace.
///
/// Created by [`replace()`]; call [`ReplaceBuilder::begin()`] to start the replace once the
/// lookup has been configured.
pub struct ReplaceBuilder {
    /// Cache key of the object to replace.
    key: CacheKey,
    /// Lookup-side options accumulated by the builder methods.
    options: ReplaceOptions,
}

/// Returns a [`ReplaceBuilder`] that will perform a non-transactional cache replacement.
///
/// ```no_run
/// # use fastly::cache::core::*;
/// # use std::io::Write;
/// # use std::time::Duration;
/// # fn f() -> Result<(), Box<dyn std::error::Error>> {
/// let current = replace(CacheKey::from_static(b"my_key")).begin()?;
///
/// let contents = if let Some(found) = current.existing_object() {
///     todo!("create replacement object from existing")
/// } else {
///     b"my cached object"
/// };
///
/// let mut writer = current
///     .surrogate_keys(["my_key"])
///     .known_length(contents.len() as u64)
///     .execute(Duration::from_secs(3600))?;
/// writer.write_all(contents)?;
/// writer.finish()?;
/// # Ok(()) }
/// ```
///
/// A cache replacement has two phases. The first phase follows [`ReplaceBuilder::begin()`] and
/// gives you access to the existing object, if one exists, including its metadata. The second
/// phase follows [`Replace::execute()`], after which you write the replacement object. The
/// existing object from the first phase may be used to decide the metadata arguments for the
/// replacement object.
///
/// The chosen [`ReplaceStrategy`] controls whether one replace will wait for another, and
/// whether a lookup request needs to wait for the replace to start writing the replacement
/// object.
///
/// When replacing an object, the request headers used to look up the existing object (provided
/// by [`ReplaceBuilder::header()`] or [`ReplaceBuilder::header_values()`]) should match the
/// request headers used to write the replacement object (provided by [`Replace::header()`] or
/// [`Replace::header_values()`]). Additionally, if you use a lookup or insert to create an
/// object and later replace it, the [`Replace::vary_by()`] should match. If the header or vary
/// values differ between the existing object and the replacement object, other requests looking
/// up the object may not be able to find the replacement object.
pub fn replace(key: CacheKey) -> ReplaceBuilder {
    // Every replace starts from the default lookup options.
    let options = ReplaceOptions::default();
    ReplaceBuilder { key, options }
}

impl ReplaceBuilder {
    /// Begin the replace, returning a [`Replace`] for reading the object to be replaced (if it
    /// exists) which is also used to provide the new replacement object.
    ///
    /// A [`Replace`] gives access to the existing object, if one is stored, that may be used to
    /// construct the replacement object. Use [`Replace::execute()`] to get a [`StreamingBody`] to
    /// write the replacement object into. The existing object cannot be accessed after calling
    /// [`Replace::execute()`].
    ///
    /// For the replace to complete successfully, the object must be written into the
    /// [`StreamingBody`], and then [`StreamingBody::finish()`] must be called. If the
    /// [`StreamingBody`] is dropped before calling [`StreamingBody::finish()`], the replacement is
    /// considered incomplete, and any concurrent lookups that may be reading from the object as it
    /// is streamed into the cache may encounter a streaming error.
    pub fn begin(self) -> Result<Replace, CacheError> {
        let replace_handle = handle::replace(self.key, &self.options.as_handle_options())?;
        let handle = Arc::new(replace_handle);

        let existing_object = if handle.get_state().contains(CacheLookupState::FOUND) {
            let found = Found {
                inner: FoundInner::ExistingObject(handle.clone()),
            };

            Some(found)
        } else {
            None
        };

        Ok(Replace {
            handle,
            existing_object,
            options: WriteOptions::default(),
        })
    }

    /// Sets a single-value header for looking up an existing object to replace, discarding any
    /// previous values associated with the header `name`.
    ///
    #[doc = include_str!("../../../docs/snippets/cache-headers.md")]
    pub fn header(self, name: impl ToHeaderName, value: impl ToHeaderValue) -> Self {
        self.header_values(name.into_owned(), Some(&value.into_owned()))
    }

    /// Sets a multi-value header for looking up an existing object to replace, discarding any
    /// previous values associated with the header `name`.
    ///
    #[doc = include_str!("../../../docs/snippets/cache-headers.md")]
    pub fn header_values<'a>(
        mut self,
        name: impl ToHeaderName,
        values: impl IntoIterator<Item = &'a HeaderValue>,
    ) -> Self {
        self.options
            .request_headers
            .get_or_insert_with(RequestHandle::new)
            .set_header_values(&name.into_owned(), values);
        self
    }

    /// Sets the strategy for performing the replace.
    ///
    /// See [`ReplaceStrategy`] for details.
    pub fn replace_strategy(mut self, strategy: ReplaceStrategy) -> Self {
        self.options.replace_strategy = strategy;
        self
    }

    #[doc=include_str!("../../../docs/snippets/always-use-requested-range.md")]
    pub fn always_use_requested_range(mut self) -> Self {
        self.options.always_use_requested_range = true;
        self
    }
}

/// An in-progress Replace operation.
///
/// Returned by [`ReplaceBuilder::begin()`]. It gives read access to the existing object, if any,
/// and collects the options for the replacement object until [`Replace::execute()`] is called.
pub struct Replace {
    /// Handle for the in-progress replace; shared with `existing_object` when one was found.
    handle: Arc<handle::CacheReplaceHandle>,
    /// The object being replaced, present only if the lookup found one.
    existing_object: Option<Found>,
    /// Write options for the replacement object, accumulated by the builder-style methods.
    options: WriteOptions,
}

impl Replace {
    /// Finish using the existing object and start writing a replacement object to the
    /// [`StreamingBody`].
    ///
    /// The required `max_age` argument is the "time to live" for the replacement cache item: the
    /// time for which the item will be considered fresh, starting from the start of its history
    /// (now, unless `initial_age` was provided).
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if starting the replacement write fails.
    ///
    /// # Panics
    ///
    /// Panics if the underlying replace handle is still shared with another owner after this
    /// `Replace`'s existing object has been dropped.
    pub fn execute(self, max_age: Duration) -> Result<StreamingBody, CacheError> {
        let Replace {
            handle,
            mut options,
            existing_object,
        } = self;
        options.max_age = max_age;

        // The existing object holds a clone of the handle's `Arc`; release it first so that
        // `handle` is the only remaining reference.
        drop(existing_object);

        let body_handle = Arc::into_inner(handle)
            .expect("replace handle must be uniquely owned once the existing object is dropped")
            .replace_insert(&options.as_handle_options())?;

        Ok(body_handle.into())
    }

    /// Sets the maximum time the cached item may live on a deliver node in a POP.
    pub fn deliver_node_max_age(mut self, duration: Duration) -> Self {
        self.options.edge_max_age = Some(duration);
        self
    }

    /// The existing object, if one exists.
    ///
    /// The existing object may be stale.
    pub fn existing_object(&self) -> Option<&Found> {
        self.existing_object.as_ref()
    }

    /// Sets a single-value header for the replacement object, discarding any previous values
    /// associated with the header `name`.
    ///
    #[doc = include_str!("../../../docs/snippets/cache-headers.md")]
    pub fn header(self, name: impl ToHeaderName, value: impl ToHeaderValue) -> Self {
        self.header_values(name.into_owned(), Some(&value.into_owned()))
    }

    /// Sets a multi-value header for the replacement object, discarding any previous values
    /// associated with the header `name`.
    ///
    #[doc = include_str!("../../../docs/snippets/cache-headers.md")]
    pub fn header_values<'a>(
        mut self,
        name: impl ToHeaderName,
        values: impl IntoIterator<Item = &'a HeaderValue>,
    ) -> Self {
        // The request handle carrying the write-side headers is created on first use.
        self.options
            .request_headers
            .get_or_insert_with(RequestHandle::new)
            .set_header_values(&name.into_owned(), values);
        self
    }

    /// Sets the initial age of the cached item, to be used in freshness calculations.
    ///
    /// The initial age is `Duration::ZERO` by default.
    pub fn initial_age(mut self, age: Duration) -> Self {
        self.options.initial_age = Some(age);
        self
    }

    #[doc = include_str!("../../../docs/snippets/cache-insert-known-length.md")]
    pub fn known_length(mut self, length: u64) -> Self {
        self.options.length = Some(length);
        self
    }

    #[doc = include_str!("../../../docs/snippets/cache-insert-sensitive-data.md")]
    pub fn sensitive_data(mut self, is_sensitive_data: bool) -> Self {
        self.options.sensitive_data = is_sensitive_data;
        self
    }

    #[doc = include_str!("../../../docs/snippets/cache-swr.md")]
    pub fn stale_while_revalidate(mut self, duration: Duration) -> Self {
        self.options.stale_while_revalidate = Some(duration);
        self
    }

    #[doc = include_str!("../../../docs/snippets/cache-insert-surrogate-keys.md")]
    pub fn surrogate_keys<'a>(mut self, keys: impl IntoIterator<Item = &'a str>) -> Self {
        self.options.surrogate_keys(keys);
        self
    }

    /// Sets the user-defined metadata to associate with the cached item.
    pub fn user_metadata(mut self, user_metadata: Bytes) -> Self {
        self.options.user_metadata = Some(user_metadata);
        self
    }

    /// Sets the list of headers that must match when looking up this cached item.
    ///
    #[doc = include_str!("../../../docs/snippets/cache-headers.md")]
    pub fn vary_by<'a>(mut self, headers: impl IntoIterator<Item = &'a HeaderName>) -> Self {
        self.options.vary_by(headers);
        self
    }
}