Skip to main content

hydracache_db/
transaction.rs

1use hydracache::{CacheInvalidation, HydraCache};
2use hydracache_core::CacheCodec;
3
4use crate::{CacheEntity, InvalidationIntent, InvalidationIntentBatch};
5
6/// Mutable invalidation collector passed to transaction companion closures.
7///
8/// The collector is intentionally small and database-neutral. Repository code
9/// records the cache targets made stale by a write while keeping ownership of
10/// SQL/ORM execution inside the transaction closure.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct InvalidationCollector {
13    namespace: String,
14    reason: String,
15    intents: Vec<InvalidationIntent>,
16}
17
18impl InvalidationCollector {
19    /// Create an empty collector for one cache namespace and operator-facing reason.
20    pub fn new(namespace: impl Into<String>, reason: impl Into<String>) -> Self {
21        Self {
22            namespace: namespace.into(),
23            reason: reason.into(),
24            intents: Vec::new(),
25        }
26    }
27
28    /// Return the namespace that should receive the collected invalidations.
29    pub fn namespace(&self) -> &str {
30        &self.namespace
31    }
32
33    /// Return the reason attached to durable outbox rows.
34    pub fn reason(&self) -> &str {
35        &self.reason
36    }
37
38    /// Return collected intents in insertion order.
39    pub fn intents(&self) -> &[InvalidationIntent] {
40        &self.intents
41    }
42
43    /// Return true when no invalidations have been collected.
44    pub fn is_empty(&self) -> bool {
45        self.intents.is_empty()
46    }
47
48    /// Return the number of collected intents.
49    pub fn len(&self) -> usize {
50        self.intents.len()
51    }
52
53    /// Add one already-normalized invalidation intent.
54    pub fn intent(&mut self, intent: InvalidationIntent) -> &mut Self {
55        self.intents.push(intent);
56        self
57    }
58
59    /// Add one physical cache-key invalidation.
60    pub fn invalidate_key(&mut self, key: impl Into<String>) -> &mut Self {
61        self.intent(InvalidationIntent::key(key))
62    }
63
64    /// Add one tag invalidation.
65    pub fn invalidate_tag(&mut self, tag: impl Into<String>) -> &mut Self {
66        self.intent(InvalidationIntent::tag(tag))
67    }
68
69    /// Add one entity-tag invalidation.
70    pub fn invalidate_entity(
71        &mut self,
72        entity: impl Into<String>,
73        key: impl Into<String>,
74    ) -> &mut Self {
75        self.intent(InvalidationIntent::entity(entity, key))
76    }
77
78    /// Add one collection-tag invalidation.
79    pub fn invalidate_collection(&mut self, collection: impl Into<String>) -> &mut Self {
80        self.intent(InvalidationIntent::collection(collection))
81    }
82
83    /// Add a cache-wide flush invalidation.
84    pub fn flush(&mut self) -> &mut Self {
85        self.intent(InvalidationIntent::flush())
86    }
87
88    /// Add both entity and collection invalidations for a cache entity id.
89    pub fn cache_entity<E>(&mut self, id: E::Id) -> &mut Self
90    where
91        E: CacheEntity,
92    {
93        let id = id.to_string();
94        self.invalidate_entity(E::ENTITY, id);
95        if let Some(collection) = E::collection_tag() {
96            self.invalidate_collection(collection);
97        }
98        self
99    }
100
101    /// Finish collection and return an immutable invalidation payload.
102    pub fn into_collected(self) -> CollectedInvalidations {
103        let mut batch = InvalidationIntentBatch::new(self.reason);
104        for intent in self.intents {
105            batch = batch.intent(intent);
106        }
107
108        CollectedInvalidations {
109            namespace: self.namespace,
110            batch,
111        }
112    }
113}
114
115impl Default for InvalidationCollector {
116    fn default() -> Self {
117        Self::new("db", "")
118    }
119}
120
121/// Immutable invalidation payload produced by an [`InvalidationCollector`].
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct CollectedInvalidations {
124    namespace: String,
125    batch: InvalidationIntentBatch,
126}
127
128impl CollectedInvalidations {
129    /// Return the namespace that should receive the invalidation batch.
130    pub fn namespace(&self) -> &str {
131        &self.namespace
132    }
133
134    /// Return the durable outbox batch.
135    pub fn batch(&self) -> &InvalidationIntentBatch {
136        &self.batch
137    }
138
139    /// Consume into the durable outbox batch.
140    pub fn into_batch(self) -> InvalidationIntentBatch {
141        self.batch
142    }
143
144    /// Return true when the batch has no invalidation intents.
145    pub fn is_empty(&self) -> bool {
146        self.batch.is_empty()
147    }
148
149    /// Return the number of invalidation intents.
150    pub fn len(&self) -> usize {
151        self.batch.len()
152    }
153
154    /// Apply collected invalidations directly to the local cache.
155    ///
156    /// This is useful for non-durable single-process examples. Production
157    /// transaction flows should prefer a durable outbox so invalidation intent is
158    /// committed atomically with the database write.
159    pub async fn execute_local<C>(
160        self,
161        cache: &HydraCache<C>,
162    ) -> hydracache::CacheResult<CollectedInvalidationReport>
163    where
164        C: CacheCodec,
165    {
166        let mut report = CollectedInvalidationReport {
167            intent_count: self.batch.len(),
168            ..CollectedInvalidationReport::default()
169        };
170
171        for intent in self.batch.intents() {
172            match intent.to_cache_invalidation() {
173                CacheInvalidation::Key { key } => {
174                    if cache.remove(&key).await? {
175                        report.keys_removed += 1;
176                    }
177                }
178                CacheInvalidation::Tag { tag } => {
179                    report.tags_removed += cache.invalidate_tag(&tag).await?;
180                }
181                CacheInvalidation::Flush => {
182                    cache.flush().await?;
183                    report.flushed = true;
184                }
185            }
186        }
187
188        Ok(report)
189    }
190}
191
/// Summary returned after collected invalidations were applied directly to a
/// local cache.
///
/// The default value has every counter at zero and `flushed` set to `false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CollectedInvalidationReport {
    /// How many invalidation intents the batch contained.
    pub intent_count: usize,
    /// How many physical keys were removed.
    pub keys_removed: u64,
    /// How many entries were removed through tag invalidation.
    pub tags_removed: u64,
    /// `true` once a flush intent has been applied.
    pub flushed: bool,
}
204
#[cfg(test)]
mod tests {
    use hydracache::HydraCache;

    use super::*;

    /// Fixture entity: tagged as `user`, with a `users` collection.
    struct User;

    impl CacheEntity for User {
        type Id = i64;

        const ENTITY: &'static str = "user";
        const COLLECTION: Option<&'static str> = Some("users");
    }

    /// The collector must hand namespace and reason through untouched and
    /// keep intents in the order they were recorded.
    #[test]
    fn collector_preserves_namespace_reason_and_ordered_intents() {
        let mut collector = InvalidationCollector::new("tenant-a", "user-write");

        // Chained on purpose: each helper returns the collector again.
        collector
            .invalidate_key("physical:user:42")
            .invalidate_tag("tenant:7")
            .cache_entity::<User>(42);

        let collected = collector.into_collected();
        assert_eq!(collected.namespace(), "tenant-a");
        assert_eq!(collected.batch().reason(), "user-write");
        assert_eq!(collected.len(), 4);

        // `cache_entity` contributes the entity intent first, then the collection.
        let expected = [
            InvalidationIntent::key("physical:user:42"),
            InvalidationIntent::tag("tenant:7"),
            InvalidationIntent::entity("user", "42"),
            InvalidationIntent::collection("users"),
        ];
        for (position, want) in expected.iter().enumerate() {
            assert_eq!(&collected.batch().intents()[position], want);
        }
    }

    /// Applying collected invalidations locally must evict the tagged entry
    /// and report what happened.
    #[tokio::test]
    async fn collected_invalidations_can_apply_directly_to_local_cache() {
        let cache = HydraCache::local().build();

        // Seed one entry carrying both the collection tag and the entity tag.
        let options = hydracache::CacheOptions::new().tags(["users", "user:42"]);
        cache
            .get_or_insert_with("user:42", options, || async { "Ada".to_owned() })
            .await
            .unwrap();

        let mut collector = InvalidationCollector::new("db", "direct");
        collector.cache_entity::<User>(42);
        let collected = collector.into_collected();

        let report = collected.execute_local(&cache).await.unwrap();

        assert_eq!(report.intent_count, 2);
        assert_eq!(report.tags_removed, 1);
        assert!(!report.flushed);

        let cached = cache.get::<String>("user:42").await.unwrap();
        assert_eq!(cached, None);
    }
}