1use std::convert::TryFrom;
2use std::ops::Deref;
3use std::ops::DerefMut;
4use std::os::raw::c_void;
5use std::ptr;
6use std::ptr::NonNull;
7use std::time::Duration;
8
9use libc::size_t;
10use std::os::raw::c_int;
11
12use raw::KeyType;
13
14use crate::native_types::RedisType;
15use crate::raw;
16use crate::redismodule::REDIS_OK;
17pub use crate::redisraw::bindings::*;
18use crate::stream::StreamIterator;
19use crate::RedisError;
20use crate::RedisResult;
21use crate::RedisString;
22use bitflags::bitflags;
23
/// Access mode requested when a key is opened.
///
/// The variants are translated into the raw module flags by `to_raw_mode`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KeyMode {
    /// Open the key for reading only.
    Read,
    /// Open the key for both reading and writing.
    ReadWrite,
}
37
38bitflags! {
39 pub struct KeyFlags: c_int {
40 const NOTOUCH = REDISMODULE_OPEN_KEY_NOTOUCH as c_int;
42 const NONOTIFY = REDISMODULE_OPEN_KEY_NONOTIFY as c_int;
44 const NOSTATS = REDISMODULE_OPEN_KEY_NOSTATS as c_int;
46 const NOEXPIRE = REDISMODULE_OPEN_KEY_NOEXPIRE as c_int;
48 const NOEFFECTS = REDISMODULE_OPEN_KEY_NOEFFECTS as c_int;
50 }
51}
52
53#[derive(Debug)]
54pub struct RedisKey {
55 pub(crate) ctx: *mut raw::RedisModuleCtx,
56 pub(crate) key_inner: *mut raw::RedisModuleKey,
57}
58
59impl RedisKey {
60 pub(crate) fn take(mut self) -> *mut raw::RedisModuleKey {
61 let res = self.key_inner;
62 self.key_inner = std::ptr::null_mut();
63 res
64 }
65
66 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &RedisString) -> Self {
67 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::Read));
68 Self { ctx, key_inner }
69 }
70
71 pub(crate) fn open_with_flags(
72 ctx: *mut raw::RedisModuleCtx,
73 key: &RedisString,
74 flags: KeyFlags,
75 ) -> Self {
76 let key_inner =
77 raw::open_key_with_flags(ctx, key.inner, to_raw_mode(KeyMode::Read), flags.bits());
78 Self { ctx, key_inner }
79 }
80
81 pub(crate) const fn from_raw_parts(
82 ctx: *mut raw::RedisModuleCtx,
83 key_inner: *mut raw::RedisModuleKey,
84 ) -> Self {
85 Self { ctx, key_inner }
86 }
87
88 pub fn get_value<T>(&self, redis_type: &RedisType) -> Result<Option<&T>, RedisError> {
92 verify_type(self.key_inner, redis_type)?;
93
94 let value =
95 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
96
97 if value.is_null() {
98 return Ok(None);
99 }
100
101 let value = unsafe { &*value };
102
103 Ok(Some(value))
104 }
105
106 #[must_use]
110 pub fn key_type(&self) -> raw::KeyType {
111 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
112 }
113
114 #[must_use]
116 pub fn is_null(&self) -> bool {
117 let null_key: *mut raw::RedisModuleKey = ptr::null_mut();
118 self.key_inner == null_key
119 }
120
121 pub fn read(&self) -> Result<Option<&[u8]>, RedisError> {
122 if self.is_null() {
123 Ok(None)
124 } else {
125 let mut length: size_t = 0;
126 let dma = raw::string_dma(self.key_inner, &mut length, raw::KeyMode::READ);
127 if dma.is_null() {
128 Err(RedisError::Str("Could not read key"))
129 } else {
130 Ok(Some(unsafe {
131 std::slice::from_raw_parts(dma.cast::<u8>(), length)
132 }))
133 }
134 }
135 }
136
137 pub fn hash_get(&self, field: &str) -> Result<Option<RedisString>, RedisError> {
138 let val = if self.is_null() {
139 None
140 } else {
141 hash_mget_key(self.ctx, self.key_inner, &[field])?
142 .pop()
143 .expect("hash_mget_key should return vector of same length as input")
144 };
145 Ok(val)
146 }
147
148 pub fn hash_get_multi<'a, A, B>(
151 &self,
152 fields: &'a [A],
153 ) -> Result<Option<HMGetResult<'a, A, B>>, RedisError>
154 where
155 A: Into<Vec<u8>> + Clone,
156 RedisString: Into<B>,
157 {
158 let val = if self.is_null() {
159 None
160 } else {
161 Some(HMGetResult {
162 fields,
163 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
164 phantom: std::marker::PhantomData,
165 })
166 };
167 Ok(val)
168 }
169
170 pub fn get_stream_iterator(&self, reverse: bool) -> Result<StreamIterator, RedisError> {
171 StreamIterator::new(self, None, None, false, reverse)
172 }
173
174 pub fn get_stream_range_iterator(
175 &self,
176 from: Option<raw::RedisModuleStreamID>,
177 to: Option<raw::RedisModuleStreamID>,
178 exclusive: bool,
179 reverse: bool,
180 ) -> Result<StreamIterator, RedisError> {
181 StreamIterator::new(self, from, to, exclusive, reverse)
182 }
183}
184
185impl Drop for RedisKey {
186 fn drop(&mut self) {
188 if !self.key_inner.is_null() {
189 raw::close_key(self.key_inner);
190 }
191 }
192}
193
194pub struct RedisKeyWritable {
197 ctx: *mut raw::RedisModuleCtx,
198 key_inner: *mut raw::RedisModuleKey,
199}
200
201impl RedisKeyWritable {
202 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &RedisString) -> Self {
203 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::ReadWrite));
204 Self { ctx, key_inner }
205 }
206
207 pub(crate) fn open_with_flags(
208 ctx: *mut raw::RedisModuleCtx,
209 key: &RedisString,
210 flags: KeyFlags,
211 ) -> Self {
212 let key_inner = raw::open_key_with_flags(
213 ctx,
214 key.inner,
215 to_raw_mode(KeyMode::ReadWrite),
216 flags.bits(),
217 );
218 Self { ctx, key_inner }
219 }
220
221 pub fn as_string_dma(&self) -> Result<StringDMA, RedisError> {
239 StringDMA::new(self)
240 }
241
242 #[allow(clippy::must_use_candidate)]
243 pub fn hash_set(&self, field: &str, value: RedisString) -> raw::Status {
244 raw::hash_set(self.key_inner, field, value.inner)
245 }
246
247 #[allow(clippy::must_use_candidate)]
248 pub fn hash_del(&self, field: &str) -> raw::Status {
249 raw::hash_del(self.key_inner, field)
250 }
251
252 pub fn hash_get(&self, field: &str) -> Result<Option<RedisString>, RedisError> {
253 Ok(hash_mget_key(self.ctx, self.key_inner, &[field])?
254 .pop()
255 .expect("hash_mget_key should return vector of same length as input"))
256 }
257
258 pub fn hash_get_multi<'a, A, B>(
260 &self,
261 fields: &'a [A],
262 ) -> Result<HMGetResult<'a, A, B>, RedisError>
263 where
264 A: Into<Vec<u8>> + Clone,
265 RedisString: Into<B>,
266 {
267 Ok(HMGetResult {
268 fields,
269 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
270 phantom: std::marker::PhantomData,
271 })
272 }
273
274 #[allow(clippy::must_use_candidate)]
276 pub fn list_push_head(&self, element: RedisString) -> raw::Status {
277 raw::list_push(self.key_inner, raw::Where::ListHead, element.inner)
278 }
279
280 #[allow(clippy::must_use_candidate)]
282 pub fn list_push_tail(&self, element: RedisString) -> raw::Status {
283 raw::list_push(self.key_inner, raw::Where::ListTail, element.inner)
284 }
285
286 #[allow(clippy::must_use_candidate)]
291 pub fn list_pop_head(&self) -> Option<RedisString> {
292 let ptr = raw::list_pop(self.key_inner, raw::Where::ListHead);
293
294 if ptr.is_null() {
295 return None;
296 }
297
298 Some(RedisString::new(NonNull::new(self.ctx), ptr))
299 }
300
301 #[must_use]
306 pub fn list_pop_tail(&self) -> Option<RedisString> {
307 let ptr = raw::list_pop(self.key_inner, raw::Where::ListTail);
308
309 if ptr.is_null() {
310 return None;
311 }
312
313 Some(RedisString::new(NonNull::new(self.ctx), ptr))
314 }
315
316 pub fn set_expire(&self, expire: Duration) -> RedisResult {
317 let exp_millis = expire.as_millis();
318
319 let exp_time = i64::try_from(exp_millis).map_err(|_| {
320 RedisError::String(format!("Error expire duration {exp_millis} is not allowed"))
321 })?;
322
323 match raw::set_expire(self.key_inner, exp_time) {
324 raw::Status::Ok => REDIS_OK,
325
326 raw::Status::Err => Err(RedisError::Str("Error while setting key expire")),
329 }
330 }
331
332 pub fn write(&self, val: &str) -> RedisResult {
333 let val_str = RedisString::create(NonNull::new(self.ctx), val);
334 match raw::string_set(self.key_inner, val_str.inner) {
335 raw::Status::Ok => REDIS_OK,
336 raw::Status::Err => Err(RedisError::Str("Error while setting key")),
337 }
338 }
339
340 pub fn delete(&self) -> RedisResult {
344 unsafe { raw::RedisModule_DeleteKey.unwrap()(self.key_inner) };
345 REDIS_OK
346 }
347
348 pub fn unlink(&self) -> RedisResult {
352 unsafe { raw::RedisModule_UnlinkKey.unwrap()(self.key_inner) };
353 REDIS_OK
354 }
355
356 #[must_use]
360 pub fn key_type(&self) -> raw::KeyType {
361 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
362 }
363
364 #[must_use]
365 pub fn is_empty(&self) -> bool {
366 self.key_type() == KeyType::Empty
367 }
368
369 pub fn open_with_redis_string(
370 ctx: *mut raw::RedisModuleCtx,
371 key: *mut raw::RedisModuleString,
372 ) -> Self {
373 let key_inner = raw::open_key(ctx, key, to_raw_mode(KeyMode::ReadWrite));
374 Self { ctx, key_inner }
375 }
376
377 #[allow(clippy::needless_lifetimes)]
383 pub fn get_value<'a, 'b, T>(
384 &'a self,
385 redis_type: &RedisType,
386 ) -> Result<Option<&'b mut T>, RedisError> {
387 verify_type(self.key_inner, redis_type)?;
388 let value =
389 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
390
391 if value.is_null() {
392 return Ok(None);
393 }
394
395 let value = unsafe { &mut *value };
396 Ok(Some(value))
397 }
398
399 pub fn set_value<T>(&self, redis_type: &RedisType, value: T) -> Result<(), RedisError> {
403 verify_type(self.key_inner, redis_type)?;
404 let value = Box::into_raw(Box::new(value)).cast::<c_void>();
405 let status: raw::Status = unsafe {
406 raw::RedisModule_ModuleTypeSetValue.unwrap()(
407 self.key_inner,
408 *redis_type.raw_type.borrow(),
409 value,
410 )
411 }
412 .into();
413
414 status.into()
415 }
416
417 pub fn trim_stream_by_id(
418 &self,
419 mut id: raw::RedisModuleStreamID,
420 approx: bool,
421 ) -> Result<usize, RedisError> {
422 let flags = if approx {
423 raw::REDISMODULE_STREAM_TRIM_APPROX
424 } else {
425 0
426 };
427 let res = unsafe {
428 raw::RedisModule_StreamTrimByID.unwrap()(self.key_inner, flags as i32, &mut id)
429 };
430 if res <= 0 {
431 Err(RedisError::Str("Failed trimming the stream"))
432 } else {
433 Ok(res as usize)
434 }
435 }
436}
437
438pub struct HMGetResult<'a, A, B>
441where
442 A: Into<Vec<u8>> + Clone,
443 RedisString: Into<B>,
444{
445 fields: &'a [A],
446 values: Vec<Option<RedisString>>,
447 phantom: std::marker::PhantomData<B>,
448}
449
450pub struct HMGetIter<'a, A, B>
451where
452 A: Into<Vec<u8>>,
453 RedisString: Into<B>,
454{
455 fields_iter: std::slice::Iter<'a, A>,
456 values_iter: std::vec::IntoIter<Option<RedisString>>,
457 phantom: std::marker::PhantomData<B>,
458}
459
460impl<'a, A, B> Iterator for HMGetIter<'a, A, B>
461where
462 A: Into<Vec<u8>> + Clone,
463 RedisString: Into<B>,
464{
465 type Item = (A, B);
466
467 fn next(&mut self) -> Option<Self::Item> {
468 loop {
469 let a = self.fields_iter.next();
470 let b = self.values_iter.next();
471 match b {
472 None => return None,
473 Some(None) => continue,
474 Some(Some(rs)) => {
475 return Some((
476 a.expect("field and value slices not of same length")
477 .clone(),
478 rs.into(),
479 ))
480 }
481 }
482 }
483 }
484}
485
486impl<'a, A, B> IntoIterator for HMGetResult<'a, A, B>
487where
488 A: Into<Vec<u8>> + Clone,
489 RedisString: Into<B>,
490{
491 type Item = (A, B);
492 type IntoIter = HMGetIter<'a, A, B>;
493
494 fn into_iter(self) -> Self::IntoIter {
540 Self::IntoIter {
541 fields_iter: self.fields.iter(),
542 values_iter: self.values.into_iter(),
543 phantom: std::marker::PhantomData,
544 }
545 }
546}
547
548pub struct StringDMA<'a> {
549 key: &'a RedisKeyWritable,
550 buffer: &'a mut [u8],
551}
552
553impl<'a> Deref for StringDMA<'a> {
554 type Target = [u8];
555
556 fn deref(&self) -> &Self::Target {
557 self.buffer
558 }
559}
560
561impl<'a> DerefMut for StringDMA<'a> {
562 fn deref_mut(&mut self) -> &mut Self::Target {
563 self.buffer
564 }
565}
566
567impl<'a> StringDMA<'a> {
568 fn new(key: &'a RedisKeyWritable) -> Result<StringDMA<'a>, RedisError> {
569 let mut length: size_t = 0;
570 let dma = raw::string_dma(key.key_inner, &mut length, raw::KeyMode::WRITE);
571 if dma.is_null() {
572 Err(RedisError::Str("Could not read key"))
573 } else {
574 let buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
575 Ok(StringDMA { key, buffer })
576 }
577 }
578
579 pub fn write(&mut self, data: &[u8]) -> Result<&mut Self, RedisError> {
580 if self.buffer.len() != data.len() {
581 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, data.len()) {
582 let mut length: size_t = 0;
583 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
584 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
585 } else {
586 return Err(RedisError::Str("Failed to truncate string"));
587 }
588 }
589 self.buffer[..data.len()].copy_from_slice(data);
590 Ok(self)
591 }
592
593 pub fn append(&mut self, data: &[u8]) -> Result<&mut Self, RedisError> {
594 let current_len = self.buffer.len();
595 let new_len = current_len + data.len();
596 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, new_len) {
597 let mut length: size_t = 0;
598 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
599 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
600 } else {
601 return Err(RedisError::Str("Failed to truncate string"));
602 }
603 self.buffer[current_len..new_len].copy_from_slice(data);
604 Ok(self)
605 }
606}
607
608impl Drop for RedisKeyWritable {
609 fn drop(&mut self) {
611 raw::close_key(self.key_inner);
612 }
613}
614
615fn hash_mget_key<T>(
618 ctx: *mut raw::RedisModuleCtx,
619 key: *mut raw::RedisModuleKey,
620 fields: &[T],
621) -> Result<Vec<Option<RedisString>>, RedisError>
622where
623 T: Into<Vec<u8>> + Clone,
624{
625 const BATCH_SIZE: usize = 12;
626
627 let mut values = Vec::with_capacity(fields.len());
628 let mut values_raw = [std::ptr::null_mut(); BATCH_SIZE];
629
630 for chunk_fields in fields.chunks(BATCH_SIZE) {
631 let chunk_values = &mut values_raw[..chunk_fields.len()];
632 raw::hash_get_multi(key, chunk_fields, chunk_values)?;
633 values.extend(chunk_values.iter().map(|ptr| {
634 if ptr.is_null() {
635 None
636 } else {
637 Some(RedisString::from_redis_module_string(ctx, *ptr))
638 }
639 }));
640 }
641
642 Ok(values)
643}
644
645fn to_raw_mode(mode: KeyMode) -> raw::KeyMode {
646 match mode {
647 KeyMode::Read => raw::KeyMode::READ,
648 KeyMode::ReadWrite => raw::KeyMode::READ | raw::KeyMode::WRITE,
649 }
650}
651
652#[allow(clippy::not_unsafe_ptr_arg_deref)]
656pub fn verify_type(key_inner: *mut raw::RedisModuleKey, redis_type: &RedisType) -> RedisResult {
657 let key_type: KeyType = unsafe { raw::RedisModule_KeyType.unwrap()(key_inner) }.into();
658
659 if key_type != KeyType::Empty {
660 let raw_type = unsafe { raw::RedisModule_ModuleTypeGetType.unwrap()(key_inner) };
662
663 if raw_type != *redis_type.raw_type.borrow() {
664 return Err(RedisError::Str("Existing key has wrong Redis type"));
665 }
666 }
667
668 REDIS_OK
669}