Skip to main content

fast_cache/storage/embedded_store/
lifecycle.rs

1use super::*;
2
3impl EmbeddedStore {
4    /// Deletes a key and returns true when a value or object was removed.
5    pub fn delete(&self, key: &[u8]) -> bool {
6        let now_ms = now_millis();
7        let route = self.route_key(key);
8        self.delete_routed_then(route, key, now_ms, || {})
9    }
10
11    /// Deletes a routed key and runs `after_delete` before releasing the shard
12    /// write lock when a mutation actually occurred.
13    pub(crate) fn delete_routed_then(
14        &self,
15        route: EmbeddedKeyRoute,
16        key: &[u8],
17        now_ms: u64,
18        after_delete: impl FnOnce(),
19    ) -> bool {
20        let route = match route.shard_id < self.shards.len() {
21            true => route,
22            false => self.route_key(key),
23        };
24        if self.objects.has_objects() {
25            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
26            let mut shard = self.shards[route.shard_id].write();
27            let deleted_object = bucket.delete_any(key);
28            if deleted_object {
29                self.objects.note_deleted(route.shard_id);
30            }
31            let deleted_session = if let Some(session_prefix) = derived_session_storage_prefix(key)
32            {
33                shard
34                    .session_slots
35                    .delete_hashed(&session_prefix, route.key_hash, key)
36            } else {
37                false
38            };
39            let deleted_map = shard.map.delete_hashed(route.key_hash, key, now_ms);
40            let deleted = deleted_object || deleted_session || deleted_map;
41            if deleted {
42                after_delete();
43            }
44            return deleted;
45        }
46        let mut shard = self.shards[route.shard_id].write();
47        if let Some(session_prefix) = derived_session_storage_prefix(key)
48            && shard
49                .session_slots
50                .delete_hashed(&session_prefix, route.key_hash, key)
51        {
52            after_delete();
53            return true;
54        }
55        let deleted = shard.map.delete_hashed(route.key_hash, key, now_ms);
56        if deleted {
57            after_delete();
58        }
59        deleted
60    }
61
62    /// Returns true when `key` currently exists.
63    pub fn exists(&self, key: &[u8]) -> bool {
64        let route = self.route_key(key);
65        if self.objects.has_objects() {
66            let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
67            if bucket.object_is_expired(key, now_millis()) {
68                drop(bucket);
69                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
70                if bucket.delete_expired(key, now_millis()) {
71                    self.objects.note_deleted(route.shard_id);
72                }
73                return self.get(key).is_some();
74            }
75            if bucket.contains_object(key) {
76                return true;
77            }
78        }
79        self.get(key).is_some()
80    }
81
82    /// Returns Redis-style TTL in seconds: `-2` for missing, `-1` for no TTL.
83    pub fn ttl_seconds(&self, key: &[u8]) -> i64 {
84        let route = self.route_key(key);
85        let now_ms = now_millis();
86        if self.objects.has_objects() {
87            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
88            if bucket.delete_expired(key, now_ms) {
89                self.objects.note_deleted(route.shard_id);
90                return -2;
91            }
92            let ttl = bucket.ttl_millis(key, now_ms);
93            if ttl != -2 {
94                return if ttl < 0 { ttl } else { (ttl + 999) / 1_000 };
95            }
96        }
97        let mut shard = self.shards[route.shard_id].write();
98        if let Some(session_prefix) = derived_session_storage_prefix(key)
99            && shard
100                .session_slots
101                .get_ref_hashed(&session_prefix, route.key_hash, key)
102                .is_some()
103        {
104            return -1;
105        }
106        shard.map.ttl_seconds(key, now_ms)
107    }
108
109    /// Returns Redis-style TTL in milliseconds: `-2` for missing, `-1` for no TTL.
110    pub fn pttl_millis(&self, key: &[u8]) -> i64 {
111        let route = self.route_key(key);
112        let now_ms = now_millis();
113        if self.objects.has_objects() {
114            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
115            if bucket.delete_expired(key, now_ms) {
116                self.objects.note_deleted(route.shard_id);
117                return -2;
118            }
119            let ttl = bucket.ttl_millis(key, now_ms);
120            if ttl != -2 {
121                return ttl;
122            }
123        }
124        let mut shard = self.shards[route.shard_id].write();
125        if let Some(session_prefix) = derived_session_storage_prefix(key)
126            && shard
127                .session_slots
128                .get_ref_hashed(&session_prefix, route.key_hash, key)
129                .is_some()
130        {
131            return -1;
132        }
133        shard.map.ttl_millis(key, now_ms)
134    }
135
136    /// Removes the TTL from a key and returns true when a TTL was cleared.
137    pub fn persist(&self, key: &[u8]) -> bool {
138        let route = self.route_key(key);
139        let now_ms = now_millis();
140        if self.objects.has_objects() {
141            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
142            if bucket.delete_expired(key, now_ms) {
143                self.objects.note_deleted(route.shard_id);
144                return false;
145            }
146            let persisted = bucket.persist(key, now_ms);
147            if persisted {
148                return true;
149            }
150            if bucket.contains_object(key) {
151                return false;
152            }
153        }
154        let mut shard = self.shards[route.shard_id].write();
155        if let Some(session_prefix) = derived_session_storage_prefix(key)
156            && shard
157                .session_slots
158                .get_ref_hashed(&session_prefix, route.key_hash, key)
159                .is_some()
160        {
161            return false;
162        }
163        shard.map.persist(key, now_ms)
164    }
165
166    /// Sets an absolute expiration timestamp in Unix milliseconds.
167    pub fn expire(&self, key: &[u8], expire_at_ms: u64) -> bool {
168        let route = self.route_key(key);
169        let now_ms = now_millis();
170        self.expire_routed_then(route, key, expire_at_ms, now_ms, || {})
171    }
172
173    /// Updates an absolute expiration timestamp and runs `after_expire` before
174    /// releasing the shard write lock when a TTL mutation actually occurred.
175    pub(crate) fn expire_routed_then(
176        &self,
177        route: EmbeddedKeyRoute,
178        key: &[u8],
179        expire_at_ms: u64,
180        now_ms: u64,
181        after_expire: impl FnOnce(),
182    ) -> bool {
183        let route = match route.shard_id < self.shards.len() {
184            true => route,
185            false => self.route_key(key),
186        };
187        if self.objects.has_objects() {
188            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
189            if bucket.delete_expired(key, now_ms) {
190                self.objects.note_deleted(route.shard_id);
191                return false;
192            }
193            if bucket.expire(key, expire_at_ms, now_ms) {
194                after_expire();
195                return true;
196            }
197            if bucket.contains_object(key) {
198                return false;
199            }
200        }
201        let mut shard = self.shards[route.shard_id].write();
202        if let Some(session_prefix) = derived_session_storage_prefix(key)
203            && shard
204                .session_slots
205                .get_ref_hashed(&session_prefix, route.key_hash, key)
206                .is_some()
207        {
208            return false;
209        }
210        let changed = shard.map.expire(key, expire_at_ms, now_ms);
211        if changed {
212            after_expire();
213        }
214        changed
215    }
216
217    /// Returns the Redis type name for a key, or `"none"` when it is missing.
218    pub fn redis_type(&self, key: &[u8]) -> &'static str {
219        let route = self.route_key(key);
220        if self.objects.has_objects() {
221            let now_ms = now_millis();
222            let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
223            if bucket.object_is_expired(key, now_ms) {
224                drop(bucket);
225                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
226                if bucket.delete_expired(key, now_ms) {
227                    self.objects.note_deleted(route.shard_id);
228                }
229                return if self.get_value_bytes(key).is_some() {
230                    "string"
231                } else {
232                    "none"
233                };
234            }
235            if let Some(kind) = bucket.type_name(key) {
236                return kind;
237            }
238        }
239        if self.get_value_bytes(key).is_some() {
240            "string"
241        } else {
242            "none"
243        }
244    }
245
246    /// Returns the Redis object encoding name for a key when it exists.
247    pub fn object_encoding(&self, key: &[u8]) -> Option<&'static str> {
248        let route = self.route_key(key);
249        if self.objects.has_objects() {
250            let now_ms = now_millis();
251            let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
252            if bucket.object_is_expired(key, now_ms) {
253                drop(bucket);
254                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
255                if bucket.delete_expired(key, now_ms) {
256                    self.objects.note_deleted(route.shard_id);
257                }
258                return self.get_value_bytes(key).map(|_| "raw");
259            }
260            if let Some(encoding) = bucket.encoding(key) {
261                return Some(encoding);
262            }
263        }
264        self.get_value_bytes(key).map(|_| "raw")
265    }
266
267    /// Returns per-shard statistics snapshots.
268    #[cfg(feature = "embedded")]
269    pub fn shard_stats_snapshot(&self) -> Vec<ShardStatsSnapshot> {
270        self.shards
271            .iter()
272            .enumerate()
273            .map(|(shard_id, shard)| {
274                let shard = shard.read();
275                let (hot, warm, cold) = shard.map.stats_snapshot();
276                let reads = hot
277                    .hits
278                    .saturating_add(hot.misses)
279                    .saturating_add(warm.hits)
280                    .saturating_add(warm.misses)
281                    .saturating_add(cold.hits)
282                    .saturating_add(cold.misses);
283                let expired = hot
284                    .expirations
285                    .saturating_add(warm.expirations)
286                    .saturating_add(cold.expirations);
287                ShardStatsSnapshot {
288                    shard_id,
289                    key_count: shard.map.len().saturating_add(shard.session_slots.len()),
290                    reads,
291                    writes: 0,
292                    deletes: 0,
293                    expired,
294                    maintenance_runs: 0,
295                    hot,
296                    warm,
297                    cold,
298                }
299            })
300            .collect()
301    }
302
303    /// Returns aggregate hot, warm, and cold tier statistics.
304    #[cfg(feature = "embedded")]
305    pub fn stats_snapshot(&self) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
306        let mut hot = TierStatsSnapshot {
307            name: "hot",
308            ..TierStatsSnapshot::default()
309        };
310        let mut warm = TierStatsSnapshot {
311            name: "warm",
312            ..TierStatsSnapshot::default()
313        };
314        let mut cold = TierStatsSnapshot {
315            name: "cold",
316            ..TierStatsSnapshot::default()
317        };
318
319        for shard in &self.shards {
320            let shard = shard.read();
321            let (shard_hot, shard_warm, shard_cold) = shard.map.stats_snapshot();
322            accumulate_tier_stats(&mut hot, &shard_hot);
323            accumulate_tier_stats(&mut warm, &shard_warm);
324            accumulate_tier_stats(&mut cold, &shard_cold);
325        }
326
327        (hot, warm, cold)
328    }
329
330    /// Runs maintenance on every shard and returns the number of expired entries.
331    pub fn process_maintenance(&self) -> usize {
332        let now_ms = now_millis();
333        self.shards
334            .iter()
335            .map(|shard| {
336                let mut shard = shard.write();
337                shard.map.process_maintenance(now_ms)
338            })
339            .sum()
340    }
341
342    /// Restores persisted entries, skipping records that are already expired.
343    pub fn restore_entries<I>(&self, entries: I)
344    where
345        I: IntoIterator<Item = StoredEntry>,
346    {
347        let now_ms = now_millis();
348        for entry in entries {
349            if entry
350                .expire_at_ms
351                .is_some_and(|expire_at_ms| expire_at_ms <= now_ms)
352            {
353                continue;
354            }
355            let route = self.route_key(&entry.key);
356            let mut shard = self.shards[route.shard_id].write();
357            if let Some(session_prefix) = derived_session_storage_prefix(&entry.key) {
358                shard
359                    .session_slots
360                    .delete_hashed(&session_prefix, route.key_hash, &entry.key);
361            }
362            shard.map.set_hashed(
363                route.key_hash,
364                entry.key,
365                entry.value,
366                entry.expire_at_ms,
367                now_ms,
368            );
369            shard.enforce_memory_limit(now_ms);
370        }
371    }
372}