fast_cache/storage/embedded_store/
lifecycle.rs1use super::*;
2
3impl EmbeddedStore {
4 pub fn delete(&self, key: &[u8]) -> bool {
6 let now_ms = now_millis();
7 let route = self.route_key(key);
8 self.delete_routed_then(route, key, now_ms, || {})
9 }
10
11 pub(crate) fn delete_routed_then(
14 &self,
15 route: EmbeddedKeyRoute,
16 key: &[u8],
17 now_ms: u64,
18 after_delete: impl FnOnce(),
19 ) -> bool {
20 let route = match route.shard_id < self.shards.len() {
21 true => route,
22 false => self.route_key(key),
23 };
24 if self.objects.has_objects() {
25 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
26 let mut shard = self.shards[route.shard_id].write();
27 let deleted_object = bucket.delete_any(key);
28 if deleted_object {
29 self.objects.note_deleted(route.shard_id);
30 }
31 let deleted_session = if let Some(session_prefix) = derived_session_storage_prefix(key)
32 {
33 shard
34 .session_slots
35 .delete_hashed(&session_prefix, route.key_hash, key)
36 } else {
37 false
38 };
39 let deleted_map = shard.map.delete_hashed(route.key_hash, key, now_ms);
40 let deleted = deleted_object || deleted_session || deleted_map;
41 if deleted {
42 after_delete();
43 }
44 return deleted;
45 }
46 let mut shard = self.shards[route.shard_id].write();
47 if let Some(session_prefix) = derived_session_storage_prefix(key)
48 && shard
49 .session_slots
50 .delete_hashed(&session_prefix, route.key_hash, key)
51 {
52 after_delete();
53 return true;
54 }
55 let deleted = shard.map.delete_hashed(route.key_hash, key, now_ms);
56 if deleted {
57 after_delete();
58 }
59 deleted
60 }
61
62 pub fn exists(&self, key: &[u8]) -> bool {
64 let route = self.route_key(key);
65 if self.objects.has_objects() {
66 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
67 if bucket.object_is_expired(key, now_millis()) {
68 drop(bucket);
69 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
70 if bucket.delete_expired(key, now_millis()) {
71 self.objects.note_deleted(route.shard_id);
72 }
73 return self.get(key).is_some();
74 }
75 if bucket.contains_object(key) {
76 return true;
77 }
78 }
79 self.get(key).is_some()
80 }
81
82 pub fn ttl_seconds(&self, key: &[u8]) -> i64 {
84 let route = self.route_key(key);
85 let now_ms = now_millis();
86 if self.objects.has_objects() {
87 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
88 if bucket.delete_expired(key, now_ms) {
89 self.objects.note_deleted(route.shard_id);
90 return -2;
91 }
92 let ttl = bucket.ttl_millis(key, now_ms);
93 if ttl != -2 {
94 return if ttl < 0 { ttl } else { (ttl + 999) / 1_000 };
95 }
96 }
97 let mut shard = self.shards[route.shard_id].write();
98 if let Some(session_prefix) = derived_session_storage_prefix(key)
99 && shard
100 .session_slots
101 .get_ref_hashed(&session_prefix, route.key_hash, key)
102 .is_some()
103 {
104 return -1;
105 }
106 shard.map.ttl_seconds(key, now_ms)
107 }
108
109 pub fn pttl_millis(&self, key: &[u8]) -> i64 {
111 let route = self.route_key(key);
112 let now_ms = now_millis();
113 if self.objects.has_objects() {
114 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
115 if bucket.delete_expired(key, now_ms) {
116 self.objects.note_deleted(route.shard_id);
117 return -2;
118 }
119 let ttl = bucket.ttl_millis(key, now_ms);
120 if ttl != -2 {
121 return ttl;
122 }
123 }
124 let mut shard = self.shards[route.shard_id].write();
125 if let Some(session_prefix) = derived_session_storage_prefix(key)
126 && shard
127 .session_slots
128 .get_ref_hashed(&session_prefix, route.key_hash, key)
129 .is_some()
130 {
131 return -1;
132 }
133 shard.map.ttl_millis(key, now_ms)
134 }
135
136 pub fn persist(&self, key: &[u8]) -> bool {
138 let route = self.route_key(key);
139 let now_ms = now_millis();
140 if self.objects.has_objects() {
141 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
142 if bucket.delete_expired(key, now_ms) {
143 self.objects.note_deleted(route.shard_id);
144 return false;
145 }
146 let persisted = bucket.persist(key, now_ms);
147 if persisted {
148 return true;
149 }
150 if bucket.contains_object(key) {
151 return false;
152 }
153 }
154 let mut shard = self.shards[route.shard_id].write();
155 if let Some(session_prefix) = derived_session_storage_prefix(key)
156 && shard
157 .session_slots
158 .get_ref_hashed(&session_prefix, route.key_hash, key)
159 .is_some()
160 {
161 return false;
162 }
163 shard.map.persist(key, now_ms)
164 }
165
166 pub fn expire(&self, key: &[u8], expire_at_ms: u64) -> bool {
168 let route = self.route_key(key);
169 let now_ms = now_millis();
170 self.expire_routed_then(route, key, expire_at_ms, now_ms, || {})
171 }
172
173 pub(crate) fn expire_routed_then(
176 &self,
177 route: EmbeddedKeyRoute,
178 key: &[u8],
179 expire_at_ms: u64,
180 now_ms: u64,
181 after_expire: impl FnOnce(),
182 ) -> bool {
183 let route = match route.shard_id < self.shards.len() {
184 true => route,
185 false => self.route_key(key),
186 };
187 if self.objects.has_objects() {
188 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
189 if bucket.delete_expired(key, now_ms) {
190 self.objects.note_deleted(route.shard_id);
191 return false;
192 }
193 if bucket.expire(key, expire_at_ms, now_ms) {
194 after_expire();
195 return true;
196 }
197 if bucket.contains_object(key) {
198 return false;
199 }
200 }
201 let mut shard = self.shards[route.shard_id].write();
202 if let Some(session_prefix) = derived_session_storage_prefix(key)
203 && shard
204 .session_slots
205 .get_ref_hashed(&session_prefix, route.key_hash, key)
206 .is_some()
207 {
208 return false;
209 }
210 let changed = shard.map.expire(key, expire_at_ms, now_ms);
211 if changed {
212 after_expire();
213 }
214 changed
215 }
216
217 pub fn redis_type(&self, key: &[u8]) -> &'static str {
219 let route = self.route_key(key);
220 if self.objects.has_objects() {
221 let now_ms = now_millis();
222 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
223 if bucket.object_is_expired(key, now_ms) {
224 drop(bucket);
225 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
226 if bucket.delete_expired(key, now_ms) {
227 self.objects.note_deleted(route.shard_id);
228 }
229 return if self.get_value_bytes(key).is_some() {
230 "string"
231 } else {
232 "none"
233 };
234 }
235 if let Some(kind) = bucket.type_name(key) {
236 return kind;
237 }
238 }
239 if self.get_value_bytes(key).is_some() {
240 "string"
241 } else {
242 "none"
243 }
244 }
245
246 pub fn object_encoding(&self, key: &[u8]) -> Option<&'static str> {
248 let route = self.route_key(key);
249 if self.objects.has_objects() {
250 let now_ms = now_millis();
251 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
252 if bucket.object_is_expired(key, now_ms) {
253 drop(bucket);
254 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
255 if bucket.delete_expired(key, now_ms) {
256 self.objects.note_deleted(route.shard_id);
257 }
258 return self.get_value_bytes(key).map(|_| "raw");
259 }
260 if let Some(encoding) = bucket.encoding(key) {
261 return Some(encoding);
262 }
263 }
264 self.get_value_bytes(key).map(|_| "raw")
265 }
266
267 #[cfg(feature = "embedded")]
269 pub fn shard_stats_snapshot(&self) -> Vec<ShardStatsSnapshot> {
270 self.shards
271 .iter()
272 .enumerate()
273 .map(|(shard_id, shard)| {
274 let shard = shard.read();
275 let (hot, warm, cold) = shard.map.stats_snapshot();
276 let reads = hot
277 .hits
278 .saturating_add(hot.misses)
279 .saturating_add(warm.hits)
280 .saturating_add(warm.misses)
281 .saturating_add(cold.hits)
282 .saturating_add(cold.misses);
283 let expired = hot
284 .expirations
285 .saturating_add(warm.expirations)
286 .saturating_add(cold.expirations);
287 ShardStatsSnapshot {
288 shard_id,
289 key_count: shard.map.len().saturating_add(shard.session_slots.len()),
290 reads,
291 writes: 0,
292 deletes: 0,
293 expired,
294 maintenance_runs: 0,
295 hot,
296 warm,
297 cold,
298 }
299 })
300 .collect()
301 }
302
303 #[cfg(feature = "embedded")]
305 pub fn stats_snapshot(&self) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
306 let mut hot = TierStatsSnapshot {
307 name: "hot",
308 ..TierStatsSnapshot::default()
309 };
310 let mut warm = TierStatsSnapshot {
311 name: "warm",
312 ..TierStatsSnapshot::default()
313 };
314 let mut cold = TierStatsSnapshot {
315 name: "cold",
316 ..TierStatsSnapshot::default()
317 };
318
319 for shard in &self.shards {
320 let shard = shard.read();
321 let (shard_hot, shard_warm, shard_cold) = shard.map.stats_snapshot();
322 accumulate_tier_stats(&mut hot, &shard_hot);
323 accumulate_tier_stats(&mut warm, &shard_warm);
324 accumulate_tier_stats(&mut cold, &shard_cold);
325 }
326
327 (hot, warm, cold)
328 }
329
330 pub fn process_maintenance(&self) -> usize {
332 let now_ms = now_millis();
333 self.shards
334 .iter()
335 .map(|shard| {
336 let mut shard = shard.write();
337 shard.map.process_maintenance(now_ms)
338 })
339 .sum()
340 }
341
342 pub fn restore_entries<I>(&self, entries: I)
344 where
345 I: IntoIterator<Item = StoredEntry>,
346 {
347 let now_ms = now_millis();
348 for entry in entries {
349 if entry
350 .expire_at_ms
351 .is_some_and(|expire_at_ms| expire_at_ms <= now_ms)
352 {
353 continue;
354 }
355 let route = self.route_key(&entry.key);
356 let mut shard = self.shards[route.shard_id].write();
357 if let Some(session_prefix) = derived_session_storage_prefix(&entry.key) {
358 shard
359 .session_slots
360 .delete_hashed(&session_prefix, route.key_hash, &entry.key);
361 }
362 shard.map.set_hashed(
363 route.key_hash,
364 entry.key,
365 entry.value,
366 entry.expire_at_ms,
367 now_ms,
368 );
369 shard.enforce_memory_limit(now_ms);
370 }
371 }
372}