use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use fred::interfaces::ClientLike;
use fred::interfaces::KeysInterface;
use fred::interfaces::SortedSetsInterface;
use fred::prelude::Options;
use fred::types::Expiration;
use fred::types::ExpireOptions;
use fred::types::Value;
use fred::types::sorted_sets::Ordering;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio_util::time::FutureExt;
use tower::BoxError;
use super::CacheEntry;
use super::CacheStorage;
use super::Document;
use super::StorageResult;
use crate::cache::redis::RedisCacheStorage;
use crate::cache::redis::RedisKey;
use crate::cache::redis::RedisValue;
use crate::cache::storage::KeyType;
use crate::cache::storage::ValueType;
use crate::plugins::response_cache::cache_control::CacheControl;
use crate::plugins::response_cache::metrics::record_maintenance_duration;
use crate::plugins::response_cache::metrics::record_maintenance_error;
use crate::plugins::response_cache::metrics::record_maintenance_queue_error;
use crate::plugins::response_cache::metrics::record_maintenance_success;
use crate::plugins::response_cache::plugin::RESPONSE_CACHE_VERSION;
/// Re-export of the response-cache configuration type used by this Redis backend.
pub(crate) type Config = super::config::Config;

/// Wire format of a cached document as stored in Redis (JSON-serialized).
#[derive(Deserialize, Debug, Clone, Serialize)]
struct CacheValue {
    // The cached response payload.
    data: serde_json_bytes::Value,
    // Cache-control metadata governing freshness of this entry.
    cache_control: CacheControl,
    // Original (un-permuted) invalidation keys; only populated when the
    // document was inserted with `debug` set (see `internal_insert_in_batch`).
    cache_tags: Option<HashSet<String>>,
}

impl ValueType for CacheValue {}
impl From<(&str, CacheValue)> for CacheEntry {
fn from((cache_key, cache_value): (&str, CacheValue)) -> Self {
CacheEntry {
key: cache_key.to_string(),
data: cache_value.data,
control: cache_value.cache_control,
cache_tags: cache_value.cache_tags,
}
}
}
/// Redis-backed storage for the response cache.
#[derive(Clone)]
pub(crate) struct Storage {
    // Underlying Redis connection abstraction.
    storage: RedisCacheStorage,
    // Queue feeding the background maintenance task with cache-tag keys to prune.
    cache_tag_tx: mpsc::Sender<String>,
    // Per-operation timeouts (from `Config`).
    fetch_timeout: Duration,
    insert_timeout: Duration,
    invalidate_timeout: Duration,
    maintenance_timeout: Duration,
}
impl Storage {
    /// Connects to Redis and spawns the background maintenance task, which
    /// runs until `drop_rx` fires.
    pub(crate) async fn new(
        config: &Config,
        drop_rx: broadcast::Receiver<()>,
    ) -> Result<Self, BoxError> {
        let storage = RedisCacheStorage::new(config.into(), "response-cache").await?;
        // Bounded queue: overflow is recorded as a metric instead of blocking callers.
        let (cache_tag_tx, cache_tag_rx) = mpsc::channel(1000);
        let s = Self {
            storage,
            cache_tag_tx,
            fetch_timeout: config.fetch_timeout,
            insert_timeout: config.insert_timeout,
            invalidate_timeout: config.invalidate_timeout,
            maintenance_timeout: config.maintenance_timeout,
        };
        s.perform_periodic_maintenance(cache_tag_rx, drop_rx).await;
        Ok(s)
    }

    pub(crate) fn activate(&self) {
        self.storage.activate();
    }

    /// Applies the configured namespace/prefix to `key`.
    fn make_key<K: KeyType>(&self, key: K) -> String {
        self.storage.make_key(RedisKey(key))
    }

    /// Deletes every document referenced by the given cache-tag sorted sets;
    /// returns the number of deleted entries.
    async fn invalidate_keys(&self, invalidation_keys: Vec<String>) -> StorageResult<u64> {
        let options = Options {
            timeout: Some(self.invalidate_timeout()),
            ..Options::default()
        };
        // First pipeline: read the members (document keys) of each cache-tag set.
        let pipeline = self.storage.pipeline().with_options(&options);
        for invalidation_key in &invalidation_keys {
            let invalidation_key =
                format!("version:{RESPONSE_CACHE_VERSION}:cache-tag:{invalidation_key}");
            // Opportunistically schedule background pruning for this tag.
            self.send_to_maintenance_queue(invalidation_key.clone());
            let redis_key = self.make_key(invalidation_key.clone());
            let _: () = pipeline
                .zrange(redis_key, 0, -1, None, false, None, false)
                .await?;
        }
        let results: Vec<Vec<String>> = pipeline.all().await?;
        // De-duplicate: one document may be referenced by several tags.
        let all_keys: HashSet<String> = results.into_iter().flatten().collect();
        if all_keys.is_empty() {
            return Ok(0);
        }
        let keys = all_keys
            .into_iter()
            .map(|key| self.make_key(key))
            .map(fred::types::Key::from);
        let deleted = self
            .storage
            .delete_from_scan_result_with_options(keys, options)
            .await?;
        Ok(deleted as u64)
    }

    /// Queues a cache-tag key for background pruning; on a full or closed
    /// queue the key is dropped and the failure recorded as a metric.
    fn send_to_maintenance_queue(&self, cache_tag_key: String) {
        if let Err(err) = self.cache_tag_tx.try_send(cache_tag_key) {
            record_maintenance_queue_error(&err);
        }
    }

    /// Spawns the background task that prunes expired members from cache-tag
    /// sorted sets as their keys arrive on `cache_tag_rx`.
    pub(crate) async fn perform_periodic_maintenance(
        &self,
        mut cache_tag_rx: mpsc::Receiver<String>,
        mut drop_rx: broadcast::Receiver<()>,
    ) {
        let storage = self.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // `biased` so shutdown wins over pending maintenance work.
                    biased;
                    _ = drop_rx.recv() => break,
                    Some(cache_tag) = cache_tag_rx.recv() => storage.perform_maintenance_on_cache_tag(cache_tag).await
                }
            }
        });
    }

    /// Prunes members of `cache_tag` whose score (expiry timestamp) is in the
    /// past, recording duration and outcome metrics.
    async fn perform_maintenance_on_cache_tag(&self, cache_tag: String) {
        // Anything that expired more than a second ago is removable.
        let cutoff = now() - 1;
        let now = Instant::now();
        let removed_items_result = super::flatten_storage_error(
            self.remove_keys_from_cache_tag_by_cutoff(cache_tag, cutoff as f64)
                .timeout(self.maintenance_timeout())
                .await,
        );
        record_maintenance_duration(now.elapsed());
        match removed_items_result {
            Ok(removed_items) => record_maintenance_success(removed_items),
            Err(err) => record_maintenance_error(&err),
        }
    }

    /// Removes all members of the cache-tag sorted set with a score at or
    /// below `cutoff_time`; returns how many members were removed.
    async fn remove_keys_from_cache_tag_by_cutoff(
        &self,
        cache_tag_key: String,
        cutoff_time: f64,
    ) -> StorageResult<u64> {
        let options = Options {
            timeout: Some(self.maintenance_timeout()),
            ..Options::default()
        };
        let cache_tag_key = self.make_key(cache_tag_key);
        Ok(self
            .storage
            .client()
            .with_options(&options)
            .zremrangebyscore(&cache_tag_key, f64::NEG_INFINITY, cutoff_time)
            .await?)
    }

    /// Expands a document's invalidation keys into the full, versioned set of
    /// cache-tag keys: one subgraph-wide tag plus one per invalidation key.
    fn cache_tag_permutations(
        &self,
        document_invalidation_keys: &[String],
        subgraph_name: &str,
    ) -> Vec<String> {
        let mut cache_tags = Vec::with_capacity(1 + document_invalidation_keys.len());
        cache_tags.push(format!("subgraph-{subgraph_name}"));
        for invalidation_key in document_invalidation_keys {
            cache_tags.push(format!("subgraph-{subgraph_name}:key-{invalidation_key}"));
        }
        // Prefix every tag with the versioned cache-tag namespace.
        for cache_tag in cache_tags.iter_mut() {
            *cache_tag = format!("version:{RESPONSE_CACHE_VERSION}:cache-tag:{cache_tag}");
        }
        cache_tags
    }

    fn maintenance_timeout(&self) -> Duration {
        self.maintenance_timeout
    }
}
impl CacheStorage for Storage {
    fn insert_timeout(&self) -> Duration {
        self.insert_timeout
    }

    fn fetch_timeout(&self) -> Duration {
        self.fetch_timeout
    }

    fn invalidate_timeout(&self) -> Duration {
        self.invalidate_timeout
    }

    async fn internal_insert(&self, document: Document, subgraph_name: &str) -> StorageResult<()> {
        self.internal_insert_in_batch(vec![document], subgraph_name)
            .await
    }

    /// Inserts documents in two phases: (1) register every document in its
    /// cache-tag sorted sets, (2) write the documents themselves. Phase 2 only
    /// runs if phase 1 fully succeeded, so a document can never exist without
    /// its invalidation bookkeeping.
    async fn internal_insert_in_batch(
        &self,
        mut batch_docs: Vec<Document>,
        subgraph_name: &str,
    ) -> StorageResult<()> {
        let now = now();
        // Keep the original (un-permuted) tags so debug entries can expose them.
        let mut original_cache_tags = Vec::with_capacity(batch_docs.len());
        for document in &mut batch_docs {
            if document.debug {
                original_cache_tags.push(document.invalidation_keys.clone());
            } else {
                original_cache_tags.push(Vec::new());
            }
            document.invalidation_keys =
                self.cache_tag_permutations(&document.invalidation_keys, subgraph_name);
        }
        // Group (expiry-timestamp, document-key) pairs by cache-tag key.
        let num_cache_tags_estimate = 2 * batch_docs.len();
        let mut cache_tags_to_pcks: HashMap<String, Vec<(f64, String)>> =
            HashMap::with_capacity(num_cache_tags_estimate);
        for document in &mut batch_docs {
            for cache_tag_key in document.invalidation_keys.drain(..) {
                let cache_tag_value = (
                    (now + document.expire.as_secs()) as f64,
                    document.key.clone(),
                );
                // Entry API: a single map lookup instead of get_mut + insert.
                cache_tags_to_pcks
                    .entry(cache_tag_key)
                    .or_default()
                    .push(cache_tag_value);
            }
        }
        let options = Options {
            timeout: Some(self.insert_timeout()),
            ..Options::default()
        };
        // Phase 1: update every cache-tag sorted set.
        let pipeline = self.storage.client().pipeline().with_options(&options);
        for (cache_tag_key, elements) in cache_tags_to_pcks.into_iter() {
            self.send_to_maintenance_queue(cache_tag_key.clone());
            // The tag key itself must outlive its longest-lived member.
            let max_expiry_time = elements
                .iter()
                .map(|(exp_time, _)| *exp_time)
                .fold(now as f64, f64::max);
            let cache_tag_expiry_time = max_expiry_time as i64 + 1;
            let redis_key = self.make_key(cache_tag_key);
            // GreaterThan: never lower an existing member's expiry score.
            // Queueing errors are ignored here; `try_all` below surfaces them.
            let _: Result<(), _> = pipeline
                .zadd(
                    redis_key.clone(),
                    None,
                    Some(Ordering::GreaterThan),
                    false,
                    false,
                    elements,
                )
                .await;
            // NX sets a TTL only when none exists; GT only ever extends it.
            for exp_opt in [ExpireOptions::NX, ExpireOptions::GT] {
                let _: Result<(), _> = pipeline
                    .expire_at(redis_key.clone(), cache_tag_expiry_time, Some(exp_opt))
                    .await;
            }
        }
        // Abort before writing any document if a cache-tag update failed.
        let result_vec = pipeline.try_all::<Value>().await;
        for result in result_vec {
            if let Err(err) = result {
                tracing::debug!("Caught error during cache tag update: {err:?}");
                return Err(err.into());
            }
        }
        // Phase 2: write the documents themselves, each with an absolute expiry.
        let pipeline = self.storage.client().pipeline().with_options(&options);
        for (document, cache_tags) in batch_docs.into_iter().zip(original_cache_tags) {
            let value = CacheValue {
                data: document.data,
                cache_control: document.control,
                cache_tags: document.debug.then(|| cache_tags.into_iter().collect()),
            };
            let _: () = pipeline
                .set::<(), _, _>(
                    self.make_key(document.key),
                    &serde_json::to_string(&value)?,
                    Some(Expiration::EXAT((now + document.expire.as_secs()) as i64)),
                    None,
                    false,
                )
                .await?;
        }
        let result_vec = pipeline.try_all::<Value>().await;
        for result in result_vec {
            if let Err(err) = result {
                tracing::debug!("Caught error during document insert: {err:?}");
                return Err(err.into());
            }
        }
        Ok(())
    }

    async fn internal_fetch(&self, cache_key: &str) -> StorageResult<CacheEntry> {
        let options = Options {
            timeout: Some(self.fetch_timeout()),
            ..Options::default()
        };
        let value: RedisValue<CacheValue> = self
            .storage
            .get_with_options(RedisKey(cache_key), options)
            .await?;
        Ok(CacheEntry::from((cache_key, value.0)))
    }

    /// Fetches many keys in one round trip; per-key failures are returned
    /// in-place so one miss does not fail the whole batch.
    async fn internal_fetch_multiple(
        &self,
        cache_keys: &[&str],
    ) -> StorageResult<Vec<StorageResult<CacheEntry>>> {
        let keys: Vec<RedisKey<String>> = cache_keys
            .iter()
            .map(|key| RedisKey(key.to_string()))
            .collect();
        let options = Options {
            timeout: Some(self.fetch_timeout()),
            ..Options::default()
        };
        let values: Vec<Result<RedisValue<CacheValue>, _>> = self
            .storage
            .get_multiple_with_options(keys, options)
            .await?;
        let entries = values
            .into_iter()
            .zip(cache_keys)
            .map(|(opt_value, cache_key)| {
                opt_value
                    .map(|value| CacheEntry::from((*cache_key, value.0)))
                    .map_err(Into::into)
            })
            .collect();
        Ok(entries)
    }

    async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult<u64> {
        self.invalidate_keys(vec![format!("subgraph-{subgraph_name}")])
            .await
    }

    /// Invalidates the given keys, restricted to the given subgraphs; returns
    /// the number of deleted documents per subgraph.
    async fn internal_invalidate(
        &self,
        invalidation_keys: Vec<String>,
        subgraph_names: Vec<String>,
    ) -> StorageResult<HashMap<String, u64>> {
        let mut join_set = JoinSet::default();
        let num_subgraphs = subgraph_names.len();
        // One concurrent invalidation task per subgraph.
        for subgraph_name in subgraph_names {
            let keys: Vec<String> = invalidation_keys
                .iter()
                .map(|invalidation_key| format!("subgraph-{subgraph_name}:key-{invalidation_key}"))
                .collect();
            let storage = self.clone();
            join_set.spawn(async move { (subgraph_name, storage.invalidate_keys(keys).await) });
        }
        let mut counts = HashMap::with_capacity(num_subgraphs);
        while let Some(result) = join_set.join_next().await {
            let (subgraph_name, count) = result?;
            counts.insert(subgraph_name, count?);
        }
        Ok(counts)
    }

    #[cfg(all(
        test,
        any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux"))
    ))]
    async fn truncate_namespace(&self) -> StorageResult<()> {
        self.storage.truncate_namespace().await?;
        Ok(())
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock is set to before the Unix epoch, which would be
/// a host misconfiguration rather than a recoverable condition.
fn now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time is before the Unix epoch")
        .as_secs()
}
#[cfg(all(
    test,
    any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux"))
))]
// Test-only helpers for building a mocked backend and inspecting Redis state.
impl Storage {
    /// Like `new`, but backed by a `fred` mock instead of a real Redis server.
    async fn mocked(
        config: &Config,
        is_cluster: bool,
        mock_storage: std::sync::Arc<dyn fred::mocks::Mocks>,
        drop_rx: broadcast::Receiver<()>,
    ) -> Result<Storage, BoxError> {
        let storage = RedisCacheStorage::from_mocks_and_config(
            mock_storage,
            config.into(),
            "response-cache",
            is_cluster,
        )
        .await?;
        let (cache_tag_tx, cache_tag_rx) = mpsc::channel(100);
        let s = Self {
            storage,
            cache_tag_tx,
            fetch_timeout: config.fetch_timeout,
            insert_timeout: config.insert_timeout,
            invalidate_timeout: config.invalidate_timeout,
            maintenance_timeout: config.maintenance_timeout,
        };
        s.perform_periodic_maintenance(cache_tag_rx, drop_rx).await;
        Ok(s)
    }

    /// Scans the whole namespace and returns every key, namespace stripped.
    async fn all_keys_in_namespace(&self) -> Result<Vec<String>, BoxError> {
        use fred::types::scan::Scanner;
        use tokio_stream::StreamExt;
        let mut scan_stream = self
            .storage
            .scan_with_namespaced_results(String::from("*"), None);
        let mut keys = Vec::default();
        while let Some(result) = scan_stream.next().await {
            if let Some(page_keys) = result?.take_results() {
                let mut str_keys: Vec<String> = page_keys
                    .into_iter()
                    .map(|k| k.into_string().unwrap())
                    .map(|k| self.storage.strip_namespace(k))
                    .collect();
                keys.append(&mut str_keys);
            }
        }
        Ok(keys)
    }

    async fn ttl(&self, key: &str) -> StorageResult<i64> {
        let key = self.make_key(key);
        Ok(self.storage.client().ttl(key).await?)
    }

    async fn expire_time(&self, key: &str) -> StorageResult<i64> {
        let key = self.make_key(key);
        Ok(self.storage.client().expire_time(key).await?)
    }

    /// Returns `member`'s score parsed as an integer; errors if it is absent.
    async fn zscore(&self, sorted_set_key: &str, member: &str) -> Result<i64, BoxError> {
        let sorted_set_key = self.make_key(sorted_set_key);
        let score: String = self.storage.client().zscore(sorted_set_key, member).await?;
        Ok(score.parse()?)
    }

    async fn zcard(&self, sorted_set_key: &str) -> StorageResult<u64> {
        let sorted_set_key = self.make_key(sorted_set_key);
        let cardinality = self.storage.client().zcard(sorted_set_key).await?;
        Ok(cardinality)
    }

    /// True if `member` is present in the sorted set.
    async fn zexists(&self, sorted_set_key: &str, member: &str) -> StorageResult<bool> {
        let sorted_set_key = self.make_key(sorted_set_key);
        let score: Option<String> = self.storage.client().zscore(sorted_set_key, member).await?;
        Ok(score.is_some())
    }

    async fn exists(&self, key: &str) -> StorageResult<bool> {
        let key = self.make_key(key);
        Ok(self.storage.client().exists(key).await?)
    }
}
#[cfg(all(
    test,
    any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux"))
))]
// Integration tests; most run against a real Redis (standalone and clustered
// via rstest `#[values]`), each in its own random namespace.
mod tests {
    use std::sync::Arc;
    use std::time::Duration;
    use insta::assert_debug_snapshot;
    use itertools::Itertools;
    use tokio::sync::broadcast;
    use tokio::time::Instant;
    use tower::BoxError;
    use uuid::Uuid;
    use super::Config;
    use super::Storage;
    use super::now;
    use crate::metrics::FutureMetricsExt;
    use crate::plugins::response_cache::ErrorCode;
    use crate::plugins::response_cache::storage::CacheStorage;
    use crate::plugins::response_cache::storage::Document;
    use crate::plugins::response_cache::storage::Error;

    const SUBGRAPH_NAME: &str = "test";

    // Each test gets a unique namespace so concurrently-running tests do not
    // see each other's keys.
    fn redis_config(clustered: bool) -> Config {
        Config::test(clustered, &random_namespace())
    }

    fn random_namespace() -> String {
        Uuid::new_v4().to_string()
    }

    // Baseline document reused (with struct-update syntax) across tests.
    fn common_document() -> Document {
        Document {
            key: "key".to_string(),
            data: Default::default(),
            control: Default::default(),
            invalidation_keys: vec!["invalidate".to_string()],
            expire: Duration::from_secs(60),
            debug: true,
        }
    }

    // Snapshot-tests the cache-tag key expansion for various namespaces and
    // invalidation-key sets, using a mocked (Echo) Redis.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_invalidation_key_permutations(
        #[values(None, Some("test"))] namespace: Option<&str>,
        #[values(vec![], vec!["invalidation"], vec!["invalidation1", "invalidation2", "invalidation3"])]
        invalidation_keys: Vec<&str>,
    ) {
        let mut settings = insta::Settings::clone_current();
        settings.set_snapshot_suffix(format!(
            "input____{}____{}",
            namespace.unwrap_or("None"),
            invalidation_keys.iter().join("__")
        ));
        let _guard = settings.bind_to_scope();
        let mock_storage = Arc::new(fred::mocks::Echo);
        let config = Config {
            namespace: namespace.map(ToString::to_string),
            ..redis_config(false)
        };
        let (_drop_tx, drop_rx) = broadcast::channel(2);
        let storage = Storage::mocked(&config, false, mock_storage, drop_rx)
            .await
            .expect("could not build storage");
        let invalidation_keys: Vec<String> = invalidation_keys
            .into_iter()
            .map(ToString::to_string)
            .collect();
        let mut cache_tags = storage.cache_tag_permutations(&invalidation_keys, "products");
        cache_tags.sort();
        assert_debug_snapshot!(cache_tags);
    }

    // Verifies TTL/score invariants: cache-tag keys outlive their member
    // documents, and member scores equal the document's expire time.
    mod ttl_guarantees {
        use std::collections::HashMap;
        use std::time::Duration;
        use itertools::Itertools;
        use tokio::sync::broadcast;
        use tower::BoxError;
        use super::SUBGRAPH_NAME;
        use super::common_document;
        use super::redis_config;
        use crate::plugins::response_cache::storage::CacheStorage;
        use crate::plugins::response_cache::storage::Document;
        use crate::plugins::response_cache::storage::redis::Storage;

        #[tokio::test]
        #[rstest::rstest]
        async fn single_document(#[values(true, false)] clustered: bool) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = common_document();
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let document_key = document.key.clone();
            let expected_cache_tag_keys =
                storage.cache_tag_permutations(&document.invalidation_keys, SUBGRAPH_NAME);
            let keys = storage.all_keys_in_namespace().await?;
            assert!(keys.contains(&document_key));
            for key in &expected_cache_tag_keys {
                assert!(keys.contains(key), "missing {key}");
            }
            // 1 document key + 2 cache-tag keys.
            assert_eq!(keys.len(), 3);
            let document_ttl = storage.ttl(&document_key).await?;
            assert!(document_ttl > 0);
            for cache_tag_key in &expected_cache_tag_keys {
                let cache_tag_ttl = storage.ttl(cache_tag_key).await?;
                assert!(cache_tag_ttl > 0, "{cache_tag_key}");
                assert!(document_ttl < cache_tag_ttl, "{cache_tag_key}")
            }
            let document_expire_time = storage.expire_time(&document_key).await?;
            assert!(document_expire_time > 0);
            for cache_tag_key in &expected_cache_tag_keys {
                let document_score = storage.zscore(cache_tag_key, &document_key).await?;
                assert_eq!(document_expire_time, document_score);
            }
            Ok(())
        }

        #[tokio::test]
        #[rstest::rstest]
        async fn multiple_documents(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let documents = vec![
                Document {
                    key: "key1".to_string(),
                    invalidation_keys: vec![
                        "invalidation".to_string(),
                        "invalidation1".to_string(),
                    ],
                    expire: Duration::from_secs(30),
                    ..common_document()
                },
                Document {
                    key: "key2".to_string(),
                    invalidation_keys: vec![
                        "invalidation".to_string(),
                        "invalidation2".to_string(),
                    ],
                    expire: Duration::from_secs(60),
                    ..common_document()
                },
            ];
            storage
                .insert_in_batch(documents.clone(), SUBGRAPH_NAME)
                .await?;
            let mut expected_document_keys = Vec::new();
            let mut expected_cache_tag_keys = Vec::new();
            for document in &documents {
                expected_document_keys.push(document.key.clone());
                expected_cache_tag_keys.push(
                    storage.cache_tag_permutations(&document.invalidation_keys, SUBGRAPH_NAME),
                );
            }
            let all_expected_cache_tag_keys: Vec<String> = expected_cache_tag_keys
                .iter()
                .flatten()
                .cloned()
                .unique()
                .collect();
            // The documents share tags, so the unique set must be smaller.
            assert!(
                all_expected_cache_tag_keys.len()
                    < expected_cache_tag_keys.iter().map(|keys| keys.len()).sum()
            );
            let keys = storage.all_keys_in_namespace().await?;
            for expected_document_key in &expected_document_keys {
                assert!(keys.contains(expected_document_key));
            }
            for expected_cache_tag_key in &all_expected_cache_tag_keys {
                assert!(keys.contains(expected_cache_tag_key));
            }
            assert_eq!(keys.len(), 6);
            let mut ttls: HashMap<String, i64> = HashMap::default();
            for key in &keys {
                let ttl = storage.ttl(key).await?;
                assert!(ttl > 0);
                ttls.insert(key.clone(), ttl);
            }
            for (index, document) in documents.iter().enumerate() {
                let document_key = &expected_document_keys[index];
                let cache_tag_keys = &expected_cache_tag_keys[index];
                let document_ttl = ttls.get(document_key).unwrap();
                assert!(document.expire.as_secs() as i64 - *document_ttl < 10);
                for cache_tag_key in cache_tag_keys {
                    let cache_tag_ttl = ttls.get(cache_tag_key).unwrap();
                    assert!(document_ttl < cache_tag_ttl);
                }
            }
            for index in 0..documents.len() {
                let document_key = &expected_document_keys[index];
                let cache_tag_keys = &expected_cache_tag_keys[index];
                let document_expire_time = storage.expire_time(document_key).await?;
                assert!(document_expire_time > 0);
                for cache_tag_key in cache_tag_keys {
                    let document_score = storage.zscore(cache_tag_key, document_key).await?;
                    assert_eq!(document_expire_time, document_score);
                }
            }
            Ok(())
        }

        // A later, shorter-lived insert must not shrink existing expiries.
        #[tokio::test]
        #[rstest::rstest]
        async fn cache_tag_ttl_will_only_increase(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = Document {
                key: "key1".to_string(),
                expire: Duration::from_secs(60),
                ..common_document()
            };
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let keys = storage.all_keys_in_namespace().await?;
            let mut expire_times: HashMap<String, i64> = HashMap::default();
            for key in &keys {
                let expire_time = storage.expire_time(key).await?;
                assert!(expire_time > 0);
                expire_times.insert(key.clone(), expire_time);
            }
            let document = Document {
                key: "key2".to_string(),
                expire: Duration::from_secs(1),
                ..common_document()
            };
            storage.insert(document, SUBGRAPH_NAME).await?;
            for key in keys {
                let new_expire_time = storage.expire_time(&key).await?;
                assert!(new_expire_time > 0);
                assert_eq!(*expire_times.get(&key).unwrap(), new_expire_time);
            }
            Ok(())
        }

        // Re-inserting the same document with a shorter expiry must not lower
        // its cache-tag score or the tag's expire time (ZADD GT / EXPIRE GT).
        #[tokio::test]
        #[rstest::rstest]
        async fn cache_tag_score_will_not_decrease(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = Document {
                expire: Duration::from_secs(60),
                data: serde_json_bytes::Value::Number(1.into()),
                ..common_document()
            };
            let document_key = document.key.clone();
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let stored_data = storage.fetch(&document_key, SUBGRAPH_NAME).await?;
            assert_eq!(stored_data.data, document.data);
            let keys = storage.cache_tag_permutations(&document.invalidation_keys, SUBGRAPH_NAME);
            let mut scores: HashMap<String, i64> = HashMap::default();
            let mut expire_times: HashMap<String, i64> = HashMap::default();
            for key in &keys {
                let score = storage.zscore(key, &document_key).await?;
                assert!(score > 0);
                scores.insert(key.clone(), score);
                let expire_time = storage.expire_time(key).await?;
                assert!(expire_time > 0);
                expire_times.insert(key.clone(), expire_time);
            }
            let document = Document {
                expire: Duration::from_secs(10),
                data: serde_json_bytes::Value::Number(2.into()),
                ..common_document()
            };
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let stored_data = storage.fetch(&document.key, SUBGRAPH_NAME).await?;
            assert_eq!(stored_data.data, document.data);
            let ttl = storage.ttl(&document_key).await?;
            assert!(ttl <= document.expire.as_secs() as i64);
            for key in keys {
                let score = storage.zscore(&key, &document_key).await?;
                assert!(score > 0);
                assert_eq!(*scores.get(&key).unwrap(), score);
                let expire_time = storage.expire_time(&key).await?;
                assert!(expire_time > 0);
                assert_eq!(*expire_times.get(&key).unwrap(), expire_time);
            }
            Ok(())
        }

        // Re-inserting with a longer expiry must push scores and TTLs up.
        #[tokio::test]
        #[rstest::rstest]
        async fn cache_tag_score_will_increase(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = Document {
                expire: Duration::from_secs(60),
                data: serde_json_bytes::Value::Number(1.into()),
                ..common_document()
            };
            let document_key = document.key.clone();
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let stored_data = storage.fetch(&common_document().key, SUBGRAPH_NAME).await?;
            assert_eq!(stored_data.data, document.data);
            let keys = storage.cache_tag_permutations(&document.invalidation_keys, SUBGRAPH_NAME);
            let old_ttl = document.expire;
            let document = Document {
                expire: old_ttl * 2,
                data: serde_json_bytes::Value::Number(2.into()),
                ..common_document()
            };
            storage.insert(document.clone(), SUBGRAPH_NAME).await?;
            let stored_data = storage.fetch(&document.key, SUBGRAPH_NAME).await?;
            assert_eq!(stored_data.data, document.data);
            let ttl = storage.ttl(&document_key).await?;
            assert!(ttl <= document.expire.as_secs() as i64);
            assert!(ttl > old_ttl.as_secs() as i64);
            let doc_expire_time = storage.expire_time(&document_key).await?;
            for key in keys {
                let score = storage.zscore(&key, &document_key).await?;
                assert!(doc_expire_time <= score);
                let expire_time = storage.expire_time(&key).await?;
                assert!(doc_expire_time < expire_time);
            }
            Ok(())
        }
    }

    // Phase ordering: if updating a cache tag fails, the document itself must
    // never be written.
    mod cache_tag_insert_failure_should_abort_key_insertion {
        use std::sync::Arc;
        use fred::error::Error;
        use fred::error::ErrorKind;
        use fred::interfaces::KeysInterface;
        use fred::mocks::MockCommand;
        use fred::mocks::Mocks;
        use fred::prelude::Expiration;
        use fred::prelude::Value;
        use parking_lot::RwLock;
        use tokio::sync::broadcast;
        use tower::BoxError;
        use super::SUBGRAPH_NAME;
        use super::common_document;
        use super::redis_config;
        use crate::plugins::response_cache::ErrorCode;
        use crate::plugins::response_cache::storage::CacheStorage;
        use crate::plugins::response_cache::storage::Document;
        use crate::plugins::response_cache::storage::redis::Storage;

        // Pre-creates the cache-tag key as a plain string so ZADD fails with
        // a WRONGTYPE error.
        #[tokio::test]
        #[rstest::rstest]
        async fn type_failure(#[values(true, false)] clustered: bool) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let config = redis_config(clustered);
            let storage = Storage::new(&config, drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = common_document();
            let document_key = document.key.clone();
            let cache_tag_keys =
                storage.cache_tag_permutations(&document.invalidation_keys, SUBGRAPH_NAME);
            let insert_invalid_cache_tag = |key: String| async {
                let namespaced_key = storage.make_key(key);
                let _: () = storage
                    .storage
                    .client()
                    .set(namespaced_key, 1, Some(Expiration::EX(60)), None, false)
                    .await?;
                Ok::<(), BoxError>(())
            };
            for key in cache_tag_keys {
                storage.truncate_namespace().await?;
                insert_invalid_cache_tag(key.clone()).await?;
                let result = storage.insert(document.clone(), SUBGRAPH_NAME).await;
                result.expect_err(&format!(
                    "cache tag {key} should have caused insertion failure"
                ));
                assert!(!storage.exists(&document_key).await?);
            }
            let documents = vec![
                Document {
                    key: "key1".to_string(),
                    invalidation_keys: vec![],
                    ..common_document()
                },
                Document {
                    key: "key2".to_string(),
                    invalidation_keys: vec!["invalidate".to_string()],
                    ..common_document()
                },
            ];
            let cache_tag_keys =
                storage.cache_tag_permutations(&documents[1].invalidation_keys, SUBGRAPH_NAME);
            for key in cache_tag_keys {
                storage.truncate_namespace().await?;
                insert_invalid_cache_tag(key.clone()).await?;
                storage
                    .insert_in_batch(documents.clone(), SUBGRAPH_NAME)
                    .await
                    .expect_err(&format!(
                        "cache tag {key} should have caused insertion failure"
                    ));
                // No document in the batch may have been written.
                for document in &documents {
                    assert!(!storage.exists(&document.key).await?);
                }
            }
            Ok(())
        }

        // A mock that records every command and fails all of them with a
        // timeout; the document SET must never be attempted.
        #[tokio::test]
        #[rstest::rstest]
        async fn timeout_failure(#[values(true, false)] clustered: bool) -> Result<(), BoxError> {
            use crate::plugins::response_cache::storage::error::Error as StorageError;
            #[derive(Default, Debug, Clone)]
            struct MockStorage(Arc<RwLock<Vec<MockCommand>>>);
            impl Mocks for MockStorage {
                fn process_command(&self, command: MockCommand) -> Result<Value, Error> {
                    self.0.write().push(command);
                    Err(Error::new(ErrorKind::Timeout, "timeout"))
                }
            }
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let mock_storage = Arc::new(MockStorage::default());
            let storage = Storage::mocked(
                &redis_config(clustered),
                clustered,
                mock_storage.clone(),
                drop_rx,
            )
            .await?;
            let document = common_document();
            let document_key = Value::from(storage.make_key(document.key.clone()));
            let result = storage.insert(document, SUBGRAPH_NAME).await;
            let error = result.expect_err("should have timed out via redis");
            assert!(matches!(error, StorageError::Database(ref e) if e.details() == "timeout"));
            assert_eq!(error.code(), "TIMEOUT");
            for command in mock_storage.0.read().iter() {
                if command.cmd.contains("SET") && command.args.contains(&document_key) {
                    panic!("Command {command:?} set the document key");
                }
            }
            Ok(())
        }
    }

    // Exercises the maintenance pruning (ZREMRANGEBYSCORE) directly with
    // different cutoffs.
    #[tokio::test]
    #[rstest::rstest]
    async fn maintenance_removes_expired_data(
        #[values(true, false)] clustered: bool,
    ) -> Result<(), BoxError> {
        let (_drop_tx, drop_rx) = broadcast::channel(2);
        let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
        storage.truncate_namespace().await?;
        let documents = vec![
            Document {
                key: "key1".to_string(),
                expire: Duration::from_secs(2),
                ..common_document()
            },
            Document {
                key: "key2".to_string(),
                expire: Duration::from_secs(60),
                ..common_document()
            },
            Document {
                key: "key3".to_string(),
                expire: Duration::from_secs(60),
                ..common_document()
            },
        ];
        storage
            .insert_in_batch(documents.clone(), SUBGRAPH_NAME)
            .await?;
        let invalidation_key = storage.cache_tag_permutations(&[], SUBGRAPH_NAME).remove(0);
        assert_eq!(storage.zcard(&invalidation_key).await?, 3);
        let doc_key1 = "key1";
        let doc_key2 = "key2";
        let doc_key3 = "key3";
        for key in [&doc_key1, &doc_key2, &doc_key3] {
            assert!(storage.zexists(&invalidation_key, key).await?);
        }
        // Only key1 (expire = 2s) falls below now + 10.
        let cutoff = now() + 10;
        assert!(storage.zscore(&invalidation_key, doc_key1).await? < cutoff as i64);
        let removed_keys = storage
            .remove_keys_from_cache_tag_by_cutoff(invalidation_key.clone(), cutoff as f64)
            .await?;
        assert_eq!(removed_keys, 1);
        assert_eq!(storage.zcard(&invalidation_key).await?, 2);
        assert!(!storage.zexists(&invalidation_key, doc_key1).await?);
        assert!(storage.zexists(&invalidation_key, doc_key2).await?);
        assert!(storage.zexists(&invalidation_key, doc_key3).await?);
        // A far-future cutoff removes the rest.
        let cutoff = now() + 1000;
        let removed_keys = storage
            .remove_keys_from_cache_tag_by_cutoff(invalidation_key.clone(), cutoff as f64)
            .await?;
        assert_eq!(removed_keys, 2);
        assert_eq!(storage.zcard(&invalidation_key).await?, 0);
        for key in [&doc_key1, &doc_key2, &doc_key3] {
            assert!(!storage.zexists(&invalidation_key, key).await?);
        }
        Ok(())
    }

    // Behavior of the public invalidation entry points.
    mod invalidation {
        use tokio::sync::broadcast;
        use tower::BoxError;
        use super::common_document;
        use super::redis_config;
        use crate::plugins::response_cache::storage::CacheStorage;
        use crate::plugins::response_cache::storage::Document;
        use crate::plugins::response_cache::storage::redis::Storage;

        #[tokio::test]
        #[rstest::rstest]
        async fn invalidation_by_subgraph_removes_everything_associated_with_that_subgraph(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document1 = Document {
                key: "key1".to_string(),
                ..common_document()
            };
            let document2 = Document {
                key: "key2".to_string(),
                ..common_document()
            };
            let document3 = Document {
                key: "key3".to_string(),
                ..common_document()
            };
            storage.insert(document1.clone(), "S1").await?;
            storage.insert(document2.clone(), "S2").await?;
            storage.insert(document3.clone(), "S2").await?;
            let num_invalidated = storage.invalidate_by_subgraph("S1", "subgraph").await?;
            assert_eq!(num_invalidated, 1);
            assert!(!storage.exists("key1").await?);
            assert!(storage.exists("key2").await?);
            let num_invalidated = storage.invalidate_by_subgraph("S2", "subgraph").await?;
            assert_eq!(num_invalidated, 2);
            assert!(!storage.exists("key2").await?);
            assert!(!storage.exists("key3").await?);
            Ok(())
        }

        // Invalidating key "A" restricted to subgraph S2 must not touch S1's
        // document even though it shares the key.
        #[tokio::test]
        #[rstest::rstest]
        async fn arguments_are_restrictive_rather_than_additive(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document1 = Document {
                key: "key1".to_string(),
                invalidation_keys: vec!["A".to_string()],
                ..common_document()
            };
            let document2 = Document {
                key: "key2".to_string(),
                invalidation_keys: vec!["A".to_string()],
                ..common_document()
            };
            let document3 = Document {
                key: "key3".to_string(),
                invalidation_keys: vec!["B".to_string()],
                ..common_document()
            };
            storage.insert(document1.clone(), "S1").await?;
            storage.insert(document2.clone(), "S2").await?;
            storage.insert(document3.clone(), "S2").await?;
            let invalidated = storage
                .invalidate(vec!["A".to_string()], vec!["S2".to_string()], "cache_tag")
                .await?;
            assert_eq!(invalidated.len(), 1);
            assert_eq!(*invalidated.get("S2").unwrap(), 1);
            assert!(storage.exists("key1").await?);
            assert!(!storage.exists("key2").await?);
            assert!(storage.exists("key3").await?);
            Ok(())
        }

        #[tokio::test]
        #[rstest::rstest]
        async fn invalidating_missing_subgraph_will_not_error(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            storage.insert(common_document(), "S1").await?;
            let invalidated = storage.invalidate_by_subgraph("S2", "subgraph").await?;
            assert_eq!(invalidated, 0);
            let invalidated = storage
                .invalidate(vec!["key".to_string()], vec!["S2".to_string()], "cache_tag")
                .await?;
            assert_eq!(invalidated.len(), 1);
            assert_eq!(*invalidated.get("S2").unwrap(), 0);
            Ok(())
        }

        #[tokio::test]
        #[rstest::rstest]
        async fn invalidating_missing_invalidation_key_will_not_error(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            storage.insert(common_document(), "S1").await?;
            let invalidated = storage
                .invalidate(vec!["key".to_string()], vec!["S1".to_string()], "cache_tag")
                .await?;
            assert_eq!(invalidated.len(), 1);
            assert_eq!(*invalidated.get("S1").unwrap(), 0);
            Ok(())
        }

        #[tokio::test]
        #[rstest::rstest]
        async fn invalidation_is_idempotent(
            #[values(true, false)] clustered: bool,
        ) -> Result<(), BoxError> {
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&redis_config(clustered), drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = common_document();
            let document_key = document.key.clone();
            storage.insert(document, "S1").await?;
            assert!(storage.exists(&document_key).await?);
            let invalidated = storage.invalidate_by_subgraph("S1", "subgraph").await?;
            assert_eq!(invalidated, 1);
            assert!(!storage.exists(&document_key).await?);
            // Second invalidation finds nothing but does not error.
            let invalidated = storage.invalidate_by_subgraph("S1", "subgraph").await?;
            assert_eq!(invalidated, 0);
            assert!(!storage.exists(&document_key).await?);
            Ok(())
        }
    }

    // With a zero fetch timeout, fetches should eventually surface a Timeout
    // error and record the corresponding error counter.
    #[tokio::test]
    async fn timeout_errors_are_captured() -> Result<(), BoxError> {
        async move {
            let config = Config {
                fetch_timeout: Duration::from_nanos(0),
                ..redis_config(false)
            };
            let (_drop_tx, drop_rx) = broadcast::channel(2);
            let storage = Storage::new(&config, drop_rx).await?;
            storage.truncate_namespace().await?;
            let document = common_document();
            let now = Instant::now();
            // Retry for up to 5s: a zero timeout usually but not always fires.
            while now.elapsed() < Duration::from_secs(5) {
                let error = match storage.fetch_multiple(&[&document.key], "S1").await {
                    Ok(v) => {
                        if v.iter().all(|e| e.is_none()) {
                            continue;
                        }
                        panic!("Value was unexpected");
                    }
                    Err(err) => err,
                };
                assert!(matches!(error, Error::Timeout(_)), "{:?}", error);
                assert_eq!(error.code(), "TIMEOUT");
                assert_counter!(
                    "apollo.router.operations.response_cache.fetch.error",
                    1,
                    "code" = "TIMEOUT",
                    "subgraph.name" = "S1"
                );
                return Ok(());
            }
            panic!("Never observed a timeout");
        }
        .with_metrics()
        .await
    }
}