hydracache_db/
transaction.rs1use hydracache::{CacheInvalidation, HydraCache};
2use hydracache_core::CacheCodec;
3
4use crate::{CacheEntity, InvalidationIntent, InvalidationIntentBatch};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct InvalidationCollector {
13 namespace: String,
14 reason: String,
15 intents: Vec<InvalidationIntent>,
16}
17
18impl InvalidationCollector {
19 pub fn new(namespace: impl Into<String>, reason: impl Into<String>) -> Self {
21 Self {
22 namespace: namespace.into(),
23 reason: reason.into(),
24 intents: Vec::new(),
25 }
26 }
27
28 pub fn namespace(&self) -> &str {
30 &self.namespace
31 }
32
33 pub fn reason(&self) -> &str {
35 &self.reason
36 }
37
38 pub fn intents(&self) -> &[InvalidationIntent] {
40 &self.intents
41 }
42
43 pub fn is_empty(&self) -> bool {
45 self.intents.is_empty()
46 }
47
48 pub fn len(&self) -> usize {
50 self.intents.len()
51 }
52
53 pub fn intent(&mut self, intent: InvalidationIntent) -> &mut Self {
55 self.intents.push(intent);
56 self
57 }
58
59 pub fn invalidate_key(&mut self, key: impl Into<String>) -> &mut Self {
61 self.intent(InvalidationIntent::key(key))
62 }
63
64 pub fn invalidate_tag(&mut self, tag: impl Into<String>) -> &mut Self {
66 self.intent(InvalidationIntent::tag(tag))
67 }
68
69 pub fn invalidate_entity(
71 &mut self,
72 entity: impl Into<String>,
73 key: impl Into<String>,
74 ) -> &mut Self {
75 self.intent(InvalidationIntent::entity(entity, key))
76 }
77
78 pub fn invalidate_collection(&mut self, collection: impl Into<String>) -> &mut Self {
80 self.intent(InvalidationIntent::collection(collection))
81 }
82
83 pub fn flush(&mut self) -> &mut Self {
85 self.intent(InvalidationIntent::flush())
86 }
87
88 pub fn cache_entity<E>(&mut self, id: E::Id) -> &mut Self
90 where
91 E: CacheEntity,
92 {
93 let id = id.to_string();
94 self.invalidate_entity(E::ENTITY, id);
95 if let Some(collection) = E::collection_tag() {
96 self.invalidate_collection(collection);
97 }
98 self
99 }
100
101 pub fn into_collected(self) -> CollectedInvalidations {
103 let mut batch = InvalidationIntentBatch::new(self.reason);
104 for intent in self.intents {
105 batch = batch.intent(intent);
106 }
107
108 CollectedInvalidations {
109 namespace: self.namespace,
110 batch,
111 }
112 }
113}
114
115impl Default for InvalidationCollector {
116 fn default() -> Self {
117 Self::new("db", "")
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct CollectedInvalidations {
124 namespace: String,
125 batch: InvalidationIntentBatch,
126}
127
128impl CollectedInvalidations {
129 pub fn namespace(&self) -> &str {
131 &self.namespace
132 }
133
134 pub fn batch(&self) -> &InvalidationIntentBatch {
136 &self.batch
137 }
138
139 pub fn into_batch(self) -> InvalidationIntentBatch {
141 self.batch
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.batch.is_empty()
147 }
148
149 pub fn len(&self) -> usize {
151 self.batch.len()
152 }
153
154 pub async fn execute_local<C>(
160 self,
161 cache: &HydraCache<C>,
162 ) -> hydracache::CacheResult<CollectedInvalidationReport>
163 where
164 C: CacheCodec,
165 {
166 let mut report = CollectedInvalidationReport {
167 intent_count: self.batch.len(),
168 ..CollectedInvalidationReport::default()
169 };
170
171 for intent in self.batch.intents() {
172 match intent.to_cache_invalidation() {
173 CacheInvalidation::Key { key } => {
174 if cache.remove(&key).await? {
175 report.keys_removed += 1;
176 }
177 }
178 CacheInvalidation::Tag { tag } => {
179 report.tags_removed += cache.invalidate_tag(&tag).await?;
180 }
181 CacheInvalidation::Flush => {
182 cache.flush().await?;
183 report.flushed = true;
184 }
185 }
186 }
187
188 Ok(report)
189 }
190}
191
/// Counters describing one run of `CollectedInvalidations::execute_local`.
///
/// The default value has every counter at zero and `flushed` set to `false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CollectedInvalidationReport {
    /// Length of the intent batch at the start of the run.
    pub intent_count: usize,
    /// Number of key invalidations for which the cache reported a removal.
    pub keys_removed: u64,
    /// Sum of the counts returned by the cache for tag invalidations.
    pub tags_removed: u64,
    /// `true` once at least one flush invalidation has been executed.
    pub flushed: bool,
}
204
#[cfg(test)]
mod tests {
    use hydracache::HydraCache;

    use super::*;

    /// Minimal entity fixture: entity name `user`, collection `users`.
    struct User;

    impl CacheEntity for User {
        type Id = i64;

        const ENTITY: &'static str = "user";
        const COLLECTION: Option<&'static str> = Some("users");
    }

    /// The collected result keeps the namespace, the reason and the exact
    /// order in which intents were recorded.
    #[test]
    fn collector_preserves_namespace_reason_and_ordered_intents() {
        let mut collector = InvalidationCollector::new("tenant-a", "user-write");
        collector.invalidate_key("physical:user:42");
        collector.invalidate_tag("tenant:7");
        collector.cache_entity::<User>(42);

        let collected = collector.into_collected();
        assert_eq!(collected.namespace(), "tenant-a");
        assert_eq!(collected.batch().reason(), "user-write");
        assert_eq!(collected.len(), 4);

        let expected = [
            InvalidationIntent::key("physical:user:42"),
            InvalidationIntent::tag("tenant:7"),
            InvalidationIntent::entity("user", "42"),
            InvalidationIntent::collection("users"),
        ];
        for (position, wanted) in expected.iter().enumerate() {
            assert_eq!(&collected.batch().intents()[position], wanted);
        }
    }

    /// Executing the collected intents against a local cache removes the
    /// tagged entry and reports what happened.
    #[tokio::test]
    async fn collected_invalidations_can_apply_directly_to_local_cache() {
        let cache = HydraCache::local().build();
        let options = hydracache::CacheOptions::new().tags(["users", "user:42"]);
        cache
            .get_or_insert_with("user:42", options, || async { "Ada".to_owned() })
            .await
            .unwrap();

        let mut collector = InvalidationCollector::new("db", "direct");
        collector.cache_entity::<User>(42);
        let collected = collector.into_collected();

        let report = collected.execute_local(&cache).await.unwrap();

        assert_eq!(report.intent_count, 2);
        assert_eq!(report.tags_removed, 1);
        assert!(!report.flushed);

        let cached = cache.get::<String>("user:42").await.unwrap();
        assert_eq!(cached, None);
    }
}