Skip to main content

fast_cache/storage/flat_map/
lifecycle.rs

1use super::*;
2
3impl FlatMap {
4    pub fn delete(&mut self, key: &[u8], now_ms: u64) -> bool {
5        self.delete_hashed_internal(hash_key(key), key, now_ms, DeleteReason::Explicit)
6    }
7
8    pub fn delete_hashed(&mut self, hash: u64, key: &[u8], _now_ms: u64) -> bool {
9        self.delete_hashed_internal(hash, key, _now_ms, DeleteReason::Explicit)
10    }
11
12    pub(crate) fn remove_value_hashed(
13        &mut self,
14        hash: u64,
15        key: &[u8],
16        now_ms: u64,
17    ) -> Option<SharedBytes> {
18        self.disable_fast_point_map();
19        self.reclaim_retired_if_quiescent();
20        let entry = self
21            .entries
22            .find_entry(hash, |entry| entry.matches(hash, key))
23            .ok()?;
24        if entry.get().is_expired(now_ms) {
25            let _ = entry;
26            let _ = self.delete_hashed_internal(hash, key, now_ms, DeleteReason::Expired);
27            return None;
28        }
29
30        let removed_key_len = entry.get().key.len();
31        let removed_value_len = entry.get().value.len();
32        if entry.get().expire_at_ms.is_some() {
33            self.ttl_entries = self.ttl_entries.saturating_sub(1);
34        }
35        let (removed, _) = entry.remove();
36        self.stored_bytes = self
37            .stored_bytes
38            .saturating_sub(removed_key_len.saturating_add(removed_value_len));
39        #[cfg(feature = "telemetry")]
40        self.record_delete_metrics(
41            DeleteReason::Explicit,
42            -1,
43            -((removed_key_len + removed_value_len) as isize),
44        );
45        Some(removed.value)
46    }
47
48    #[cfg(feature = "embedded")]
49    pub fn delete_hashed_local(&mut self, hash: u64, key: &[u8], now_ms: u64) -> bool {
50        self.delete_hashed_local_internal(hash, key, now_ms, DeleteReason::Explicit)
51    }
52
53    pub(super) fn delete_hashed_internal(
54        &mut self,
55        hash: u64,
56        key: &[u8],
57        _now_ms: u64,
58        #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] reason: DeleteReason,
59    ) -> bool {
60        self.disable_fast_point_map();
61        self.reclaim_retired_if_quiescent();
62        let Some(entry) = self
63            .entries
64            .find_entry(hash, |entry| entry.matches(hash, key))
65            .ok()
66        else {
67            return false;
68        };
69
70        let removed_key_len = entry.get().key.len();
71        let removed_value_len = entry.get().value.len();
72        if entry.get().expire_at_ms.is_some() {
73            self.ttl_entries = self.ttl_entries.saturating_sub(1);
74        }
75        let (removed, _) = entry.remove();
76        self.stored_bytes = self
77            .stored_bytes
78            .saturating_sub(removed_key_len.saturating_add(removed_value_len));
79        self.retire_value(removed.value);
80        if reason == DeleteReason::Evicted {
81            self.evictions = self.evictions.saturating_add(1);
82        }
83        #[cfg(feature = "telemetry")]
84        self.record_delete_metrics(
85            reason,
86            -1,
87            -((removed_key_len + removed_value_len) as isize),
88        );
89        true
90    }
91
92    #[cfg(feature = "embedded")]
93    pub(super) fn delete_hashed_local_internal(
94        &mut self,
95        hash: u64,
96        key: &[u8],
97        _now_ms: u64,
98        #[cfg_attr(not(feature = "telemetry"), allow(unused_variables))] reason: DeleteReason,
99    ) -> bool {
100        self.disable_fast_point_map();
101        let Some(entry) = self
102            .entries
103            .find_entry(hash, |entry| entry.matches(hash, key))
104            .ok()
105        else {
106            return false;
107        };
108
109        let removed_key_len = entry.get().key.len();
110        let removed_value_len = entry.get().value.len();
111        if entry.get().expire_at_ms.is_some() {
112            self.ttl_entries = self.ttl_entries.saturating_sub(1);
113        }
114        let (removed, _) = entry.remove();
115        drop(removed);
116        self.stored_bytes = self
117            .stored_bytes
118            .saturating_sub(removed_key_len.saturating_add(removed_value_len));
119        if reason == DeleteReason::Evicted {
120            self.evictions = self.evictions.saturating_add(1);
121        }
122        #[cfg(feature = "telemetry")]
123        self.record_delete_metrics(
124            reason,
125            -1,
126            -((removed_key_len + removed_value_len) as isize),
127        );
128        true
129    }
130
131    pub fn ttl_seconds(&mut self, key: &[u8], now_ms: u64) -> i64 {
132        self.disable_fast_point_map();
133        let hash = hash_key(key);
134        let Some(entry) = self.entries.find(hash, |entry| entry.matches(hash, key)) else {
135            return -2;
136        };
137        let Some(expire_at_ms) = entry.expire_at_ms else {
138            return -1;
139        };
140        if expire_at_ms <= now_ms {
141            self.delete_hashed_internal(hash, key, now_ms, DeleteReason::Expired);
142            return -2;
143        }
144        expire_at_ms.saturating_sub(now_ms).div_ceil(1_000) as i64
145    }
146
147    pub fn ttl_millis(&mut self, key: &[u8], now_ms: u64) -> i64 {
148        self.disable_fast_point_map();
149        let hash = hash_key(key);
150        let Some(entry) = self.entries.find(hash, |entry| entry.matches(hash, key)) else {
151            return -2;
152        };
153        let Some(expire_at_ms) = entry.expire_at_ms else {
154            return -1;
155        };
156        if expire_at_ms <= now_ms {
157            self.delete_hashed_internal(hash, key, now_ms, DeleteReason::Expired);
158            return -2;
159        }
160        expire_at_ms.saturating_sub(now_ms) as i64
161    }
162
163    pub fn persist(&mut self, key: &[u8], now_ms: u64) -> bool {
164        self.disable_fast_point_map();
165        let hash = hash_key(key);
166        if self.entry_is_expired_hashed(hash, key, now_ms) {
167            self.delete_hashed(hash, key, now_ms);
168            return false;
169        }
170
171        let Some(mut entry) = self
172            .entries
173            .find_entry(hash, |entry| entry.matches(hash, key))
174            .ok()
175        else {
176            return false;
177        };
178        if entry.get().expire_at_ms.is_none() {
179            return false;
180        }
181        entry.get_mut().expire_at_ms = None;
182        self.adjust_ttl_count(true, false);
183        true
184    }
185
186    pub fn expire(&mut self, key: &[u8], expire_at_ms: u64, now_ms: u64) -> bool {
187        self.disable_fast_point_map();
188        let hash = hash_key(key);
189        if self.entry_is_expired_hashed(hash, key, now_ms) {
190            self.delete_hashed(hash, key, now_ms);
191            return false;
192        }
193
194        let Some(mut entry) = self
195            .entries
196            .find_entry(hash, |entry| entry.matches(hash, key))
197            .ok()
198        else {
199            return false;
200        };
201        let had_ttl = entry.get().expire_at_ms.is_some();
202        entry.get_mut().expire_at_ms = Some(expire_at_ms);
203        self.adjust_ttl_count(had_ttl, true);
204        true
205    }
206
207    pub fn snapshot_entries(&self, now_ms: u64) -> Vec<StoredEntry> {
208        #[cfg(feature = "fast-point-map")]
209        if self.fast_points.is_active() {
210            return self.fast_points.snapshot_entries();
211        }
212        self.entries
213            .iter()
214            .filter(|entry| !entry.is_expired(now_ms))
215            .map(|entry| StoredEntry {
216                key: entry.key.as_ref().to_vec(),
217                value: entry.value.as_ref().to_vec(),
218                expire_at_ms: entry.expire_at_ms,
219            })
220            .collect()
221    }
222
223    pub fn process_maintenance(&mut self, now_ms: u64) -> usize {
224        self.reclaim_retired_if_quiescent();
225        if self.ttl_entries == 0 {
226            return 0;
227        }
228
229        let expired = self
230            .entries
231            .iter()
232            .filter(|entry| entry.is_expired(now_ms))
233            .map(|entry| (entry.hash, entry.key.as_ref().to_vec()))
234            .collect::<Vec<_>>();
235
236        let removed = expired.len();
237        for (hash, key) in expired {
238            let _ = self.delete_hashed_internal(hash, &key, now_ms, DeleteReason::Expired);
239        }
240        removed
241    }
242
243    pub fn stats_snapshot(&self) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
244        (
245            TierStatsSnapshot {
246                name: "hot",
247                len: 0,
248                capacity: 0,
249                ..TierStatsSnapshot::default()
250            },
251            TierStatsSnapshot {
252                name: "warm",
253                len: 0,
254                capacity: 0,
255                ..TierStatsSnapshot::default()
256            },
257            TierStatsSnapshot {
258                name: "cold",
259                len: self.len(),
260                capacity: self.len(),
261                evictions: self.evictions,
262                ..TierStatsSnapshot::default()
263            },
264        )
265    }
266}