Skip to main content

fast_cache/storage/embedded_store/
objects.rs

1use super::*;
2
3impl EmbeddedStore {
4    /// Returns true when Redis object containers are present.
5    #[inline(always)]
6    pub fn has_redis_objects(&self) -> bool {
7        self.objects.has_objects()
8    }
9
10    pub fn get_string_value_into<F>(&self, key: &[u8], mut write: F) -> RedisStringLookup
11    where
12        F: FnMut(&bytes::Bytes),
13    {
14        let route = self.route_key(key);
15        if self.objects.has_objects() {
16            let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
17            if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
18                drop(bucket);
19                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
20                if bucket.delete_expired(key, now_millis()) {
21                    self.objects.note_deleted(route.shard_id);
22                }
23                drop(bucket);
24                return if self.with_shared_value_bytes_routed(route, key, &mut write) {
25                    RedisStringLookup::Hit
26                } else {
27                    RedisStringLookup::Miss
28                };
29            }
30            if bucket.contains_object(key) {
31                return RedisStringLookup::WrongType;
32            }
33            drop(bucket);
34            if self.with_shared_value_bytes_routed(route, key, &mut write) {
35                RedisStringLookup::Hit
36            } else {
37                RedisStringLookup::Miss
38            }
39        } else if self.with_shared_value_bytes_routed(route, key, &mut write) {
40            RedisStringLookup::Hit
41        } else {
42            RedisStringLookup::Miss
43        }
44    }
45
46    pub fn hset(&self, key: &[u8], field: &[u8], value: &[u8]) -> RedisObjectResult {
47        self.hset_hashed(hash_key(key), key, field, value)
48    }
49
50    pub fn hset_many(&self, key: &[u8], fields: &[(&[u8], &[u8])]) -> RedisObjectResult {
51        self.object_write(key, |bucket| bucket.hset_many(key, fields))
52    }
53
54    pub fn hget(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
55        self.object_read(key, |bucket| bucket.hget(key, field))
56    }
57
58    pub fn hexists(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
59        self.object_read(key, |bucket| bucket.hexists(key, field))
60    }
61
62    pub fn hdel(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
63        self.object_write(key, |bucket| bucket.hdel(key, field))
64    }
65
66    pub fn hdel_many(&self, key: &[u8], fields: &[&[u8]]) -> RedisObjectResult {
67        self.object_write(key, |bucket| bucket.hdel_many(key, fields))
68    }
69
70    pub fn hlen(&self, key: &[u8]) -> RedisObjectResult {
71        self.object_read(key, |bucket| bucket.hlen(key))
72    }
73
74    pub fn hmget(&self, key: &[u8], fields: &[&[u8]]) -> RedisObjectResult {
75        self.object_read(key, |bucket| bucket.hmget(key, fields))
76    }
77
78    pub fn hkeys(&self, key: &[u8]) -> RedisObjectResult {
79        self.object_read(key, |bucket| bucket.hkeys(key))
80    }
81
82    pub fn hvals(&self, key: &[u8]) -> RedisObjectResult {
83        self.object_read(key, |bucket| bucket.hvals(key))
84    }
85
86    pub fn hgetall(&self, key: &[u8]) -> RedisObjectResult {
87        self.object_read(key, |bucket| bucket.hgetall(key))
88    }
89
90    pub fn hsetnx(&self, key: &[u8], field: &[u8], value: &[u8]) -> RedisObjectResult {
91        self.object_write(key, |bucket| bucket.hsetnx(key, field, value))
92    }
93
94    pub fn hincrby(&self, key: &[u8], field: &[u8], delta: i64) -> RedisObjectResult {
95        self.object_write(key, |bucket| bucket.hincrby(key, field, delta))
96    }
97
98    pub fn hincrbyfloat(&self, key: &[u8], field: &[u8], delta: f64) -> RedisObjectResult {
99        self.object_write(key, |bucket| bucket.hincrbyfloat(key, field, delta))
100    }
101
102    pub fn hrandfield(
103        &self,
104        key: &[u8],
105        count: Option<i64>,
106        with_values: bool,
107    ) -> RedisObjectResult {
108        self.object_read(key, |bucket| bucket.hrandfield(key, count, with_values))
109    }
110
111    pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
112        self.push_list_hashed(hash_key(key), key, values, true)
113    }
114
115    pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
116        self.push_list_hashed(hash_key(key), key, values, false)
117    }
118
119    pub fn lpushx(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
120        self.object_write(key, |bucket| bucket.push_list_existing(key, values, true))
121    }
122
123    pub fn rpushx(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
124        self.object_write(key, |bucket| bucket.push_list_existing(key, values, false))
125    }
126
127    pub fn lpop(&self, key: &[u8]) -> RedisObjectResult {
128        self.object_write(key, |bucket| bucket.pop_list(key, true))
129    }
130
131    pub fn rpop(&self, key: &[u8]) -> RedisObjectResult {
132        self.object_write(key, |bucket| bucket.pop_list(key, false))
133    }
134
135    pub fn lpop_count(&self, key: &[u8], count: usize) -> RedisObjectResult {
136        self.object_write(key, |bucket| bucket.pop_list_count(key, count, true))
137    }
138
139    pub fn rpop_count(&self, key: &[u8], count: usize) -> RedisObjectResult {
140        self.object_write(key, |bucket| bucket.pop_list_count(key, count, false))
141    }
142
143    pub fn llen(&self, key: &[u8]) -> RedisObjectResult {
144        self.object_read(key, |bucket| bucket.llen(key))
145    }
146
147    pub fn lindex(&self, key: &[u8], index: i64) -> RedisObjectResult {
148        self.object_read(key, |bucket| bucket.lindex(key, index))
149    }
150
151    pub fn lrange(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
152        self.object_read(key, |bucket| bucket.lrange(key, start, stop))
153    }
154
155    pub fn lset(&self, key: &[u8], index: i64, value: &[u8]) -> RedisObjectResult {
156        self.object_write(key, |bucket| bucket.lset(key, index, value))
157    }
158
159    pub fn lrem(&self, key: &[u8], count: i64, value: &[u8]) -> RedisObjectResult {
160        self.object_write(key, |bucket| bucket.lrem(key, count, value))
161    }
162
163    pub fn ltrim(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
164        self.object_write(key, |bucket| bucket.ltrim(key, start, stop))
165    }
166
167    pub fn linsert(
168        &self,
169        key: &[u8],
170        before: bool,
171        pivot: &[u8],
172        value: &[u8],
173    ) -> RedisObjectResult {
174        self.object_write(key, |bucket| bucket.linsert(key, before, pivot, value))
175    }
176
177    pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
178        self.sadd_hashed(hash_key(key), key, members)
179    }
180
181    pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
182        self.object_write(key, |bucket| bucket.srem(key, members))
183    }
184
185    pub fn sismember(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
186        self.object_read(key, |bucket| bucket.sismember(key, member))
187    }
188
189    pub fn smismember(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
190        self.object_read(key, |bucket| bucket.smismember(key, members))
191    }
192
193    pub fn scard(&self, key: &[u8]) -> RedisObjectResult {
194        self.object_read(key, |bucket| bucket.scard(key))
195    }
196
197    pub fn smembers(&self, key: &[u8]) -> RedisObjectResult {
198        self.object_read(key, |bucket| bucket.smembers(key))
199    }
200
201    pub fn set_members(&self, key: &[u8]) -> Result<Vec<Bytes>, RedisObjectError> {
202        let route = self.route_key(key);
203        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
204        let result = bucket
205            .set_members(key)
206            .map_err(|()| RedisObjectError::WrongType);
207        if result.is_err() || bucket.contains_object(key) {
208            return result;
209        }
210        drop(bucket);
211        if self.string_exists_routed(route, key) {
212            Err(RedisObjectError::WrongType)
213        } else {
214            result
215        }
216    }
217
218    pub fn spop(&self, key: &[u8], count: Option<usize>) -> RedisObjectResult {
219        self.object_write(key, |bucket| bucket.spop(key, count))
220    }
221
222    pub fn srandmember(&self, key: &[u8], count: Option<i64>) -> RedisObjectResult {
223        self.object_read(key, |bucket| bucket.srandmember(key, count))
224    }
225
226    pub fn zadd(&self, key: &[u8], score: f64, member: &[u8]) -> RedisObjectResult {
227        self.zadd_hashed(hash_key(key), key, score, member)
228    }
229
230    #[allow(clippy::too_many_arguments)]
231    pub fn zadd_cond(
232        &self,
233        key: &[u8],
234        score: f64,
235        member: &[u8],
236        nx: bool,
237        xx: bool,
238        gt: bool,
239        lt: bool,
240        ch: bool,
241        incr: bool,
242    ) -> RedisObjectResult {
243        self.object_write(key, |bucket| {
244            bucket.zadd_cond(key, score, member, nx, xx, gt, lt, ch, incr)
245        })
246    }
247
248    pub fn zrem(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
249        self.object_write(key, |bucket| bucket.zrem(key, member))
250    }
251
252    pub fn zrem_many(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
253        self.object_write(key, |bucket| bucket.zrem_many(key, members))
254    }
255
256    pub fn zscore(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
257        self.object_read(key, |bucket| bucket.zscore(key, member))
258    }
259
260    pub fn zmscore(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
261        self.object_read(key, |bucket| bucket.zmscore(key, members))
262    }
263
264    pub fn zincrby(&self, key: &[u8], delta: f64, member: &[u8]) -> RedisObjectResult {
265        self.object_write(key, |bucket| bucket.zincrby(key, delta, member))
266    }
267
268    pub fn zcard(&self, key: &[u8]) -> RedisObjectResult {
269        self.object_read(key, |bucket| bucket.zcard(key))
270    }
271
272    pub fn zrange(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
273        self.object_read(key, |bucket| bucket.zrange(key, start, stop))
274    }
275
276    pub fn zentries(&self, key: &[u8]) -> Result<Vec<(Bytes, f64)>, RedisObjectError> {
277        let route = self.route_key(key);
278        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
279        let result = bucket
280            .zentries(key)
281            .map_err(|()| RedisObjectError::WrongType);
282        if result.is_err() || bucket.contains_object(key) {
283            return result;
284        }
285        drop(bucket);
286        if self.string_exists_routed(route, key) {
287            Err(RedisObjectError::WrongType)
288        } else {
289            result
290        }
291    }
292
293    pub fn zrank(&self, key: &[u8], member: &[u8], rev: bool) -> RedisObjectResult {
294        self.object_read(key, |bucket| bucket.zrank(key, member, rev))
295    }
296
297    pub fn zcount(&self, key: &[u8], min: f64, max: f64) -> RedisObjectResult {
298        self.object_read(key, |bucket| bucket.zcount(key, min, max))
299    }
300
301    pub fn zpop(&self, key: &[u8], count: usize, max: bool) -> RedisObjectResult {
302        self.object_write(key, |bucket| bucket.zpop(key, count, max))
303    }
304
305    pub(crate) fn hset_hashed(
306        &self,
307        key_hash: u64,
308        key: &[u8],
309        field: &[u8],
310        value: &[u8],
311    ) -> RedisObjectResult {
312        self.object_create_hashed(
313            key_hash,
314            key,
315            |bucket, key_hash| {
316                bucket.hset_existing_or_wrongtype_hashed(key_hash, key, field, value)
317            },
318            |bucket, key_hash| bucket.hset_new_unchecked_hashed(key_hash, key, field, value),
319        )
320    }
321
322    pub(crate) fn push_list_hashed(
323        &self,
324        key_hash: u64,
325        key: &[u8],
326        values: &[&[u8]],
327        front: bool,
328    ) -> RedisObjectResult {
329        self.object_create_hashed(
330            key_hash,
331            key,
332            |bucket, key_hash| {
333                bucket.push_list_existing_or_wrongtype_hashed(key_hash, key, values, front)
334            },
335            |bucket, key_hash| bucket.push_list_new_unchecked_hashed(key_hash, key, values, front),
336        )
337    }
338
339    pub(crate) fn sadd_hashed(
340        &self,
341        key_hash: u64,
342        key: &[u8],
343        members: &[&[u8]],
344    ) -> RedisObjectResult {
345        self.object_create_hashed(
346            key_hash,
347            key,
348            |bucket, key_hash| bucket.sadd_existing_or_wrongtype_hashed(key_hash, key, members),
349            |bucket, key_hash| bucket.sadd_new_unchecked_hashed(key_hash, key, members),
350        )
351    }
352
353    pub(crate) fn zadd_hashed(
354        &self,
355        key_hash: u64,
356        key: &[u8],
357        score: f64,
358        member: &[u8],
359    ) -> RedisObjectResult {
360        self.object_create_hashed(
361            key_hash,
362            key,
363            |bucket, key_hash| {
364                bucket.zadd_existing_or_wrongtype_hashed(key_hash, key, score, member)
365            },
366            |bucket, key_hash| bucket.zadd_new_unchecked_hashed(key_hash, key, score, member),
367        )
368    }
369
370    fn object_read(
371        &self,
372        key: &[u8],
373        op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
374    ) -> RedisObjectResult {
375        self.object_read_routed(self.route_key(key), key, op)
376    }
377
378    #[allow(dead_code)]
379    pub(crate) fn object_read_hashed(
380        &self,
381        key_hash: u64,
382        key: &[u8],
383        op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
384    ) -> RedisObjectResult {
385        self.object_read_routed(self.route_key_prehashed(key_hash, key), key, op)
386    }
387
388    #[allow(dead_code)]
389    pub(crate) fn object_read_hashed_visit(
390        &self,
391        key_hash: u64,
392        key: &[u8],
393        op: impl FnOnce(&RedisObjectBucket) -> RedisObjectReadOutcome,
394    ) -> RedisObjectReadOutcome {
395        let route = self.route_key_prehashed(key_hash, key);
396        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
397        if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
398            drop(bucket);
399            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
400            if bucket.delete_expired(key, now_millis()) {
401                self.objects.note_deleted(route.shard_id);
402            }
403            drop(bucket);
404            return if self.string_exists_routed(route, key) {
405                RedisObjectReadOutcome::WrongType
406            } else {
407                RedisObjectReadOutcome::Missing
408            };
409        }
410        let outcome = op(&bucket);
411        if !matches!(outcome, RedisObjectReadOutcome::Missing) {
412            return outcome;
413        }
414        drop(bucket);
415        if self.string_exists_routed(route, key) {
416            RedisObjectReadOutcome::WrongType
417        } else {
418            RedisObjectReadOutcome::Missing
419        }
420    }
421
422    fn object_read_routed(
423        &self,
424        route: EmbeddedKeyRoute,
425        key: &[u8],
426        op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
427    ) -> RedisObjectResult {
428        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
429        if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
430            drop(bucket);
431            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
432            if bucket.delete_expired(key, now_millis()) {
433                self.objects.note_deleted(route.shard_id);
434            }
435            drop(bucket);
436            return if self.string_exists_routed(route, key) {
437                RedisObjectResult::WrongType
438            } else {
439                op(&self.objects.read_bucket(route.shard_id, route.key_hash))
440            };
441        }
442        let result = op(&bucket);
443        if matches!(result, RedisObjectResult::WrongType) || bucket.contains_object(key) {
444            return result;
445        }
446        drop(bucket);
447        if self.string_exists_routed(route, key) {
448            RedisObjectResult::WrongType
449        } else {
450            result
451        }
452    }
453
454    fn object_write(
455        &self,
456        key: &[u8],
457        op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
458    ) -> RedisObjectResult {
459        self.object_write_routed(self.route_key(key), key, op)
460    }
461
462    #[allow(dead_code)]
463    pub(crate) fn object_write_hashed(
464        &self,
465        key_hash: u64,
466        key: &[u8],
467        op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
468    ) -> RedisObjectResult {
469        self.object_write_routed(self.route_key_prehashed(key_hash, key), key, op)
470    }
471
472    fn object_create_hashed(
473        &self,
474        key_hash: u64,
475        key: &[u8],
476        existing: impl FnOnce(&mut RedisObjectBucket, u64) -> RedisObjectWriteAttempt,
477        create: impl FnOnce(&mut RedisObjectBucket, u64) -> RedisObjectResult,
478    ) -> RedisObjectResult {
479        let route = self.route_key_prehashed(key_hash, key);
480        let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
481        if bucket.has_expirations() && bucket.delete_expired(key, now_millis()) {
482            self.objects.note_deleted(route.shard_id);
483        }
484        match existing(&mut bucket, route.key_hash) {
485            RedisObjectWriteAttempt::Complete(result) => result,
486            RedisObjectWriteAttempt::Missing => {
487                if self.string_exists_routed(route, key) {
488                    RedisObjectResult::WrongType
489                } else {
490                    let result = create(&mut bucket, route.key_hash);
491                    self.objects.note_created(route.shard_id);
492                    result
493                }
494            }
495        }
496    }
497
498    fn object_write_routed(
499        &self,
500        route: EmbeddedKeyRoute,
501        key: &[u8],
502        op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
503    ) -> RedisObjectResult {
504        let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
505        if bucket.has_expirations() && bucket.delete_expired(key, now_millis()) {
506            self.objects.note_deleted(route.shard_id);
507        }
508        let had_object = bucket.contains_object(key);
509        if !had_object && self.string_exists_routed(route, key) {
510            return RedisObjectResult::WrongType;
511        }
512        let (result, object_changed) = op(&mut bucket);
513        if !had_object && object_changed {
514            self.objects.note_created(route.shard_id);
515        } else if had_object && object_changed {
516            self.objects.note_deleted(route.shard_id);
517        }
518        result
519    }
520
521    pub(crate) fn clone_object_value(&self, key: &[u8]) -> Option<RedisObjectValue> {
522        let route = self.route_key(key);
523        let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
524        if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
525            drop(bucket);
526            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
527            if bucket.delete_expired(key, now_millis()) {
528                self.objects.note_deleted(route.shard_id);
529            }
530            return None;
531        }
532        bucket.clone_value(key)
533    }
534
535    pub(crate) fn set_object_value(
536        &self,
537        key: &[u8],
538        value: RedisObjectValue,
539        ttl_ms: Option<u64>,
540    ) {
541        let route = self.route_key(key);
542        let now_ms = now_millis();
543        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
544        let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
545        let mut shard = self.shards[route.shard_id].write();
546        let had_object = bucket.contains_object(key);
547        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
548            shard
549                .session_slots
550                .delete_hashed(&session_prefix, route.key_hash, key);
551        }
552        shard.map.delete_hashed(route.key_hash, key, now_ms);
553        let created = bucket.insert_value(key.to_vec(), value);
554        if created && !had_object {
555            self.objects.note_created(route.shard_id);
556        }
557        if let Some(expire_at_ms) = expire_at_ms {
558            bucket.expire(key, expire_at_ms, now_ms);
559        }
560    }
561
562    pub fn rename_key(
563        &self,
564        source: &[u8],
565        dest: &[u8],
566        nx: bool,
567    ) -> std::result::Result<bool, RedisObjectError> {
568        if source == dest {
569            if !self.exists(source) {
570                return Err(RedisObjectError::MissingKey);
571            }
572            return Ok(!nx);
573        }
574        if nx && self.exists(dest) {
575            return Ok(false);
576        }
577        let ttl_ms = match self.pttl_millis(source) {
578            ttl if ttl >= 0 => Some(ttl as u64),
579            _ => None,
580        };
581        if let Some(value) = self.get_value_bytes(source) {
582            self.set_value_bytes(dest, value, ttl_ms);
583            self.delete(source);
584            return Ok(true);
585        }
586        if let Some(value) = self.clone_object_value(source) {
587            self.delete(dest);
588            self.set_object_value(dest, value, ttl_ms);
589            self.delete(source);
590            return Ok(true);
591        }
592        Err(RedisObjectError::MissingKey)
593    }
594
595    fn string_exists_routed(&self, route: EmbeddedKeyRoute, key: &[u8]) -> bool {
596        if uses_flat_key_storage(self.route_mode, key) {
597            let shard = self.shards[route.shard_id].read();
598            if shard.map.is_empty() && shard.session_slots.is_empty() {
599                return false;
600            }
601            if shard.map.has_no_ttl_entries() {
602                return shard.map.with_shared_value_bytes_hashed_no_ttl(
603                    route.key_hash,
604                    key,
605                    &mut |_| {},
606                );
607            }
608            return shard.map.with_shared_value_bytes_hashed(
609                route.key_hash,
610                key,
611                now_millis(),
612                &mut |_| {},
613            );
614        }
615        self.with_shared_value_bytes_routed(route, key, &mut |_| {})
616    }
617}