Skip to main content

fast_cache/storage/flat_map/
read.rs

1use super::*;
2
3impl FlatMap {
    /// Looks up `key` using its precomputed `hash` and returns a borrowed view
    /// of the stored value bytes, or `None` when the lookup finds nothing.
    ///
    /// No `now_ms` is accepted, so this entry point performs no expiration
    /// check of its own. The lookup is delegated entirely to
    /// `lookup_ref_hashed_lazy`, which is defined outside this section.
    #[inline(always)]
    pub fn get_ref_hashed_no_ttl(&mut self, hash: u64, key: &[u8]) -> Option<&[u8]> {
        self.lookup_ref_hashed_lazy(hash, key)
    }
8
9    /// `&self` read path. Skips entry access tracking (LFU/LRU touch). Safe for
10    /// any caller that does not depend on read-touch tracking — including the
11    /// shared-store hot path under `RwLock::read`.
12    #[inline(always)]
13    pub fn get_ref_hashed_shared(&self, hash: u64, key: &[u8], now_ms: u64) -> Option<&[u8]> {
14        // Skip the expiration filter entirely when no TTL entries exist — the
15        // common case for caches that don't use SET ... EX. Saves a branch
16        // and removes the `now_ms` dependency from the hot path.
17        if self.ttl_entries == 0 {
18            #[cfg(feature = "fast-point-map")]
19            if let Some(value) = self.fast_points.get(hash, key) {
20                return Some(value.as_ref());
21            }
22            return self
23                .entries
24                .find(hash, |entry| entry.matches_hashed_key(hash, key))
25                .map(|entry| entry.value.as_ref());
26        }
27        self.entries
28            .find(hash, |entry| entry.matches(hash, key))
29            .filter(|entry| !entry.is_expired(now_ms))
30            .map(|entry| entry.value.as_ref())
31    }
32
    /// Returns `true` when the `ttl_entries` counter is zero, i.e. no entry
    /// currently carries a TTL. Callers can use this to skip a `now_millis()`
    /// call, since no entry can expire in that state.
    #[inline(always)]
    pub fn has_no_ttl_entries(&self) -> bool {
        self.ttl_entries == 0
    }
39
40    #[inline(always)]
41    pub fn get_ref_hashed_shared_no_ttl(&self, hash: u64, key: &[u8]) -> Option<&[u8]> {
42        #[cfg(feature = "fast-point-map")]
43        if let Some(value) = self.fast_points.get(hash, key) {
44            return Some(value.as_ref());
45        }
46        self.entries
47            .find(hash, |entry| entry.matches_hashed_key(hash, key))
48            .map(|entry| entry.value.as_ref())
49    }
50
51    #[inline(always)]
52    pub fn with_shared_value_bytes_hashed_no_ttl<F>(
53        &self,
54        hash: u64,
55        key: &[u8],
56        write: &mut F,
57    ) -> bool
58    where
59        F: FnMut(&SharedBytes),
60    {
61        #[cfg(feature = "fast-point-map")]
62        if let Some(value) = self.fast_points.get(hash, key) {
63            write(value);
64            return true;
65        }
66        if let Some(entry) = self
67            .entries
68            .find(hash, |entry| entry.matches_hashed_key(hash, key))
69        {
70            write(&entry.value);
71            true
72        } else {
73            false
74        }
75    }
76
77    #[inline(always)]
78    pub fn get_shared_value_bytes_hashed_no_ttl(
79        &self,
80        hash: u64,
81        key: &[u8],
82    ) -> Option<&SharedBytes> {
83        #[cfg(feature = "fast-point-map")]
84        if let Some(value) = self.fast_points.get(hash, key) {
85            return Some(value);
86        }
87        self.entries
88            .find(hash, |entry| entry.matches_hashed_key(hash, key))
89            .map(|entry| &entry.value)
90    }
91
92    #[inline(always)]
93    pub fn get_shared_value_bytes_hashed_tagged_no_ttl(
94        &self,
95        hash: u64,
96        key_tag: u64,
97        key_len: usize,
98    ) -> Option<&SharedBytes> {
99        #[cfg(feature = "fast-point-map")]
100        if let Some(value) = self.fast_points.get_tagged(hash, key_tag, key_len) {
101            return Some(value);
102        }
103        self.entries
104            .find(hash, |entry| entry.matches_tagged(hash, key_tag, key_len))
105            .map(|entry| &entry.value)
106    }
107
108    #[inline(always)]
109    pub fn with_shared_value_bytes_hashed<F>(
110        &self,
111        hash: u64,
112        key: &[u8],
113        now_ms: u64,
114        write: &mut F,
115    ) -> bool
116    where
117        F: FnMut(&SharedBytes),
118    {
119        if let Some(entry) = self
120            .entries
121            .find(hash, |entry| entry.matches(hash, key))
122            .filter(|entry| !entry.is_expired(now_ms))
123        {
124            write(&entry.value);
125            true
126        } else {
127            false
128        }
129    }
130
131    #[inline(always)]
132    pub fn get_shared_value_bytes_hashed(
133        &self,
134        hash: u64,
135        key: &[u8],
136        now_ms: u64,
137    ) -> Option<&SharedBytes> {
138        self.entries
139            .find(hash, |entry| entry.matches(hash, key))
140            .filter(|entry| !entry.is_expired(now_ms))
141            .map(|entry| &entry.value)
142    }
143
144    /// Returns a refcount-only clone of the stored `bytes::Bytes`. Avoids the
145    /// `Vec<u8>` allocation that `get_ref_hashed_shared` callers do via
146    /// `to_vec`. Hot path for multi-direct GET.
147    #[inline(always)]
148    pub fn get_value_bytes_hashed(
149        &self,
150        hash: u64,
151        key: &[u8],
152        now_ms: u64,
153    ) -> Option<SharedBytes> {
154        #[cfg(feature = "fast-point-map")]
155        if let Some(value) = self.fast_points.get(hash, key) {
156            return Some(value.clone());
157        }
158        self.entries
159            .find(hash, |entry| entry.matches(hash, key))
160            .filter(|entry| !entry.is_expired(now_ms))
161            .map(|entry| entry.value.clone())
162    }
163
164    #[inline(always)]
165    pub fn get_value_bytes_hashed_prepared(
166        &self,
167        hash: u64,
168        key: &[u8],
169        key_tag: u64,
170        now_ms: u64,
171    ) -> Option<SharedBytes> {
172        #[cfg(feature = "fast-point-map")]
173        if let Some(value) = self.fast_points.get(hash, key) {
174            return Some(value.clone());
175        }
176        self.entries
177            .find(hash, |entry| entry.matches_prepared(hash, key, key_tag))
178            .filter(|entry| !entry.is_expired(now_ms))
179            .map(|entry| entry.value.clone())
180    }
181
182    /// Returns the current value and updates its expiration while holding the
183    /// shard write lock. This is the native GETEX primitive.
184    #[inline(always)]
185    pub fn get_value_bytes_hashed_and_expire(
186        &mut self,
187        hash: u64,
188        key: &[u8],
189        expire_at_ms: u64,
190        now_ms: u64,
191    ) -> Option<SharedBytes> {
192        self.disable_fast_point_map();
193
194        let mut entry = self
195            .entries
196            .find_entry(hash, |entry| entry.matches_hashed_key(hash, key))
197            .ok()?;
198        if entry.get().is_expired(now_ms) {
199            let _ = entry;
200            self.delete_hashed(hash, key, now_ms);
201            return None;
202        }
203        let had_ttl = entry.get().expire_at_ms.is_some();
204        let value = entry.get().value.clone();
205        entry.get_mut().expire_at_ms = Some(expire_at_ms);
206        self.adjust_ttl_count(had_ttl, true);
207        Some(value)
208    }
209
210    /// Calls `write` with the current value and updates its expiration while
211    /// holding the shard write lock. This is the borrowed native GETEX path:
212    /// it avoids the `Bytes` refcount clone/drop pair needed by
213    /// `get_value_bytes_hashed_and_expire`.
214    #[inline(always)]
215    pub fn with_value_bytes_hashed_and_expire<F>(
216        &mut self,
217        hash: u64,
218        key: &[u8],
219        expire_at_ms: u64,
220        now_ms: u64,
221        write: &mut F,
222    ) -> bool
223    where
224        F: FnMut(&[u8]),
225    {
226        self.disable_fast_point_map();
227
228        let Some(mut entry) = self
229            .entries
230            .find_entry(hash, |entry| entry.matches_hashed_key(hash, key))
231            .ok()
232        else {
233            return false;
234        };
235        if entry.get().is_expired(now_ms) {
236            let _ = entry;
237            self.delete_hashed(hash, key, now_ms);
238            return false;
239        }
240
241        let had_ttl = entry.get().expire_at_ms.is_some();
242        write(entry.get().value.as_ref());
243        entry.get_mut().expire_at_ms = Some(expire_at_ms);
244        self.adjust_ttl_count(had_ttl, true);
245        true
246    }
247
    /// Looks up `key` using its precomputed `hash` and `key_tag` and returns a
    /// borrowed view of the stored value bytes, or `None` when the lookup
    /// finds nothing.
    ///
    /// No `now_ms` is accepted, so this entry point performs no expiration
    /// check of its own. The lookup is delegated entirely to
    /// `lookup_ref_hashed_prepared_lazy`, which is defined outside this
    /// section.
    #[inline(always)]
    pub fn get_ref_hashed_prepared_no_ttl(
        &mut self,
        hash: u64,
        key: &[u8],
        key_tag: u64,
    ) -> Option<&[u8]> {
        self.lookup_ref_hashed_prepared_lazy(hash, key, key_tag)
    }
257
    /// Convenience wrapper around `get_ref_hashed`: hashes `key` with
    /// `hash_key` and forwards the hash, the key and `now_ms` unchanged.
    #[inline(always)]
    pub fn get_ref(&mut self, key: &[u8], now_ms: u64) -> Option<&[u8]> {
        self.get_ref_hashed(hash_key(key), key, now_ms)
    }
262
263    #[inline(always)]
264    pub fn get_ref_hashed(&mut self, hash: u64, key: &[u8], now_ms: u64) -> Option<&[u8]> {
265        #[cfg(feature = "telemetry")]
266        let start = self.telemetry.as_ref().map(|_| Instant::now());
267        #[cfg(feature = "telemetry")]
268        let telemetry = self.telemetry.clone();
269
270        let value = if self.ttl_entries == 0 {
271            self.lookup_ref_hashed_lazy(hash, key)
272        } else if self.entry_is_expired_hashed(hash, key, now_ms) {
273            let _ = self.delete_hashed_internal(hash, key, now_ms, DeleteReason::Expired);
274            None
275        } else {
276            self.lookup_ref_hashed_lazy(hash, key)
277        };
278
279        #[cfg(feature = "telemetry")]
280        if let (Some(telemetry), Some(start)) = (telemetry, start) {
281            telemetry.metrics.record_get(
282                telemetry.shard_id,
283                value.is_some(),
284                value.map_or(0, |bytes| bytes.len()),
285                start.elapsed().as_nanos() as u64,
286            );
287        }
288
289        value
290    }
291
292    #[cfg(feature = "embedded")]
293    #[inline(always)]
294    pub fn get_ref_hashed_local(&mut self, hash: u64, key: &[u8], now_ms: u64) -> Option<&[u8]> {
295        #[cfg(feature = "telemetry")]
296        let start = self.telemetry.as_ref().map(|_| Instant::now());
297        #[cfg(feature = "telemetry")]
298        let telemetry = self.telemetry.clone();
299
300        let value = if self.ttl_entries == 0 {
301            self.lookup_ref_hashed_lazy(hash, key)
302        } else if self.entry_is_expired_hashed(hash, key, now_ms) {
303            let _ = self.delete_hashed_local_internal(hash, key, now_ms, DeleteReason::Expired);
304            None
305        } else {
306            self.lookup_ref_hashed_lazy(hash, key)
307        };
308
309        #[cfg(feature = "telemetry")]
310        if let (Some(telemetry), Some(start)) = (telemetry, start) {
311            telemetry.metrics.record_get(
312                telemetry.shard_id,
313                value.is_some(),
314                value.map_or(0, |bytes| bytes.len()),
315                start.elapsed().as_nanos() as u64,
316            );
317        }
318
319        value
320    }
321
322    pub fn get(&mut self, key: &[u8], now_ms: u64) -> Option<Bytes> {
323        self.get_ref(key, now_ms).map(<[u8]>::to_vec)
324    }
325
326    pub fn exists(&mut self, key: &[u8], now_ms: u64) -> bool {
327        self.disable_fast_point_map();
328        let hash = hash_key(key);
329        if self.ttl_entries != 0 && self.entry_is_expired_hashed(hash, key, now_ms) {
330            let _ = self.delete_hashed_internal(hash, key, now_ms, DeleteReason::Expired);
331            return false;
332        }
333        self.entries
334            .find(hash, |entry| entry.matches(hash, key))
335            .is_some()
336    }
337
    /// Starts a shard-local read epoch.
    ///
    /// While at least one read epoch is active, value replacements and deletes
    /// retire old buffers instead of freeing them immediately. That keeps any
    /// zero-copy readers pointing at stable memory without introducing shared
    /// reference counting.
    ///
    /// This method itself only increments the `active_readers` counter (with
    /// `AcqRel` ordering); the retire behaviour described above is implemented
    /// by code outside this section. Each call should be paired with
    /// `end_read_epoch`.
    #[inline(always)]
    pub fn begin_read_epoch(&self) {
        self.active_readers.fetch_add(1, Ordering::AcqRel);
    }
348
    /// Ends a shard-local read epoch by decrementing the `active_readers`
    /// counter (with `AcqRel` ordering).
    ///
    /// No reclamation happens in this call. Retired values become eligible for
    /// reclamation once the last reader for this shard has exited;
    /// reclamation itself stays on the owner thread and runs lazily from the
    /// write path.
    ///
    /// NOTE(review): there is no guard against calling this without a matching
    /// `begin_read_epoch`; an unbalanced call would wrap the counter — confirm
    /// callers always pair the two.
    #[inline(always)]
    pub fn end_read_epoch(&self) {
        self.active_readers.fetch_sub(1, Ordering::AcqRel);
    }
356}