use hydracache::{CacheInvalidation, HydraCache};
use hydracache_core::CacheCodec;
use crate::{CacheEntity, InvalidationIntent, InvalidationIntentBatch};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvalidationCollector {
namespace: String,
reason: String,
intents: Vec<InvalidationIntent>,
}
impl InvalidationCollector {
pub fn new(namespace: impl Into<String>, reason: impl Into<String>) -> Self {
Self {
namespace: namespace.into(),
reason: reason.into(),
intents: Vec::new(),
}
}
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn reason(&self) -> &str {
&self.reason
}
pub fn intents(&self) -> &[InvalidationIntent] {
&self.intents
}
pub fn is_empty(&self) -> bool {
self.intents.is_empty()
}
pub fn len(&self) -> usize {
self.intents.len()
}
pub fn intent(&mut self, intent: InvalidationIntent) -> &mut Self {
self.intents.push(intent);
self
}
pub fn invalidate_key(&mut self, key: impl Into<String>) -> &mut Self {
self.intent(InvalidationIntent::key(key))
}
pub fn invalidate_tag(&mut self, tag: impl Into<String>) -> &mut Self {
self.intent(InvalidationIntent::tag(tag))
}
pub fn invalidate_entity(
&mut self,
entity: impl Into<String>,
key: impl Into<String>,
) -> &mut Self {
self.intent(InvalidationIntent::entity(entity, key))
}
pub fn invalidate_collection(&mut self, collection: impl Into<String>) -> &mut Self {
self.intent(InvalidationIntent::collection(collection))
}
pub fn flush(&mut self) -> &mut Self {
self.intent(InvalidationIntent::flush())
}
pub fn cache_entity<E>(&mut self, id: E::Id) -> &mut Self
where
E: CacheEntity,
{
let id = id.to_string();
self.invalidate_entity(E::ENTITY, id);
if let Some(collection) = E::collection_tag() {
self.invalidate_collection(collection);
}
self
}
pub fn into_collected(self) -> CollectedInvalidations {
let mut batch = InvalidationIntentBatch::new(self.reason);
for intent in self.intents {
batch = batch.intent(intent);
}
CollectedInvalidations {
namespace: self.namespace,
batch,
}
}
}
impl Default for InvalidationCollector {
fn default() -> Self {
Self::new("db", "")
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedInvalidations {
namespace: String,
batch: InvalidationIntentBatch,
}
impl CollectedInvalidations {
pub fn namespace(&self) -> &str {
&self.namespace
}
pub fn batch(&self) -> &InvalidationIntentBatch {
&self.batch
}
pub fn into_batch(self) -> InvalidationIntentBatch {
self.batch
}
pub fn is_empty(&self) -> bool {
self.batch.is_empty()
}
pub fn len(&self) -> usize {
self.batch.len()
}
pub async fn execute_local<C>(
self,
cache: &HydraCache<C>,
) -> hydracache::CacheResult<CollectedInvalidationReport>
where
C: CacheCodec,
{
let mut report = CollectedInvalidationReport {
intent_count: self.batch.len(),
..CollectedInvalidationReport::default()
};
for intent in self.batch.intents() {
match intent.to_cache_invalidation() {
CacheInvalidation::Key { key } => {
if cache.remove(&key).await? {
report.keys_removed += 1;
}
}
CacheInvalidation::Tag { tag } => {
report.tags_removed += cache.invalidate_tag(&tag).await?;
}
CacheInvalidation::Flush => {
cache.flush().await?;
report.flushed = true;
}
}
}
Ok(report)
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CollectedInvalidationReport {
pub intent_count: usize,
pub keys_removed: u64,
pub tags_removed: u64,
pub flushed: bool,
}
#[cfg(test)]
mod tests {
use hydracache::HydraCache;
use super::*;
struct User;
impl CacheEntity for User {
type Id = i64;
const ENTITY: &'static str = "user";
const COLLECTION: Option<&'static str> = Some("users");
}
#[test]
fn collector_preserves_namespace_reason_and_ordered_intents() {
let mut collector = InvalidationCollector::new("tenant-a", "user-write");
collector
.invalidate_key("physical:user:42")
.invalidate_tag("tenant:7")
.cache_entity::<User>(42);
let collected = collector.into_collected();
assert_eq!(collected.namespace(), "tenant-a");
assert_eq!(collected.batch().reason(), "user-write");
assert_eq!(collected.len(), 4);
assert_eq!(
collected.batch().intents()[0],
InvalidationIntent::key("physical:user:42")
);
assert_eq!(
collected.batch().intents()[1],
InvalidationIntent::tag("tenant:7")
);
assert_eq!(
collected.batch().intents()[2],
InvalidationIntent::entity("user", "42")
);
assert_eq!(
collected.batch().intents()[3],
InvalidationIntent::collection("users")
);
}
#[tokio::test]
async fn collected_invalidations_can_apply_directly_to_local_cache() {
let cache = HydraCache::local().build();
cache
.get_or_insert_with(
"user:42",
hydracache::CacheOptions::new().tags(["users", "user:42"]),
|| async { "Ada".to_owned() },
)
.await
.unwrap();
let mut collector = InvalidationCollector::new("db", "direct");
collector.cache_entity::<User>(42);
let report = collector
.into_collected()
.execute_local(&cache)
.await
.unwrap();
assert_eq!(report.intent_count, 2);
assert_eq!(report.tags_removed, 1);
assert!(!report.flushed);
assert_eq!(cache.get::<String>("user:42").await.unwrap(), None);
}
}