1use std::convert::TryFrom;
2use std::ops::Deref;
3use std::ops::DerefMut;
4use std::os::raw::c_void;
5use std::ptr;
6use std::ptr::NonNull;
7use std::time::Duration;
8
9use libc::size_t;
10use std::os::raw::c_int;
11
12use raw::KeyType;
13
14use crate::native_types::RedisType;
15use crate::raw;
16use crate::redismodule::REDIS_OK;
17pub use crate::redisraw::bindings::*;
18use crate::stream::StreamIterator;
19use crate::RedisError;
20use crate::RedisResult;
21use crate::RedisString;
22use bitflags::bitflags;
23
/// Access mode requested when a key is opened.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum KeyMode {
    /// Open the key for reading only.
    Read,
    /// Open the key for both reading and writing.
    ReadWrite,
}
37
38bitflags! {
39 pub struct KeyFlags: c_int {
40 const NOTOUCH = REDISMODULE_OPEN_KEY_NOTOUCH as c_int;
42 const NONOTIFY = REDISMODULE_OPEN_KEY_NONOTIFY as c_int;
44 const NOSTATS = REDISMODULE_OPEN_KEY_NOSTATS as c_int;
46 const NOEXPIRE = REDISMODULE_OPEN_KEY_NOEXPIRE as c_int;
48 const NOEFFECTS = REDISMODULE_OPEN_KEY_NOEFFECTS as c_int;
50 const ACCESS_EXPIRED = REDISMODULE_OPEN_KEY_ACCESS_EXPIRED as c_int;
52 }
53}
54
55#[derive(Debug)]
56pub struct RedisKey {
57 pub(crate) ctx: *mut raw::RedisModuleCtx,
58 pub(crate) key_inner: *mut raw::RedisModuleKey,
59}
60
61impl RedisKey {
62 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &RedisString) -> Self {
63 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::Read));
64 Self { ctx, key_inner }
65 }
66
67 pub fn open_with_flags(
68 ctx: *mut raw::RedisModuleCtx,
69 key: &RedisString,
70 flags: KeyFlags,
71 ) -> Self {
72 let key_inner =
73 raw::open_key_with_flags(ctx, key.inner, to_raw_mode(KeyMode::Read), flags.bits());
74 Self { ctx, key_inner }
75 }
76
77 pub const unsafe fn from_raw_parts(
85 ctx: *mut raw::RedisModuleCtx,
86 key_inner: *mut raw::RedisModuleKey,
87 ) -> Self {
88 Self { ctx, key_inner }
89 }
90
91 pub fn to_raw_parts(self) -> (*mut raw::RedisModuleCtx, *mut raw::RedisModuleKey) {
97 (self.ctx, self.key_inner)
98 }
99
100 pub fn get_value<T>(&self, redis_type: &RedisType) -> Result<Option<&T>, RedisError> {
104 verify_type(self.key_inner, redis_type)?;
105
106 let value =
107 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
108
109 if value.is_null() {
110 return Ok(None);
111 }
112
113 let value = unsafe { &*value };
114
115 Ok(Some(value))
116 }
117
118 #[must_use]
122 pub fn key_type(&self) -> raw::KeyType {
123 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
124 }
125
126 #[must_use]
128 pub fn is_null(&self) -> bool {
129 let null_key: *mut raw::RedisModuleKey = ptr::null_mut();
130 self.key_inner == null_key
131 }
132
133 pub fn read(&self) -> Result<Option<&[u8]>, RedisError> {
134 if self.is_null() {
135 Ok(None)
136 } else {
137 let mut length: size_t = 0;
138 let dma = raw::string_dma(self.key_inner, &mut length, raw::KeyMode::READ);
139 if dma.is_null() {
140 Err(RedisError::Str("Could not read key"))
141 } else {
142 Ok(Some(unsafe {
143 std::slice::from_raw_parts(dma.cast::<u8>(), length)
144 }))
145 }
146 }
147 }
148
149 pub fn hash_get(&self, field: &str) -> Result<Option<RedisString>, RedisError> {
150 let val = if self.is_null() {
151 None
152 } else {
153 hash_mget_key(self.ctx, self.key_inner, &[field])?
154 .pop()
155 .expect("hash_mget_key should return vector of same length as input")
156 };
157 Ok(val)
158 }
159
160 pub fn hash_get_multi<'a, A, B>(
163 &self,
164 fields: &'a [A],
165 ) -> Result<Option<HMGetResult<'a, A, B>>, RedisError>
166 where
167 A: Into<Vec<u8>> + Clone,
168 RedisString: Into<B>,
169 {
170 let val = if self.is_null() {
171 None
172 } else {
173 Some(HMGetResult {
174 fields,
175 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
176 phantom: std::marker::PhantomData,
177 })
178 };
179 Ok(val)
180 }
181
182 pub fn get_stream_iterator(&self, reverse: bool) -> Result<StreamIterator<'_>, RedisError> {
183 StreamIterator::new(self, None, None, false, reverse)
184 }
185
186 pub fn get_stream_range_iterator(
187 &self,
188 from: Option<raw::RedisModuleStreamID>,
189 to: Option<raw::RedisModuleStreamID>,
190 exclusive: bool,
191 reverse: bool,
192 ) -> Result<StreamIterator<'_>, RedisError> {
193 StreamIterator::new(self, from, to, exclusive, reverse)
194 }
195}
196
197impl Drop for RedisKey {
198 fn drop(&mut self) {
200 if !self.key_inner.is_null() {
201 raw::close_key(self.key_inner);
202 }
203 }
204}
205
206pub struct RedisKeyWritable {
209 ctx: *mut raw::RedisModuleCtx,
210 key_inner: *mut raw::RedisModuleKey,
211}
212
213impl RedisKeyWritable {
214 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &RedisString) -> Self {
215 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::ReadWrite));
216 Self { ctx, key_inner }
217 }
218
219 pub fn open_with_flags(
220 ctx: *mut raw::RedisModuleCtx,
221 key: &RedisString,
222 flags: KeyFlags,
223 ) -> Self {
224 let key_inner = raw::open_key_with_flags(
225 ctx,
226 key.inner,
227 to_raw_mode(KeyMode::ReadWrite),
228 flags.bits(),
229 );
230 Self { ctx, key_inner }
231 }
232
233 #[must_use]
254 pub fn is_empty(&self) -> bool {
255 self.key_type() == KeyType::Empty
256 }
257
258 pub fn as_string_dma(&self) -> Result<StringDMA<'_>, RedisError> {
259 StringDMA::new(self)
260 }
261
262 #[allow(clippy::must_use_candidate)]
263 pub fn hash_set(&self, field: &str, value: RedisString) -> raw::Status {
264 raw::hash_set(self.key_inner, field, value.inner)
265 }
266
267 #[allow(clippy::must_use_candidate)]
268 pub fn hash_del(&self, field: &str) -> raw::Status {
269 raw::hash_del(self.key_inner, field)
270 }
271
272 pub fn hash_get(&self, field: &str) -> Result<Option<RedisString>, RedisError> {
273 Ok(hash_mget_key(self.ctx, self.key_inner, &[field])?
274 .pop()
275 .expect("hash_mget_key should return vector of same length as input"))
276 }
277
278 pub fn hash_get_multi<'a, A, B>(
280 &self,
281 fields: &'a [A],
282 ) -> Result<HMGetResult<'a, A, B>, RedisError>
283 where
284 A: Into<Vec<u8>> + Clone,
285 RedisString: Into<B>,
286 {
287 Ok(HMGetResult {
288 fields,
289 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
290 phantom: std::marker::PhantomData,
291 })
292 }
293
294 #[allow(clippy::must_use_candidate)]
296 pub fn list_push_head(&self, element: RedisString) -> raw::Status {
297 raw::list_push(self.key_inner, raw::Where::ListHead, element.inner)
298 }
299
300 #[allow(clippy::must_use_candidate)]
302 pub fn list_push_tail(&self, element: RedisString) -> raw::Status {
303 raw::list_push(self.key_inner, raw::Where::ListTail, element.inner)
304 }
305
306 #[allow(clippy::must_use_candidate)]
311 pub fn list_pop_head(&self) -> Option<RedisString> {
312 let ptr = raw::list_pop(self.key_inner, raw::Where::ListHead);
313
314 if ptr.is_null() {
315 return None;
316 }
317
318 Some(RedisString::new(NonNull::new(self.ctx), ptr))
319 }
320
321 #[must_use]
326 pub fn list_pop_tail(&self) -> Option<RedisString> {
327 let ptr = raw::list_pop(self.key_inner, raw::Where::ListTail);
328
329 if ptr.is_null() {
330 return None;
331 }
332
333 Some(RedisString::new(NonNull::new(self.ctx), ptr))
334 }
335
336 pub fn set_expire(&self, expire: Duration) -> RedisResult {
337 let exp_millis = expire.as_millis();
338
339 let exp_time = i64::try_from(exp_millis).map_err(|_| {
340 RedisError::String(format!("Error expire duration {exp_millis} is not allowed"))
341 })?;
342
343 match raw::set_expire(self.key_inner, exp_time) {
344 raw::Status::Ok => REDIS_OK,
345
346 raw::Status::Err => Err(RedisError::Str("Error while setting key expire")),
349 }
350 }
351
352 pub fn remove_expire(&self) -> RedisResult {
354 match raw::set_expire(self.key_inner, REDISMODULE_NO_EXPIRE.into()) {
355 raw::Status::Ok => REDIS_OK,
356
357 raw::Status::Err => Err(RedisError::Str("Error while removing key expire")),
360 }
361 }
362
363 pub fn write(&self, val: &str) -> RedisResult {
364 let val_str = RedisString::create(NonNull::new(self.ctx), val);
365 match raw::string_set(self.key_inner, val_str.inner) {
366 raw::Status::Ok => REDIS_OK,
367 raw::Status::Err => Err(RedisError::Str("Error while setting key")),
368 }
369 }
370
371 pub fn delete(&self) -> RedisResult {
375 unsafe { raw::RedisModule_DeleteKey.unwrap()(self.key_inner) };
376 REDIS_OK
377 }
378
379 pub fn unlink(&self) -> RedisResult {
383 unsafe { raw::RedisModule_UnlinkKey.unwrap()(self.key_inner) };
384 REDIS_OK
385 }
386
387 #[must_use]
391 pub fn key_type(&self) -> raw::KeyType {
392 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
393 }
394
395 pub fn open_with_redis_string(
396 ctx: *mut raw::RedisModuleCtx,
397 key: *mut raw::RedisModuleString,
398 ) -> Self {
399 let key_inner = raw::open_key(ctx, key, to_raw_mode(KeyMode::ReadWrite));
400 Self { ctx, key_inner }
401 }
402
403 #[allow(clippy::needless_lifetimes)]
409 pub fn get_value<'a, 'b, T>(
410 &'a self,
411 redis_type: &RedisType,
412 ) -> Result<Option<&'b mut T>, RedisError> {
413 verify_type(self.key_inner, redis_type)?;
414 let value =
415 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
416
417 if value.is_null() {
418 return Ok(None);
419 }
420
421 let value = unsafe { &mut *value };
422 Ok(Some(value))
423 }
424
425 pub fn set_value<T>(&self, redis_type: &RedisType, value: T) -> Result<(), RedisError> {
429 verify_type(self.key_inner, redis_type)?;
430 let value = Box::into_raw(Box::new(value)).cast::<c_void>();
431 let status: raw::Status = unsafe {
432 raw::RedisModule_ModuleTypeSetValue.unwrap()(
433 self.key_inner,
434 *redis_type.raw_type.borrow(),
435 value,
436 )
437 }
438 .into();
439
440 status.into()
441 }
442
443 pub fn trim_stream_by_id(
444 &self,
445 mut id: raw::RedisModuleStreamID,
446 approx: bool,
447 ) -> Result<usize, RedisError> {
448 let flags = if approx {
449 raw::REDISMODULE_STREAM_TRIM_APPROX
450 } else {
451 0
452 };
453 let res = unsafe {
454 raw::RedisModule_StreamTrimByID.unwrap()(self.key_inner, flags as i32, &mut id)
455 };
456 if res <= 0 {
457 Err(RedisError::Str("Failed trimming the stream"))
458 } else {
459 Ok(res as usize)
460 }
461 }
462}
463
464pub struct HMGetResult<'a, A, B>
467where
468 A: Into<Vec<u8>> + Clone,
469 RedisString: Into<B>,
470{
471 fields: &'a [A],
472 values: Vec<Option<RedisString>>,
473 phantom: std::marker::PhantomData<B>,
474}
475
476pub struct HMGetIter<'a, A, B>
477where
478 A: Into<Vec<u8>>,
479 RedisString: Into<B>,
480{
481 fields_iter: std::slice::Iter<'a, A>,
482 values_iter: std::vec::IntoIter<Option<RedisString>>,
483 phantom: std::marker::PhantomData<B>,
484}
485
486impl<'a, A, B> Iterator for HMGetIter<'a, A, B>
487where
488 A: Into<Vec<u8>> + Clone,
489 RedisString: Into<B>,
490{
491 type Item = (A, B);
492
493 fn next(&mut self) -> Option<Self::Item> {
494 loop {
495 let a = self.fields_iter.next();
496 let b = self.values_iter.next();
497 match b {
498 None => return None,
499 Some(None) => continue,
500 Some(Some(rs)) => {
501 return Some((
502 a.expect("field and value slices not of same length")
503 .clone(),
504 rs.into(),
505 ))
506 }
507 }
508 }
509 }
510}
511
512impl<'a, A, B> IntoIterator for HMGetResult<'a, A, B>
513where
514 A: Into<Vec<u8>> + Clone,
515 RedisString: Into<B>,
516{
517 type Item = (A, B);
518 type IntoIter = HMGetIter<'a, A, B>;
519
520 fn into_iter(self) -> Self::IntoIter {
566 Self::IntoIter {
567 fields_iter: self.fields.iter(),
568 values_iter: self.values.into_iter(),
569 phantom: std::marker::PhantomData,
570 }
571 }
572}
573
574pub struct StringDMA<'a> {
575 key: &'a RedisKeyWritable,
576 buffer: &'a mut [u8],
577}
578
579impl<'a> Deref for StringDMA<'a> {
580 type Target = [u8];
581
582 fn deref(&self) -> &Self::Target {
583 self.buffer
584 }
585}
586
587impl<'a> DerefMut for StringDMA<'a> {
588 fn deref_mut(&mut self) -> &mut Self::Target {
589 self.buffer
590 }
591}
592
593impl<'a> StringDMA<'a> {
594 fn new(key: &'a RedisKeyWritable) -> Result<StringDMA<'a>, RedisError> {
595 let mut length: size_t = 0;
596 let dma = raw::string_dma(key.key_inner, &mut length, raw::KeyMode::WRITE);
597 if dma.is_null() {
598 Err(RedisError::Str("Could not read key"))
599 } else {
600 let buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
601 Ok(StringDMA { key, buffer })
602 }
603 }
604
605 pub fn write(&mut self, data: &[u8]) -> Result<&mut Self, RedisError> {
606 if self.buffer.len() != data.len() {
607 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, data.len()) {
608 let mut length: size_t = 0;
609 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
610 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
611 } else {
612 return Err(RedisError::Str("Failed to truncate string"));
613 }
614 }
615 self.buffer[..data.len()].copy_from_slice(data);
616 Ok(self)
617 }
618
619 pub fn append(&mut self, data: &[u8]) -> Result<&mut Self, RedisError> {
620 let current_len = self.buffer.len();
621 let new_len = current_len + data.len();
622 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, new_len) {
623 let mut length: size_t = 0;
624 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
625 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
626 } else {
627 return Err(RedisError::Str("Failed to truncate string"));
628 }
629 self.buffer[current_len..new_len].copy_from_slice(data);
630 Ok(self)
631 }
632}
633
634impl Drop for RedisKeyWritable {
635 fn drop(&mut self) {
637 raw::close_key(self.key_inner);
638 }
639}
640
641fn hash_mget_key<T>(
644 ctx: *mut raw::RedisModuleCtx,
645 key: *mut raw::RedisModuleKey,
646 fields: &[T],
647) -> Result<Vec<Option<RedisString>>, RedisError>
648where
649 T: Into<Vec<u8>> + Clone,
650{
651 const BATCH_SIZE: usize = 12;
652
653 let mut values = Vec::with_capacity(fields.len());
654 let mut values_raw = [std::ptr::null_mut(); BATCH_SIZE];
655
656 for chunk_fields in fields.chunks(BATCH_SIZE) {
657 let chunk_values = &mut values_raw[..chunk_fields.len()];
658 raw::hash_get_multi(key, chunk_fields, chunk_values)?;
659 values.extend(chunk_values.iter().map(|ptr| {
660 if ptr.is_null() {
661 None
662 } else {
663 Some(RedisString::from_redis_module_string(ctx, *ptr))
664 }
665 }));
666 }
667
668 Ok(values)
669}
670
671fn to_raw_mode(mode: KeyMode) -> raw::KeyMode {
672 match mode {
673 KeyMode::Read => raw::KeyMode::READ,
674 KeyMode::ReadWrite => raw::KeyMode::READ | raw::KeyMode::WRITE,
675 }
676}
677
678#[allow(clippy::not_unsafe_ptr_arg_deref)]
682pub fn verify_type(key_inner: *mut raw::RedisModuleKey, redis_type: &RedisType) -> RedisResult {
683 let key_type: KeyType = unsafe { raw::RedisModule_KeyType.unwrap()(key_inner) }.into();
684
685 if key_type != KeyType::Empty {
686 let raw_type = unsafe { raw::RedisModule_ModuleTypeGetType.unwrap()(key_inner) };
688
689 if raw_type != *redis_type.raw_type.borrow() {
690 return Err(RedisError::Str("Existing key has wrong Redis type"));
691 }
692 }
693
694 REDIS_OK
695}