use self::handle::{
CacheHandle, CacheReplaceHandle, GetBodyOptions, LookupOptions as HandleLookupOptions,
WriteOptions as HandleWriteOptions,
};
use crate::{
convert::{ToHeaderName, ToHeaderValue},
handle::{BodyHandle, RequestHandle},
http::{
body::{Body, StreamingBody},
HeaderName, HeaderValue,
},
};
use bytes::Bytes;
use fastly_shared::FastlyStatus;
use std::{sync::Arc, time::Duration};
mod handle;
mod replace;
#[doc(hidden)]
pub use handle::CacheBusyHandle;
pub use handle::CacheKey;
use handle::CacheLookupState;
pub use replace::{replace, Replace, ReplaceBuilder, ReplaceStrategy};
/// Errors returned by the cache operations in this module.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum CacheError {
/// A limit was exceeded; produced from `FastlyStatus::LIMITEXCEEDED`.
#[error("cache operation failed due to a limit")]
LimitExceeded,
/// The operation was invalid; produced from `FastlyStatus::BADF`, and also
/// returned when no body handle can be obtained for a found object.
#[error("invalid cache operation")]
InvalidOperation,
/// The operation is not supported; produced from `FastlyStatus::UNSUPPORTED`.
#[error("unsupported cache operation")]
Unsupported,
/// Any other status, carried along unchanged for diagnostics.
#[error("unknown cache operation error; please report this as a bug: {0:?}")]
Other(FastlyStatus),
}
/// Translates a raw status code into the matching [`CacheError`] variant.
impl From<FastlyStatus> for CacheError {
    /// Maps the statuses that have a dedicated variant and preserves every
    /// other status inside [`CacheError::Other`].
    fn from(status: FastlyStatus) -> Self {
        match status {
            FastlyStatus::BADF => Self::InvalidOperation,
            FastlyStatus::LIMITEXCEEDED => Self::LimitExceeded,
            FastlyStatus::UNSUPPORTED => Self::Unsupported,
            unrecognized => Self::Other(unrecognized),
        }
    }
}
/// Options collected by the lookup builders in this module before they are
/// handed to the handle layer.
#[derive(Default)]
struct LookupOptions {
/// Request handle holding the headers set through `header`/`header_values`;
/// created on first use.
request_headers: Option<RequestHandle>,
/// Service set through `on_behalf_of`, if any.
service: Option<String>,
/// Set to `true` by `always_use_requested_range`; `false` by default.
always_use_requested_range: bool,
}
impl LookupOptions {
    /// Builds the handle-level view of these options.
    ///
    /// The request handle is borrowed, the service name is cloned, and the
    /// range flag is copied.
    fn as_handle_options(&self) -> HandleLookupOptions<'_> {
        // Exhaustive destructuring: adding a field to `LookupOptions` forces
        // this conversion to be revisited.
        let Self {
            request_headers,
            service,
            always_use_requested_range,
        } = self;
        HandleLookupOptions {
            request_headers: request_headers.as_ref(),
            service: service.clone(),
            always_use_requested_range: *always_use_requested_range,
        }
    }
}
/// Builder for a non-transactional cache lookup; created by [`lookup`] and
/// run with `execute`.
pub struct LookupBuilder {
/// Key of the object to look up.
key: CacheKey,
/// Options accumulated by the builder methods.
options: LookupOptions,
}
/// Starts building a non-transactional lookup for `key` with default options.
///
/// Nothing is sent until [`LookupBuilder::execute`] is called.
pub fn lookup(key: CacheKey) -> LookupBuilder {
    let options = LookupOptions::default();
    LookupBuilder { key, options }
}
impl LookupBuilder {
    /// Sets the values of the header `name` on the request used for this lookup.
    ///
    /// The backing request handle is created the first time a header is set.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header_values<'a>(
        mut self,
        name: impl ToHeaderName,
        values: impl IntoIterator<Item = &'a HeaderValue>,
    ) -> Self {
        let request = self
            .options
            .request_headers
            .get_or_insert_with(RequestHandle::new);
        let header_name = name.into_owned();
        request.set_header_values(&header_name, values);
        self
    }

    /// Sets a single value for the header `name`; shorthand for
    /// [`header_values`](Self::header_values) with one value.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header(self, name: impl ToHeaderName, value: impl ToHeaderValue) -> Self {
        let (header_name, header_value) = (name.into_owned(), value.into_owned());
        self.header_values(header_name, Some(&header_value))
    }

    /// Records the service on whose behalf the lookup is performed.
    ///
    #[doc = include_str!("../../docs/snippets/privileged_behalf.md")]
    pub fn on_behalf_of(mut self, service: impl ToString) -> Self {
        self.options.service = Some(service.to_string());
        self
    }

    /// Turns on the `always_use_requested_range` lookup option.
    ///
    #[doc = include_str!("../../docs/snippets/always-use-requested-range.md")]
    pub fn always_use_requested_range(mut self) -> Self {
        self.options.always_use_requested_range = true;
        self
    }

    /// Performs the lookup and waits for it to complete.
    ///
    /// Returns `Ok(Some(found))` when the resulting state contains `FOUND`,
    /// and `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if starting the lookup or waiting on it fails.
    pub fn execute(self) -> Result<Option<Found>, CacheError> {
        let Self { key, options } = self;
        let cache_handle = handle::lookup(key, &options.as_handle_options())?;
        cache_handle.wait()?;

        let is_found = cache_handle.get_state().contains(CacheLookupState::FOUND);
        if !is_found {
            return Ok(None);
        }
        let inner = FoundInner::Lookup(Arc::new(cache_handle));
        Ok(Some(Found { inner }))
    }
}
/// A transactional lookup that was started with
/// `TransactionLookupBuilder::execute_async` and may not have completed yet.
pub struct PendingTransaction {
/// Busy handle returned by the asynchronous transactional lookup.
handle: handle::CacheBusyHandle,
}
impl PendingTransaction {
    /// Reports whether the lookup is still in progress.
    ///
    /// Returns `Ok(true)` while the underlying busy handle is not ready.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if the readiness check itself fails.
    pub fn pending(&self) -> Result<bool, CacheError> {
        let ready = self.handle.is_ready()?;
        Ok(!ready)
    }

    /// Waits for the lookup to finish and returns the resulting [`Transaction`].
    ///
    /// The busy handle is waited on first to obtain the cache handle, and the
    /// cache handle is then waited on in turn.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if either wait fails.
    pub fn wait(self) -> Result<Transaction, CacheError> {
        let Self {
            handle: busy_handle,
        } = self;
        let cache_handle = busy_handle.wait()?;
        cache_handle.wait()?;
        let handle = Arc::new(cache_handle);
        Ok(Transaction { handle })
    }

    /// Wraps a raw busy handle in a `PendingTransaction`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the safety contract is not stated in this file; confirm
    /// the requirements on `handle` against the `CacheBusyHandle` definition.
    #[doc(hidden)]
    pub unsafe fn from_handle(handle: CacheBusyHandle) -> Self {
        Self { handle }
    }

    /// Unwraps this value into its raw busy handle.
    ///
    /// # Safety
    ///
    /// NOTE(review): the safety contract is not stated in this file; confirm
    /// what callers must uphold when using the returned handle.
    #[doc(hidden)]
    pub unsafe fn into_handle(self) -> CacheBusyHandle {
        let Self { handle } = self;
        handle
    }
}
/// The handle backing a [`Found`] value; its accessors dispatch to whichever
/// handle type is present.
pub(crate) enum FoundInner {
/// Backed by a shared `CacheReplaceHandle`.
ExistingObject(Arc<CacheReplaceHandle>),
/// Backed by a shared `CacheHandle`, as produced by the lookup and
/// transaction paths in this module.
Lookup(Arc<CacheHandle>),
}
impl FoundInner {
fn age(&self) -> Duration {
let age_ns = match self {
FoundInner::ExistingObject(handle) => handle.get_age_ns(),
FoundInner::Lookup(handle) => handle.get_age_ns(),
};
Duration::from_nanos(age_ns.expect("`Found` is missing age metadata"))
}
fn get_body(&self, options: &GetBodyOptions) -> Result<Option<BodyHandle>, FastlyStatus> {
match self {
FoundInner::ExistingObject(handle) => handle.get_body(options),
FoundInner::Lookup(handle) => handle.get_body(options),
}
}
fn get_state(&self) -> CacheLookupState {
match self {
FoundInner::ExistingObject(handle) => handle.get_state(),
FoundInner::Lookup(handle) => handle.get_state(),
}
}
fn hits(&self) -> u64 {
match self {
FoundInner::ExistingObject(handle) => handle.get_hits(),
FoundInner::Lookup(handle) => handle.get_hits(),
}
.expect("`Found` is missing hits metadata")
}
fn known_length(&self) -> Option<u64> {
match self {
FoundInner::ExistingObject(handle) => handle.get_length(),
FoundInner::Lookup(handle) => handle.get_length(),
}
}
fn stale_while_revalidate(&self) -> Duration {
let stale_while_revalidate_ns = match self {
FoundInner::ExistingObject(handle) => handle.get_stale_while_revalidate_ns(),
FoundInner::Lookup(handle) => handle.get_stale_while_revalidate_ns(),
};
Duration::from_nanos(
stale_while_revalidate_ns.expect("`Found` is missing stale_while_revalidate metadata"),
)
}
fn max_age(&self) -> Duration {
let max_age_ns = match self {
FoundInner::ExistingObject(handle) => handle.get_max_age_ns(),
FoundInner::Lookup(handle) => handle.get_max_age_ns(),
};
Duration::from_nanos(max_age_ns.expect("`Found` is missing max age metadata"))
}
fn remaining_ttl(&self) -> Duration {
self.max_age().saturating_sub(self.age())
}
fn user_metadata(&self) -> Bytes {
let user_metadata = match self {
FoundInner::ExistingObject(handle) => handle.get_user_metadata(),
FoundInner::Lookup(handle) => handle.get_user_metadata(),
};
user_metadata
.expect("`Found` is missing user_metadata")
.clone()
}
}
/// An object found in the cache, exposing its metadata and body.
pub struct Found {
/// Handle through which metadata and body are read.
inner: FoundInner,
}
impl Found {
    /// Maximum age of the object.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no max-age metadata.
    pub fn max_age(&self) -> Duration {
        self.inner.max_age()
    }

    /// Max age minus current age, clamped at zero.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no max-age or age metadata.
    pub fn remaining_ttl(&self) -> Duration {
        self.inner.remaining_ttl()
    }

    /// Age of the object.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no age metadata.
    pub fn age(&self) -> Duration {
        self.inner.age()
    }

    /// Stale-while-revalidate period of the object.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no stale-while-revalidate
    /// metadata.
    pub fn stale_while_revalidate(&self) -> Duration {
        self.inner.stale_while_revalidate()
    }

    /// Length of the object, when the underlying handle reports one.
    pub fn known_length(&self) -> Option<u64> {
        self.inner.known_length()
    }

    /// User metadata stored with the object.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no user metadata.
    pub fn user_metadata(&self) -> Bytes {
        self.inner.user_metadata()
    }

    /// Whether the lookup state contains the `USABLE` flag.
    pub fn is_usable(&self) -> bool {
        let state = self.inner.get_state();
        state.contains(CacheLookupState::USABLE)
    }

    /// Whether the lookup state contains the `STALE` flag.
    pub fn is_stale(&self) -> bool {
        let state = self.inner.get_state();
        state.contains(CacheLookupState::STALE)
    }

    /// Hit count recorded for the object.
    ///
    /// # Panics
    ///
    /// Panics if the underlying handle reports no hits metadata.
    pub fn hits(&self) -> u64 {
        self.inner.hits()
    }

    /// Opens a body over the whole object; equivalent to
    /// [`to_stream_from_range`](Self::to_stream_from_range) with no bounds.
    ///
    #[doc = include_str!("../../docs/snippets/cache-found-multiple-streams.md")]
    pub fn to_stream(&self) -> Result<Body, CacheError> {
        self.to_stream_from_range(None, None)
    }

    /// Opens a body over the object using the optional `from` and `to` bounds.
    ///
    /// # Errors
    ///
    /// - `CacheError::Other(FastlyStatus::INVAL)` when both bounds are given
    ///   and `to` is smaller than `from`.
    /// - `CacheError::InvalidOperation` when no body handle is available.
    /// - Any status reported while requesting the body, converted to
    ///   [`CacheError`].
    ///
    #[doc = include_str!("../../docs/snippets/cache-found-multiple-streams.md")]
    pub fn to_stream_from_range(
        &self,
        from: Option<u64>,
        to: Option<u64>,
    ) -> Result<Body, CacheError> {
        // Reject an inverted range up front, before asking for the body.
        if let (Some(start), Some(end)) = (from, to) {
            if end < start {
                return Err(CacheError::Other(FastlyStatus::INVAL));
            }
        }

        let range = GetBodyOptions { from, to };
        match self.inner.get_body(&range)? {
            Some(body_handle) => Ok(body_handle.into()),
            None => Err(CacheError::InvalidOperation),
        }
    }
}
/// Options collected by the insert and update builders in this module before
/// they are handed to the handle layer.
#[derive(Default)]
struct WriteOptions {
/// Max age supplied when the builder is created.
max_age: Duration,
/// Request handle holding the headers set through `header`/`header_values`;
/// created on first use.
request_headers: Option<RequestHandle>,
/// Space-separated header names built by `vary_by`.
vary_rule: Option<String>,
/// Initial age of the object, if set.
initial_age: Option<Duration>,
/// Stale-while-revalidate period, if set.
stale_while_revalidate: Option<Duration>,
/// Space-separated surrogate keys built by `surrogate_keys`.
surrogate_keys: Option<String>,
/// Length set through `known_length`, if any.
length: Option<u64>,
/// User metadata to store with the object, if set.
user_metadata: Option<Bytes>,
/// Flag set through `sensitive_data`; `false` by default.
sensitive_data: bool,
/// Duration set through `deliver_node_max_age`, if any.
edge_max_age: Option<Duration>,
/// Service set through `on_behalf_of`, if any.
service: Option<String>,
}
impl WriteOptions {
fn as_handle_options(&self) -> HandleWriteOptions<'_> {
let initial_age_ns = self.initial_age.map(|age| {
age.as_nanos()
.try_into()
.expect("initial_age larger than u64::MAX nanoseconds")
});
let max_age_ns = self.max_age.as_nanos().try_into().unwrap_or(u64::MAX);
let stale_while_revalidate_ns = self
.stale_while_revalidate
.map(|swr| swr.as_nanos().try_into().unwrap_or(u64::MAX));
let edge_max_age_ns = self
.edge_max_age
.map(|edge_max_age| edge_max_age.as_nanos().try_into().unwrap_or(u64::MAX));
HandleWriteOptions {
max_age_ns,
request_headers: self.request_headers.as_ref(),
vary_rule: self.vary_rule.as_deref(),
initial_age_ns,
stale_while_revalidate_ns,
surrogate_keys: self.surrogate_keys.as_deref(),
length: self.length,
user_metadata: self.user_metadata.clone(),
sensitive_data: self.sensitive_data,
edge_max_age_ns,
service: self.service.clone(),
}
}
fn vary_by<'a>(&mut self, headers: impl IntoIterator<Item = &'a HeaderName>) {
let mut vary_rule = String::new();
for header in headers {
if !vary_rule.is_empty() {
vary_rule.push(' ')
}
vary_rule.push_str(header.as_str())
}
self.vary_rule = Some(vary_rule);
}
fn surrogate_keys<'a>(&mut self, surrogate_keys: impl IntoIterator<Item = &'a str>) {
let mut keys = String::new();
for key in surrogate_keys {
if !keys.is_empty() {
keys.push(' ')
}
keys.push_str(key);
}
self.surrogate_keys = Some(keys);
}
}
/// Builder for a non-transactional cache insertion; created by [`insert`] and
/// run with `execute`.
pub struct InsertBuilder {
/// Key under which the object is inserted.
key: CacheKey,
/// Options accumulated by the builder methods.
options: WriteOptions,
}
/// Starts building a non-transactional insertion of `key` with the given
/// `max_age`; every other option starts at its default.
///
/// Nothing is written until [`InsertBuilder::execute`] is called.
pub fn insert(key: CacheKey, max_age: Duration) -> InsertBuilder {
    let options = WriteOptions {
        max_age,
        ..Default::default()
    };
    InsertBuilder { key, options }
}
impl InsertBuilder {
    /// Sets the values of the header `name` on the request used for this
    /// insertion.
    ///
    /// The backing request handle is created the first time a header is set.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header_values<'a>(
        mut self,
        name: impl ToHeaderName,
        values: impl IntoIterator<Item = &'a HeaderValue>,
    ) -> Self {
        let request = self
            .options
            .request_headers
            .get_or_insert_with(RequestHandle::new);
        let header_name = name.into_owned();
        request.set_header_values(&header_name, values);
        self
    }

    /// Sets a single value for the header `name`; shorthand for
    /// [`header_values`](Self::header_values) with one value.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header(self, name: impl ToHeaderName, value: impl ToHeaderValue) -> Self {
        let (header_name, header_value) = (name.into_owned(), value.into_owned());
        self.header_values(header_name, Some(&header_value))
    }

    /// Sets the vary rule to the given header names.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn vary_by<'a>(mut self, headers: impl IntoIterator<Item = &'a HeaderName>) -> Self {
        self.options.vary_by(headers);
        self
    }

    /// Sets the initial age of the inserted object.
    pub fn initial_age(mut self, age: Duration) -> Self {
        self.options.initial_age = Some(age);
        self
    }

    /// Sets the stale-while-revalidate period of the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-swr.md")]
    pub fn stale_while_revalidate(mut self, duration: Duration) -> Self {
        self.options.stale_while_revalidate = Some(duration);
        self
    }

    /// Sets the surrogate keys of the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-surrogate-keys.md")]
    pub fn surrogate_keys<'a>(mut self, keys: impl IntoIterator<Item = &'a str>) -> Self {
        self.options.surrogate_keys(keys);
        self
    }

    /// Declares the length of the object being inserted.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-known-length.md")]
    pub fn known_length(mut self, length: u64) -> Self {
        self.options.length = Some(length);
        self
    }

    /// Sets the user metadata stored alongside the object.
    pub fn user_metadata(mut self, user_metadata: Bytes) -> Self {
        self.options.user_metadata = Some(user_metadata);
        self
    }

    /// Sets the sensitive-data flag for the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-sensitive-data.md")]
    pub fn sensitive_data(mut self, is_sensitive_data: bool) -> Self {
        self.options.sensitive_data = is_sensitive_data;
        self
    }

    /// Sets the `edge_max_age` write option to `duration`.
    pub fn deliver_node_max_age(mut self, duration: Duration) -> Self {
        self.options.edge_max_age = Some(duration);
        self
    }

    /// Starts the insertion and returns the body to which the object's
    /// contents are written.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if the insertion cannot be started.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insertion.md")]
    pub fn execute(self) -> Result<StreamingBody, CacheError> {
        let Self { key, options } = self;
        let body_handle = handle::insert(key, &options.as_handle_options())?;
        let streaming_body: StreamingBody = body_handle.into();
        Ok(streaming_body)
    }

    /// Records the service on whose behalf the insertion is performed.
    ///
    #[doc = include_str!("../../docs/snippets/privileged_behalf.md")]
    pub fn on_behalf_of(mut self, service: impl ToString) -> Self {
        self.options.service = Some(service.to_string());
        self
    }
}
/// A transactional cache lookup, from which the found object can be read and
/// an insertion or update can be started.
pub struct Transaction {
/// Shared cache handle for the transaction; also handed to any `Found`,
/// insert builder, or update builder derived from it.
handle: Arc<CacheHandle>,
}
impl Transaction {
pub fn lookup(key: CacheKey) -> TransactionLookupBuilder {
TransactionLookupBuilder {
key,
options: LookupOptions::default(),
lazy_await: false,
}
}
pub fn found(&self) -> Option<Found> {
if self.handle.get_state().contains(CacheLookupState::FOUND) {
Some(Found {
inner: FoundInner::Lookup(self.handle.clone()),
})
} else {
None
}
}
pub fn must_insert(&self) -> bool {
!self.handle.get_state().contains(CacheLookupState::FOUND)
&& self
.handle
.get_state()
.contains(CacheLookupState::MUST_INSERT_OR_UPDATE)
}
pub fn must_insert_or_update(&self) -> bool {
self.handle
.get_state()
.contains(CacheLookupState::MUST_INSERT_OR_UPDATE)
}
pub fn cancel_insert_or_update(&self) -> Result<(), CacheError> {
Ok(self.handle.transaction_cancel()?)
}
pub fn insert(self, max_age: Duration) -> TransactionInsertBuilder {
TransactionInsertBuilder {
handle: self.handle.clone(),
options: WriteOptions {
max_age,
..Default::default()
},
}
}
pub fn update(self, max_age: Duration) -> TransactionUpdateBuilder {
TransactionUpdateBuilder {
handle: self.handle.clone(),
options: WriteOptions {
max_age,
..Default::default()
},
}
}
}
/// Builder for a transactional cache lookup; created by `Transaction::lookup`
/// and run with `execute` or `execute_async`.
pub struct TransactionLookupBuilder {
/// Key of the object to look up.
key: CacheKey,
/// Options accumulated by the builder methods.
options: LookupOptions,
/// When `true`, `execute` returns without waiting on the cache handle.
lazy_await: bool,
}
impl TransactionLookupBuilder {
    /// Sets the values of the header `name` on the request used for this lookup.
    ///
    /// The backing request handle is created the first time a header is set.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header_values<'a>(
        mut self,
        name: impl ToHeaderName,
        values: impl IntoIterator<Item = &'a HeaderValue>,
    ) -> Self {
        let request = self
            .options
            .request_headers
            .get_or_insert_with(RequestHandle::new);
        let header_name = name.into_owned();
        request.set_header_values(&header_name, values);
        self
    }

    /// Sets a single value for the header `name`; shorthand for
    /// [`header_values`](Self::header_values) with one value.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn header(self, name: impl ToHeaderName, value: impl ToHeaderValue) -> Self {
        let (header_name, header_value) = (name.into_owned(), value.into_owned());
        self.header_values(header_name, Some(&header_value))
    }

    /// Makes [`execute`](Self::execute) return without waiting on the cache
    /// handle.
    #[doc(hidden)]
    pub fn lazy_await(mut self) -> Self {
        self.lazy_await = true;
        self
    }

    /// Records the service on whose behalf the lookup is performed.
    ///
    #[doc = include_str!("../../docs/snippets/privileged_behalf.md")]
    pub fn on_behalf_of(mut self, service: impl ToString) -> Self {
        self.options.service = Some(service.to_string());
        self
    }

    /// Turns on the `always_use_requested_range` lookup option.
    ///
    #[doc = include_str!("../../docs/snippets/always-use-requested-range.md")]
    pub fn always_use_requested_range(mut self) -> Self {
        self.options.always_use_requested_range = true;
        self
    }

    /// Performs the transactional lookup and, unless `lazy_await` was
    /// requested, waits for it to complete.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if starting the lookup or waiting on it fails.
    pub fn execute(self) -> Result<Transaction, CacheError> {
        let Self {
            key,
            options,
            lazy_await,
        } = self;
        let cache_handle = handle::transaction_lookup(key, &options.as_handle_options())?;
        if !lazy_await {
            cache_handle.wait()?;
        }
        let handle = Arc::new(cache_handle);
        Ok(Transaction { handle })
    }

    /// Starts the transactional lookup without waiting for it, returning a
    /// [`PendingTransaction`] that can be polled or waited on.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if the lookup cannot be started.
    pub fn execute_async(self) -> Result<PendingTransaction, CacheError> {
        let Self { key, options, .. } = self;
        let pending = handle::transaction_lookup_async(key, &options.as_handle_options())?;
        Ok(PendingTransaction { handle: pending })
    }
}
/// Builder for an insertion within a transaction; created by
/// `Transaction::insert`.
pub struct TransactionInsertBuilder {
/// Cache handle of the transaction the insertion belongs to.
handle: Arc<CacheHandle>,
/// Options accumulated by the builder methods.
options: WriteOptions,
}
impl TransactionInsertBuilder {
    /// Sets the vary rule to the given header names.
    ///
    #[doc = include_str!("../../docs/snippets/cache-headers.md")]
    pub fn vary_by<'a>(mut self, headers: impl IntoIterator<Item = &'a HeaderName>) -> Self {
        self.options.vary_by(headers);
        self
    }

    /// Sets the initial age of the inserted object.
    pub fn initial_age(mut self, age: Duration) -> Self {
        self.options.initial_age = Some(age);
        self
    }

    /// Sets the stale-while-revalidate period of the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-swr.md")]
    pub fn stale_while_revalidate(mut self, duration: Duration) -> Self {
        self.options.stale_while_revalidate = Some(duration);
        self
    }

    /// Sets the surrogate keys of the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-surrogate-keys.md")]
    pub fn surrogate_keys<'a>(mut self, keys: impl IntoIterator<Item = &'a str>) -> Self {
        self.options.surrogate_keys(keys);
        self
    }

    /// Declares the length of the object being inserted.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-known-length.md")]
    pub fn known_length(mut self, length: u64) -> Self {
        self.options.length = Some(length);
        self
    }

    /// Sets the user metadata stored alongside the object.
    pub fn user_metadata(mut self, user_metadata: Bytes) -> Self {
        self.options.user_metadata = Some(user_metadata);
        self
    }

    /// Sets the sensitive-data flag for the inserted object.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insert-sensitive-data.md")]
    pub fn sensitive_data(mut self, is_sensitive_data: bool) -> Self {
        self.options.sensitive_data = is_sensitive_data;
        self
    }

    /// Sets the `edge_max_age` write option to `duration`.
    pub fn deliver_node_max_age(mut self, duration: Duration) -> Self {
        self.options.edge_max_age = Some(duration);
        self
    }

    /// Records the service on whose behalf the insertion is performed.
    ///
    #[doc = include_str!("../../docs/snippets/privileged_behalf.md")]
    pub fn on_behalf_of(mut self, service: impl ToString) -> Self {
        self.options.service = Some(service.to_string());
        self
    }

    /// Starts the insertion and returns the body to which the object's
    /// contents are written.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if the insertion cannot be started.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insertion.md")]
    pub fn execute(self) -> Result<StreamingBody, CacheError> {
        let write_options = self.options.as_handle_options();
        let body_handle = self.handle.transaction_insert(&write_options)?;
        let streaming_body: StreamingBody = body_handle.into();
        Ok(streaming_body)
    }

    /// Starts the insertion and returns both the body to write to and a
    /// [`Found`] backed by the cache handle returned for the new object.
    ///
    /// # Errors
    ///
    /// Returns a [`CacheError`] if the insertion cannot be started.
    ///
    #[doc = include_str!("../../docs/snippets/cache-insertion.md")]
    pub fn execute_and_stream_back(self) -> Result<(StreamingBody, Found), CacheError> {
        let write_options = self.options.as_handle_options();
        let (body_handle, cache_handle) = self
            .handle
            .transaction_insert_and_stream_back(&write_options)?;
        let streaming_body: StreamingBody = body_handle.into();
        let found = Found {
            inner: FoundInner::Lookup(Arc::new(cache_handle)),
        };
        Ok((streaming_body, found))
    }
}
/// Builder for an update within a transaction; created by
/// `Transaction::update`.
pub struct TransactionUpdateBuilder {
/// Cache handle of the transaction the update belongs to.
handle: Arc<CacheHandle>,
/// Options accumulated by the builder methods.
options: WriteOptions,
}
impl TransactionUpdateBuilder {
#[doc = include_str!("../../docs/snippets/cache-headers.md")]
pub fn vary_by<'a>(mut self, headers: impl IntoIterator<Item = &'a HeaderName>) -> Self {
self.options.vary_by(headers);
self
}
pub fn age(mut self, age: Duration) -> Self {
self.options.initial_age = Some(age);
self
}
pub fn deliver_node_max_age(mut self, duration: Duration) -> Self {
self.options.edge_max_age = Some(duration);
self
}
#[doc = include_str!("../../docs/snippets/cache-swr.md")]
pub fn stale_while_revalidate(mut self, duration: Duration) -> Self {
self.options.stale_while_revalidate = Some(duration);
self
}
#[doc = include_str!("../../docs/snippets/cache-insert-surrogate-keys.md")]
pub fn surrogate_keys<'a>(mut self, keys: impl IntoIterator<Item = &'a str>) -> Self {
self.options.surrogate_keys(keys);
self
}
pub fn user_metadata(mut self, user_metadata: Bytes) -> Self {
self.options.user_metadata = Some(user_metadata);
self
}
#[doc = include_str!("../../docs/snippets/privileged_behalf.md")]
pub fn on_behalf_of(mut self, service: impl ToString) -> Self {
self.options.service = Some(service.to_string());
self
}
pub fn execute(self) -> Result<(), CacheError> {
self.handle
.transaction_update(&self.options.as_handle_options())
.map_err(CacheError::from)
}
}