mod config;
mod error;
pub(super) mod redis;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use std::time::Instant;
pub(super) use error::Error;
use tokio_util::time::FutureExt;
use super::cache_control::CacheControl;
use crate::plugins::response_cache::invalidation::InvalidationKind;
use crate::plugins::response_cache::metrics::record_fetch_duration;
use crate::plugins::response_cache::metrics::record_fetch_error;
use crate::plugins::response_cache::metrics::record_insert_duration;
use crate::plugins::response_cache::metrics::record_insert_error;
use crate::plugins::response_cache::metrics::record_invalidation_duration;
/// Shorthand for results produced by the cache-storage layer, using this
/// module's [`Error`] type.
type StorageResult<T> = Result<T, Error>;
/// A cache entry as written to the storage backend (see [`CacheStorage::insert`]).
#[derive(Debug, Clone)]
pub(super) struct Document {
    // Primary cache key under which the document is stored.
    pub(super) key: String,
    // Cached response payload.
    pub(super) data: serde_json_bytes::Value,
    // Cache-control metadata associated with the payload.
    pub(super) control: CacheControl,
    // Secondary keys used to invalidate this document
    // (see `CacheStorage::invalidate`).
    pub(super) invalidation_keys: Vec<String>,
    // Presumably the entry's time-to-live in the backend — TODO confirm
    // against the storage implementations.
    pub(super) expire: Duration,
    // NOTE(review): purpose not visible from this file; likely marks the
    // entry for debug/inspection handling — confirm against callers.
    pub(super) debug: bool,
}
/// A cache entry as read back from the storage backend
/// (see [`CacheStorage::fetch`] / [`CacheStorage::fetch_multiple`]).
#[derive(Debug, Clone)]
pub(super) struct CacheEntry {
    // Cache key the entry was stored under.
    pub(super) key: String,
    // Cached response payload.
    pub(super) data: serde_json_bytes::Value,
    // Cache-control metadata associated with the payload.
    pub(super) control: CacheControl,
    // Tags attached to the entry, when the backend returns them;
    // `None` when the backend does not provide tag information.
    pub(super) cache_tags: Option<HashSet<String>>,
}
/// Backend-agnostic interface to the response-cache store.
///
/// Implementors supply the `internal_*` primitives plus per-operation
/// timeouts; the provided (non-`internal_`) methods wrap each primitive
/// with timeout enforcement and duration/error metric recording.
pub(super) trait CacheStorage {
    /// Maximum time allowed for an insert (single or batch).
    fn insert_timeout(&self) -> Duration;
    /// Maximum time allowed for a fetch (single or batch).
    fn fetch_timeout(&self) -> Duration;
    /// Maximum time allowed for an invalidation operation.
    fn invalidate_timeout(&self) -> Duration;
    #[doc(hidden)]
    async fn internal_insert(&self, document: Document, subgraph_name: &str) -> StorageResult<()>;
    /// Stores a single document, recording insert duration and any error.
    async fn insert(&self, document: Document, subgraph_name: &str) -> StorageResult<()> {
        let started = Instant::now();
        let outcome = self
            .internal_insert(document, subgraph_name)
            .timeout(self.insert_timeout())
            .await;
        // A timeout counts as a storage error, folded into `Error` here.
        let outcome = flatten_storage_error(outcome);
        record_insert_duration(started.elapsed(), subgraph_name, 1);
        outcome.inspect_err(|err| record_insert_error(err, subgraph_name))
    }
    #[doc(hidden)]
    async fn internal_insert_in_batch(
        &self,
        documents: Vec<Document>,
        subgraph_name: &str,
    ) -> StorageResult<()>;
    /// Stores a batch of documents, recording duration (tagged with the
    /// batch size) and any error.
    async fn insert_in_batch(
        &self,
        documents: Vec<Document>,
        subgraph_name: &str,
    ) -> StorageResult<()> {
        // Capture the size before the batch is moved into the call.
        let batch_size = documents.len();
        let started = Instant::now();
        let outcome = self
            .internal_insert_in_batch(documents, subgraph_name)
            .timeout(self.insert_timeout())
            .await;
        let outcome = flatten_storage_error(outcome);
        record_insert_duration(started.elapsed(), subgraph_name, batch_size);
        outcome.inspect_err(|err| record_insert_error(err, subgraph_name))
    }
    #[doc(hidden)]
    async fn internal_fetch(&self, cache_key: &str) -> StorageResult<CacheEntry>;
    /// Looks up a single entry, recording fetch duration and any error.
    async fn fetch(&self, cache_key: &str, subgraph_name: &str) -> StorageResult<CacheEntry> {
        let started = Instant::now();
        let outcome = self
            .internal_fetch(cache_key)
            .timeout(self.fetch_timeout())
            .await;
        let outcome = flatten_storage_error(outcome);
        record_fetch_duration(started.elapsed(), subgraph_name, 1);
        outcome.inspect_err(|err| record_fetch_error(err, subgraph_name))
    }
    #[doc(hidden)]
    async fn internal_fetch_multiple(
        &self,
        cache_keys: &[&str],
    ) -> StorageResult<Vec<StorageResult<CacheEntry>>>;
    /// Looks up several entries at once.
    ///
    /// Returns `Err` only when the whole batch fails (or times out);
    /// otherwise per-key failures are recorded as fetch errors and
    /// surfaced as `None` in the returned vector.
    async fn fetch_multiple(
        &self,
        cache_keys: &[&str],
        subgraph_name: &str,
    ) -> StorageResult<Vec<Option<CacheEntry>>> {
        let requested = cache_keys.len();
        let started = Instant::now();
        let outcome = self
            .internal_fetch_multiple(cache_keys)
            .timeout(self.fetch_timeout())
            .await;
        let outcome = flatten_storage_error(outcome);
        record_fetch_duration(started.elapsed(), subgraph_name, requested);
        let entries = outcome.inspect_err(|err| record_fetch_error(err, subgraph_name))?;
        let mut resolved = Vec::with_capacity(entries.len());
        for entry in entries {
            // Record each per-key error, then degrade it to a cache miss.
            let entry = entry
                .inspect_err(|err| record_fetch_error(err, subgraph_name))
                .ok();
            resolved.push(entry);
        }
        Ok(resolved)
    }
    #[doc(hidden)]
    async fn internal_invalidate_by_subgraph(&self, subgraph_name: &str) -> StorageResult<u64>;
    /// Invalidates every entry belonging to a subgraph, returning the
    /// count reported by the backend and recording the operation duration.
    async fn invalidate_by_subgraph(
        &self,
        subgraph_name: &str,
        invalidation_kind: InvalidationKind,
    ) -> StorageResult<u64> {
        let started = Instant::now();
        let outcome = self
            .internal_invalidate_by_subgraph(subgraph_name)
            .timeout(self.invalidate_timeout())
            .await;
        let outcome = flatten_storage_error(outcome);
        record_invalidation_duration(started.elapsed(), invalidation_kind);
        outcome
    }
    #[doc(hidden)]
    async fn internal_invalidate(
        &self,
        invalidation_keys: Vec<String>,
        subgraph_names: Vec<String>,
    ) -> StorageResult<HashMap<String, u64>>;
    /// Invalidates entries matching the given invalidation keys for the
    /// given subgraphs, returning per-subgraph counts and recording the
    /// operation duration.
    async fn invalidate(
        &self,
        invalidation_keys: Vec<String>,
        subgraph_names: Vec<String>,
        invalidation_kind: InvalidationKind,
    ) -> StorageResult<HashMap<String, u64>> {
        let started = Instant::now();
        let outcome = self
            .internal_invalidate(invalidation_keys, subgraph_names)
            .timeout(self.invalidate_timeout())
            .await;
        let outcome = flatten_storage_error(outcome);
        record_invalidation_duration(started.elapsed(), invalidation_kind);
        outcome
    }
    // Test-only helper; the cfg gate mirrors the crate's CI target matrix.
    #[cfg(all(
        test,
        any(not(feature = "ci"), all(target_arch = "x86_64", target_os = "linux"))
    ))]
    async fn truncate_namespace(&self) -> StorageResult<()>;
}
/// Collapses a timeout-wrapped storage result into a single [`Error`]:
/// the outer error (e.g. an elapsed timeout) is converted via `Into`,
/// while an inner storage error passes through unchanged.
fn flatten_storage_error<V, E>(value: Result<Result<V, Error>, E>) -> Result<V, Error>
where
    E: Into<Error>,
{
    match value {
        Ok(inner) => inner,
        Err(outer) => Err(outer.into()),
    }
}