1use super::*;
2
3impl EmbeddedStore {
4 #[inline(always)]
6 pub fn has_redis_objects(&self) -> bool {
7 self.objects.has_objects()
8 }
9
10 pub fn get_string_value_into<F>(&self, key: &[u8], mut write: F) -> RedisStringLookup
11 where
12 F: FnMut(&bytes::Bytes),
13 {
14 let route = self.route_key(key);
15 if self.objects.has_objects() {
16 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
17 if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
18 drop(bucket);
19 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
20 if bucket.delete_expired(key, now_millis()) {
21 self.objects.note_deleted(route.shard_id);
22 }
23 drop(bucket);
24 return if self.with_shared_value_bytes_routed(route, key, &mut write) {
25 RedisStringLookup::Hit
26 } else {
27 RedisStringLookup::Miss
28 };
29 }
30 if bucket.contains_object(key) {
31 return RedisStringLookup::WrongType;
32 }
33 drop(bucket);
34 if self.with_shared_value_bytes_routed(route, key, &mut write) {
35 RedisStringLookup::Hit
36 } else {
37 RedisStringLookup::Miss
38 }
39 } else if self.with_shared_value_bytes_routed(route, key, &mut write) {
40 RedisStringLookup::Hit
41 } else {
42 RedisStringLookup::Miss
43 }
44 }
45
46 pub fn hset(&self, key: &[u8], field: &[u8], value: &[u8]) -> RedisObjectResult {
47 self.hset_hashed(hash_key(key), key, field, value)
48 }
49
50 pub fn hset_many(&self, key: &[u8], fields: &[(&[u8], &[u8])]) -> RedisObjectResult {
51 self.object_write(key, |bucket| bucket.hset_many(key, fields))
52 }
53
54 pub fn hget(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
55 self.object_read(key, |bucket| bucket.hget(key, field))
56 }
57
58 pub fn hexists(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
59 self.object_read(key, |bucket| bucket.hexists(key, field))
60 }
61
62 pub fn hdel(&self, key: &[u8], field: &[u8]) -> RedisObjectResult {
63 self.object_write(key, |bucket| bucket.hdel(key, field))
64 }
65
66 pub fn hdel_many(&self, key: &[u8], fields: &[&[u8]]) -> RedisObjectResult {
67 self.object_write(key, |bucket| bucket.hdel_many(key, fields))
68 }
69
70 pub fn hlen(&self, key: &[u8]) -> RedisObjectResult {
71 self.object_read(key, |bucket| bucket.hlen(key))
72 }
73
74 pub fn hmget(&self, key: &[u8], fields: &[&[u8]]) -> RedisObjectResult {
75 self.object_read(key, |bucket| bucket.hmget(key, fields))
76 }
77
78 pub fn hkeys(&self, key: &[u8]) -> RedisObjectResult {
79 self.object_read(key, |bucket| bucket.hkeys(key))
80 }
81
82 pub fn hvals(&self, key: &[u8]) -> RedisObjectResult {
83 self.object_read(key, |bucket| bucket.hvals(key))
84 }
85
86 pub fn hgetall(&self, key: &[u8]) -> RedisObjectResult {
87 self.object_read(key, |bucket| bucket.hgetall(key))
88 }
89
90 pub fn hsetnx(&self, key: &[u8], field: &[u8], value: &[u8]) -> RedisObjectResult {
91 self.object_write(key, |bucket| bucket.hsetnx(key, field, value))
92 }
93
94 pub fn hincrby(&self, key: &[u8], field: &[u8], delta: i64) -> RedisObjectResult {
95 self.object_write(key, |bucket| bucket.hincrby(key, field, delta))
96 }
97
98 pub fn hincrbyfloat(&self, key: &[u8], field: &[u8], delta: f64) -> RedisObjectResult {
99 self.object_write(key, |bucket| bucket.hincrbyfloat(key, field, delta))
100 }
101
102 pub fn hrandfield(
103 &self,
104 key: &[u8],
105 count: Option<i64>,
106 with_values: bool,
107 ) -> RedisObjectResult {
108 self.object_read(key, |bucket| bucket.hrandfield(key, count, with_values))
109 }
110
111 pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
112 self.push_list_hashed(hash_key(key), key, values, true)
113 }
114
115 pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
116 self.push_list_hashed(hash_key(key), key, values, false)
117 }
118
119 pub fn lpushx(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
120 self.object_write(key, |bucket| bucket.push_list_existing(key, values, true))
121 }
122
123 pub fn rpushx(&self, key: &[u8], values: &[&[u8]]) -> RedisObjectResult {
124 self.object_write(key, |bucket| bucket.push_list_existing(key, values, false))
125 }
126
127 pub fn lpop(&self, key: &[u8]) -> RedisObjectResult {
128 self.object_write(key, |bucket| bucket.pop_list(key, true))
129 }
130
131 pub fn rpop(&self, key: &[u8]) -> RedisObjectResult {
132 self.object_write(key, |bucket| bucket.pop_list(key, false))
133 }
134
135 pub fn lpop_count(&self, key: &[u8], count: usize) -> RedisObjectResult {
136 self.object_write(key, |bucket| bucket.pop_list_count(key, count, true))
137 }
138
139 pub fn rpop_count(&self, key: &[u8], count: usize) -> RedisObjectResult {
140 self.object_write(key, |bucket| bucket.pop_list_count(key, count, false))
141 }
142
143 pub fn llen(&self, key: &[u8]) -> RedisObjectResult {
144 self.object_read(key, |bucket| bucket.llen(key))
145 }
146
147 pub fn lindex(&self, key: &[u8], index: i64) -> RedisObjectResult {
148 self.object_read(key, |bucket| bucket.lindex(key, index))
149 }
150
151 pub fn lrange(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
152 self.object_read(key, |bucket| bucket.lrange(key, start, stop))
153 }
154
155 pub fn lset(&self, key: &[u8], index: i64, value: &[u8]) -> RedisObjectResult {
156 self.object_write(key, |bucket| bucket.lset(key, index, value))
157 }
158
159 pub fn lrem(&self, key: &[u8], count: i64, value: &[u8]) -> RedisObjectResult {
160 self.object_write(key, |bucket| bucket.lrem(key, count, value))
161 }
162
163 pub fn ltrim(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
164 self.object_write(key, |bucket| bucket.ltrim(key, start, stop))
165 }
166
167 pub fn linsert(
168 &self,
169 key: &[u8],
170 before: bool,
171 pivot: &[u8],
172 value: &[u8],
173 ) -> RedisObjectResult {
174 self.object_write(key, |bucket| bucket.linsert(key, before, pivot, value))
175 }
176
177 pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
178 self.sadd_hashed(hash_key(key), key, members)
179 }
180
181 pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
182 self.object_write(key, |bucket| bucket.srem(key, members))
183 }
184
185 pub fn sismember(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
186 self.object_read(key, |bucket| bucket.sismember(key, member))
187 }
188
189 pub fn smismember(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
190 self.object_read(key, |bucket| bucket.smismember(key, members))
191 }
192
193 pub fn scard(&self, key: &[u8]) -> RedisObjectResult {
194 self.object_read(key, |bucket| bucket.scard(key))
195 }
196
197 pub fn smembers(&self, key: &[u8]) -> RedisObjectResult {
198 self.object_read(key, |bucket| bucket.smembers(key))
199 }
200
201 pub fn set_members(&self, key: &[u8]) -> Result<Vec<Bytes>, RedisObjectError> {
202 let route = self.route_key(key);
203 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
204 let result = bucket
205 .set_members(key)
206 .map_err(|()| RedisObjectError::WrongType);
207 if result.is_err() || bucket.contains_object(key) {
208 return result;
209 }
210 drop(bucket);
211 if self.string_exists_routed(route, key) {
212 Err(RedisObjectError::WrongType)
213 } else {
214 result
215 }
216 }
217
218 pub fn spop(&self, key: &[u8], count: Option<usize>) -> RedisObjectResult {
219 self.object_write(key, |bucket| bucket.spop(key, count))
220 }
221
222 pub fn srandmember(&self, key: &[u8], count: Option<i64>) -> RedisObjectResult {
223 self.object_read(key, |bucket| bucket.srandmember(key, count))
224 }
225
226 pub fn zadd(&self, key: &[u8], score: f64, member: &[u8]) -> RedisObjectResult {
227 self.zadd_hashed(hash_key(key), key, score, member)
228 }
229
230 #[allow(clippy::too_many_arguments)]
231 pub fn zadd_cond(
232 &self,
233 key: &[u8],
234 score: f64,
235 member: &[u8],
236 nx: bool,
237 xx: bool,
238 gt: bool,
239 lt: bool,
240 ch: bool,
241 incr: bool,
242 ) -> RedisObjectResult {
243 self.object_write(key, |bucket| {
244 bucket.zadd_cond(key, score, member, nx, xx, gt, lt, ch, incr)
245 })
246 }
247
248 pub fn zrem(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
249 self.object_write(key, |bucket| bucket.zrem(key, member))
250 }
251
252 pub fn zrem_many(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
253 self.object_write(key, |bucket| bucket.zrem_many(key, members))
254 }
255
256 pub fn zscore(&self, key: &[u8], member: &[u8]) -> RedisObjectResult {
257 self.object_read(key, |bucket| bucket.zscore(key, member))
258 }
259
260 pub fn zmscore(&self, key: &[u8], members: &[&[u8]]) -> RedisObjectResult {
261 self.object_read(key, |bucket| bucket.zmscore(key, members))
262 }
263
264 pub fn zincrby(&self, key: &[u8], delta: f64, member: &[u8]) -> RedisObjectResult {
265 self.object_write(key, |bucket| bucket.zincrby(key, delta, member))
266 }
267
268 pub fn zcard(&self, key: &[u8]) -> RedisObjectResult {
269 self.object_read(key, |bucket| bucket.zcard(key))
270 }
271
272 pub fn zrange(&self, key: &[u8], start: i64, stop: i64) -> RedisObjectResult {
273 self.object_read(key, |bucket| bucket.zrange(key, start, stop))
274 }
275
276 pub fn zentries(&self, key: &[u8]) -> Result<Vec<(Bytes, f64)>, RedisObjectError> {
277 let route = self.route_key(key);
278 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
279 let result = bucket
280 .zentries(key)
281 .map_err(|()| RedisObjectError::WrongType);
282 if result.is_err() || bucket.contains_object(key) {
283 return result;
284 }
285 drop(bucket);
286 if self.string_exists_routed(route, key) {
287 Err(RedisObjectError::WrongType)
288 } else {
289 result
290 }
291 }
292
293 pub fn zrank(&self, key: &[u8], member: &[u8], rev: bool) -> RedisObjectResult {
294 self.object_read(key, |bucket| bucket.zrank(key, member, rev))
295 }
296
297 pub fn zcount(&self, key: &[u8], min: f64, max: f64) -> RedisObjectResult {
298 self.object_read(key, |bucket| bucket.zcount(key, min, max))
299 }
300
301 pub fn zpop(&self, key: &[u8], count: usize, max: bool) -> RedisObjectResult {
302 self.object_write(key, |bucket| bucket.zpop(key, count, max))
303 }
304
305 pub(crate) fn hset_hashed(
306 &self,
307 key_hash: u64,
308 key: &[u8],
309 field: &[u8],
310 value: &[u8],
311 ) -> RedisObjectResult {
312 self.object_create_hashed(
313 key_hash,
314 key,
315 |bucket, key_hash| {
316 bucket.hset_existing_or_wrongtype_hashed(key_hash, key, field, value)
317 },
318 |bucket, key_hash| bucket.hset_new_unchecked_hashed(key_hash, key, field, value),
319 )
320 }
321
322 pub(crate) fn push_list_hashed(
323 &self,
324 key_hash: u64,
325 key: &[u8],
326 values: &[&[u8]],
327 front: bool,
328 ) -> RedisObjectResult {
329 self.object_create_hashed(
330 key_hash,
331 key,
332 |bucket, key_hash| {
333 bucket.push_list_existing_or_wrongtype_hashed(key_hash, key, values, front)
334 },
335 |bucket, key_hash| bucket.push_list_new_unchecked_hashed(key_hash, key, values, front),
336 )
337 }
338
339 pub(crate) fn sadd_hashed(
340 &self,
341 key_hash: u64,
342 key: &[u8],
343 members: &[&[u8]],
344 ) -> RedisObjectResult {
345 self.object_create_hashed(
346 key_hash,
347 key,
348 |bucket, key_hash| bucket.sadd_existing_or_wrongtype_hashed(key_hash, key, members),
349 |bucket, key_hash| bucket.sadd_new_unchecked_hashed(key_hash, key, members),
350 )
351 }
352
353 pub(crate) fn zadd_hashed(
354 &self,
355 key_hash: u64,
356 key: &[u8],
357 score: f64,
358 member: &[u8],
359 ) -> RedisObjectResult {
360 self.object_create_hashed(
361 key_hash,
362 key,
363 |bucket, key_hash| {
364 bucket.zadd_existing_or_wrongtype_hashed(key_hash, key, score, member)
365 },
366 |bucket, key_hash| bucket.zadd_new_unchecked_hashed(key_hash, key, score, member),
367 )
368 }
369
370 fn object_read(
371 &self,
372 key: &[u8],
373 op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
374 ) -> RedisObjectResult {
375 self.object_read_routed(self.route_key(key), key, op)
376 }
377
378 #[allow(dead_code)]
379 pub(crate) fn object_read_hashed(
380 &self,
381 key_hash: u64,
382 key: &[u8],
383 op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
384 ) -> RedisObjectResult {
385 self.object_read_routed(self.route_key_prehashed(key_hash, key), key, op)
386 }
387
388 #[allow(dead_code)]
389 pub(crate) fn object_read_hashed_visit(
390 &self,
391 key_hash: u64,
392 key: &[u8],
393 op: impl FnOnce(&RedisObjectBucket) -> RedisObjectReadOutcome,
394 ) -> RedisObjectReadOutcome {
395 let route = self.route_key_prehashed(key_hash, key);
396 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
397 if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
398 drop(bucket);
399 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
400 if bucket.delete_expired(key, now_millis()) {
401 self.objects.note_deleted(route.shard_id);
402 }
403 drop(bucket);
404 return if self.string_exists_routed(route, key) {
405 RedisObjectReadOutcome::WrongType
406 } else {
407 RedisObjectReadOutcome::Missing
408 };
409 }
410 let outcome = op(&bucket);
411 if !matches!(outcome, RedisObjectReadOutcome::Missing) {
412 return outcome;
413 }
414 drop(bucket);
415 if self.string_exists_routed(route, key) {
416 RedisObjectReadOutcome::WrongType
417 } else {
418 RedisObjectReadOutcome::Missing
419 }
420 }
421
422 fn object_read_routed(
423 &self,
424 route: EmbeddedKeyRoute,
425 key: &[u8],
426 op: impl FnOnce(&RedisObjectBucket) -> RedisObjectResult,
427 ) -> RedisObjectResult {
428 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
429 if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
430 drop(bucket);
431 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
432 if bucket.delete_expired(key, now_millis()) {
433 self.objects.note_deleted(route.shard_id);
434 }
435 drop(bucket);
436 return if self.string_exists_routed(route, key) {
437 RedisObjectResult::WrongType
438 } else {
439 op(&self.objects.read_bucket(route.shard_id, route.key_hash))
440 };
441 }
442 let result = op(&bucket);
443 if matches!(result, RedisObjectResult::WrongType) || bucket.contains_object(key) {
444 return result;
445 }
446 drop(bucket);
447 if self.string_exists_routed(route, key) {
448 RedisObjectResult::WrongType
449 } else {
450 result
451 }
452 }
453
454 fn object_write(
455 &self,
456 key: &[u8],
457 op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
458 ) -> RedisObjectResult {
459 self.object_write_routed(self.route_key(key), key, op)
460 }
461
462 #[allow(dead_code)]
463 pub(crate) fn object_write_hashed(
464 &self,
465 key_hash: u64,
466 key: &[u8],
467 op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
468 ) -> RedisObjectResult {
469 self.object_write_routed(self.route_key_prehashed(key_hash, key), key, op)
470 }
471
472 fn object_create_hashed(
473 &self,
474 key_hash: u64,
475 key: &[u8],
476 existing: impl FnOnce(&mut RedisObjectBucket, u64) -> RedisObjectWriteAttempt,
477 create: impl FnOnce(&mut RedisObjectBucket, u64) -> RedisObjectResult,
478 ) -> RedisObjectResult {
479 let route = self.route_key_prehashed(key_hash, key);
480 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
481 if bucket.has_expirations() && bucket.delete_expired(key, now_millis()) {
482 self.objects.note_deleted(route.shard_id);
483 }
484 match existing(&mut bucket, route.key_hash) {
485 RedisObjectWriteAttempt::Complete(result) => result,
486 RedisObjectWriteAttempt::Missing => {
487 if self.string_exists_routed(route, key) {
488 RedisObjectResult::WrongType
489 } else {
490 let result = create(&mut bucket, route.key_hash);
491 self.objects.note_created(route.shard_id);
492 result
493 }
494 }
495 }
496 }
497
498 fn object_write_routed(
499 &self,
500 route: EmbeddedKeyRoute,
501 key: &[u8],
502 op: impl FnOnce(&mut RedisObjectBucket) -> (RedisObjectResult, bool),
503 ) -> RedisObjectResult {
504 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
505 if bucket.has_expirations() && bucket.delete_expired(key, now_millis()) {
506 self.objects.note_deleted(route.shard_id);
507 }
508 let had_object = bucket.contains_object(key);
509 if !had_object && self.string_exists_routed(route, key) {
510 return RedisObjectResult::WrongType;
511 }
512 let (result, object_changed) = op(&mut bucket);
513 if !had_object && object_changed {
514 self.objects.note_created(route.shard_id);
515 } else if had_object && object_changed {
516 self.objects.note_deleted(route.shard_id);
517 }
518 result
519 }
520
521 pub(crate) fn clone_object_value(&self, key: &[u8]) -> Option<RedisObjectValue> {
522 let route = self.route_key(key);
523 let bucket = self.objects.read_bucket(route.shard_id, route.key_hash);
524 if bucket.has_expirations() && bucket.object_is_expired(key, now_millis()) {
525 drop(bucket);
526 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
527 if bucket.delete_expired(key, now_millis()) {
528 self.objects.note_deleted(route.shard_id);
529 }
530 return None;
531 }
532 bucket.clone_value(key)
533 }
534
535 pub(crate) fn set_object_value(
536 &self,
537 key: &[u8],
538 value: RedisObjectValue,
539 ttl_ms: Option<u64>,
540 ) {
541 let route = self.route_key(key);
542 let now_ms = now_millis();
543 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
544 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
545 let mut shard = self.shards[route.shard_id].write();
546 let had_object = bucket.contains_object(key);
547 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
548 shard
549 .session_slots
550 .delete_hashed(&session_prefix, route.key_hash, key);
551 }
552 shard.map.delete_hashed(route.key_hash, key, now_ms);
553 let created = bucket.insert_value(key.to_vec(), value);
554 if created && !had_object {
555 self.objects.note_created(route.shard_id);
556 }
557 if let Some(expire_at_ms) = expire_at_ms {
558 bucket.expire(key, expire_at_ms, now_ms);
559 }
560 }
561
562 pub fn rename_key(
563 &self,
564 source: &[u8],
565 dest: &[u8],
566 nx: bool,
567 ) -> std::result::Result<bool, RedisObjectError> {
568 if source == dest {
569 if !self.exists(source) {
570 return Err(RedisObjectError::MissingKey);
571 }
572 return Ok(!nx);
573 }
574 if nx && self.exists(dest) {
575 return Ok(false);
576 }
577 let ttl_ms = match self.pttl_millis(source) {
578 ttl if ttl >= 0 => Some(ttl as u64),
579 _ => None,
580 };
581 if let Some(value) = self.get_value_bytes(source) {
582 self.set_value_bytes(dest, value, ttl_ms);
583 self.delete(source);
584 return Ok(true);
585 }
586 if let Some(value) = self.clone_object_value(source) {
587 self.delete(dest);
588 self.set_object_value(dest, value, ttl_ms);
589 self.delete(source);
590 return Ok(true);
591 }
592 Err(RedisObjectError::MissingKey)
593 }
594
595 fn string_exists_routed(&self, route: EmbeddedKeyRoute, key: &[u8]) -> bool {
596 if uses_flat_key_storage(self.route_mode, key) {
597 let shard = self.shards[route.shard_id].read();
598 if shard.map.is_empty() && shard.session_slots.is_empty() {
599 return false;
600 }
601 if shard.map.has_no_ttl_entries() {
602 return shard.map.with_shared_value_bytes_hashed_no_ttl(
603 route.key_hash,
604 key,
605 &mut |_| {},
606 );
607 }
608 return shard.map.with_shared_value_bytes_hashed(
609 route.key_hash,
610 key,
611 now_millis(),
612 &mut |_| {},
613 );
614 }
615 self.with_shared_value_bytes_routed(route, key, &mut |_| {})
616 }
617}