1use std::convert::TryFrom;
2use std::ops::Deref;
3use std::ops::DerefMut;
4use std::os::raw::c_void;
5use std::ptr;
6use std::ptr::NonNull;
7use std::time::Duration;
8
9use libc::size_t;
10use std::os::raw::c_int;
11
12use raw::KeyType;
13
14use crate::native_types::ValkeyType;
15use crate::raw;
16use crate::redismodule::VALKEY_OK;
17pub use crate::redisraw::bindings::*;
18use crate::stream::StreamIterator;
19use crate::ValkeyError;
20use crate::ValkeyResult;
21use crate::ValkeyString;
22use bitflags::bitflags;
23
24#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum KeyMode {
34 Read,
35 ReadWrite,
36}
37
38bitflags! {
39 pub struct KeyFlags: c_int {
40 const NOTOUCH = REDISMODULE_OPEN_KEY_NOTOUCH as c_int;
42 const NONOTIFY = REDISMODULE_OPEN_KEY_NONOTIFY as c_int;
44 const NOSTATS = REDISMODULE_OPEN_KEY_NOSTATS as c_int;
46 const NOEXPIRE = REDISMODULE_OPEN_KEY_NOEXPIRE as c_int;
48 const NOEFFECTS = REDISMODULE_OPEN_KEY_NOEFFECTS as c_int;
50 }
51}
52
53#[derive(Debug)]
54pub struct ValkeyKey {
55 pub(crate) ctx: *mut raw::RedisModuleCtx,
56 pub(crate) key_inner: *mut raw::RedisModuleKey,
57}
58
59impl ValkeyKey {
60 pub(crate) fn take(mut self) -> *mut raw::RedisModuleKey {
61 let res = self.key_inner;
62 self.key_inner = std::ptr::null_mut();
63 res
64 }
65
66 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &ValkeyString) -> Self {
67 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::Read));
68 Self { ctx, key_inner }
69 }
70
71 pub(crate) fn open_with_flags(
72 ctx: *mut raw::RedisModuleCtx,
73 key: &ValkeyString,
74 flags: KeyFlags,
75 ) -> Self {
76 let key_inner =
77 raw::open_key_with_flags(ctx, key.inner, to_raw_mode(KeyMode::Read), flags.bits());
78 Self { ctx, key_inner }
79 }
80
81 pub(crate) const fn from_raw_parts(
82 ctx: *mut raw::RedisModuleCtx,
83 key_inner: *mut raw::RedisModuleKey,
84 ) -> Self {
85 Self { ctx, key_inner }
86 }
87
88 pub fn get_value<T>(&self, redis_type: &ValkeyType) -> Result<Option<&T>, ValkeyError> {
92 verify_type(self.key_inner, redis_type)?;
93
94 let value =
95 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
96
97 if value.is_null() {
98 return Ok(None);
99 }
100
101 let value = unsafe { &*value };
102
103 Ok(Some(value))
104 }
105
106 #[must_use]
110 pub fn key_type(&self) -> raw::KeyType {
111 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
112 }
113
114 #[must_use]
116 pub fn is_null(&self) -> bool {
117 let null_key: *mut raw::RedisModuleKey = ptr::null_mut();
118 self.key_inner == null_key
119 }
120
121 pub fn read(&self) -> Result<Option<&[u8]>, ValkeyError> {
122 if self.is_null() {
123 Ok(None)
124 } else {
125 let mut length: size_t = 0;
126 let dma = raw::string_dma(self.key_inner, &mut length, raw::KeyMode::READ);
127 if dma.is_null() {
128 Err(ValkeyError::Str("Could not read key"))
129 } else {
130 Ok(Some(unsafe {
131 std::slice::from_raw_parts(dma.cast::<u8>(), length)
132 }))
133 }
134 }
135 }
136
137 pub fn hash_get(&self, field: &str) -> Result<Option<ValkeyString>, ValkeyError> {
138 let val = if self.is_null() {
139 None
140 } else {
141 hash_mget_key(self.ctx, self.key_inner, &[field])?
142 .pop()
143 .expect("hash_mget_key should return vector of same length as input")
144 };
145 Ok(val)
146 }
147
148 pub fn hash_get_multi<'a, A, B>(
151 &self,
152 fields: &'a [A],
153 ) -> Result<Option<HMGetResult<'a, A, B>>, ValkeyError>
154 where
155 A: Into<Vec<u8>> + Clone,
156 ValkeyString: Into<B>,
157 {
158 let val = if self.is_null() {
159 None
160 } else {
161 Some(HMGetResult {
162 fields,
163 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
164 phantom: std::marker::PhantomData,
165 })
166 };
167 Ok(val)
168 }
169
170 pub fn get_stream_iterator(&self, reverse: bool) -> Result<StreamIterator, ValkeyError> {
171 StreamIterator::new(self, None, None, false, reverse)
172 }
173
174 pub fn get_stream_range_iterator(
175 &self,
176 from: Option<raw::RedisModuleStreamID>,
177 to: Option<raw::RedisModuleStreamID>,
178 exclusive: bool,
179 reverse: bool,
180 ) -> Result<StreamIterator, ValkeyError> {
181 StreamIterator::new(self, from, to, exclusive, reverse)
182 }
183}
184
185impl Drop for ValkeyKey {
186 fn drop(&mut self) {
188 if !self.key_inner.is_null() {
189 raw::close_key(self.key_inner);
190 }
191 }
192}
193
194pub struct ValkeyKeyWritable {
197 ctx: *mut raw::RedisModuleCtx,
198 key_inner: *mut raw::RedisModuleKey,
199}
200
201impl ValkeyKeyWritable {
202 pub fn open(ctx: *mut raw::RedisModuleCtx, key: &ValkeyString) -> Self {
203 let key_inner = raw::open_key(ctx, key.inner, to_raw_mode(KeyMode::ReadWrite));
204 Self { ctx, key_inner }
205 }
206
207 pub(crate) fn open_with_flags(
208 ctx: *mut raw::RedisModuleCtx,
209 key: &ValkeyString,
210 flags: KeyFlags,
211 ) -> Self {
212 let key_inner = raw::open_key_with_flags(
213 ctx,
214 key.inner,
215 to_raw_mode(KeyMode::ReadWrite),
216 flags.bits(),
217 );
218 Self { ctx, key_inner }
219 }
220
221 #[must_use]
242 pub fn is_empty(&self) -> bool {
243 self.key_type() == KeyType::Empty
244 }
245
246 pub fn as_string_dma(&self) -> Result<StringDMA, ValkeyError> {
247 StringDMA::new(self)
248 }
249
250 #[allow(clippy::must_use_candidate)]
251 pub fn hash_set(&self, field: &str, value: ValkeyString) -> raw::Status {
252 raw::hash_set(self.key_inner, field, value.inner)
253 }
254
255 #[allow(clippy::must_use_candidate)]
256 pub fn hash_del(&self, field: &str) -> raw::Status {
257 raw::hash_del(self.key_inner, field)
258 }
259
260 pub fn hash_get(&self, field: &str) -> Result<Option<ValkeyString>, ValkeyError> {
261 Ok(hash_mget_key(self.ctx, self.key_inner, &[field])?
262 .pop()
263 .expect("hash_mget_key should return vector of same length as input"))
264 }
265
266 pub fn hash_get_multi<'a, A, B>(
268 &self,
269 fields: &'a [A],
270 ) -> Result<HMGetResult<'a, A, B>, ValkeyError>
271 where
272 A: Into<Vec<u8>> + Clone,
273 ValkeyString: Into<B>,
274 {
275 Ok(HMGetResult {
276 fields,
277 values: hash_mget_key(self.ctx, self.key_inner, fields)?,
278 phantom: std::marker::PhantomData,
279 })
280 }
281
282 #[allow(clippy::must_use_candidate)]
284 pub fn list_push_head(&self, element: ValkeyString) -> raw::Status {
285 raw::list_push(self.key_inner, raw::Where::ListHead, element.inner)
286 }
287
288 #[allow(clippy::must_use_candidate)]
290 pub fn list_push_tail(&self, element: ValkeyString) -> raw::Status {
291 raw::list_push(self.key_inner, raw::Where::ListTail, element.inner)
292 }
293
294 #[allow(clippy::must_use_candidate)]
299 pub fn list_pop_head(&self) -> Option<ValkeyString> {
300 let ptr = raw::list_pop(self.key_inner, raw::Where::ListHead);
301
302 if ptr.is_null() {
303 return None;
304 }
305
306 Some(ValkeyString::new(NonNull::new(self.ctx), ptr))
307 }
308
309 #[must_use]
314 pub fn list_pop_tail(&self) -> Option<ValkeyString> {
315 let ptr = raw::list_pop(self.key_inner, raw::Where::ListTail);
316
317 if ptr.is_null() {
318 return None;
319 }
320
321 Some(ValkeyString::new(NonNull::new(self.ctx), ptr))
322 }
323
324 pub fn set_expire(&self, expire: Duration) -> ValkeyResult {
325 let exp_millis = expire.as_millis();
326
327 let exp_time = i64::try_from(exp_millis).map_err(|_| {
328 ValkeyError::String(format!("Error expire duration {exp_millis} is not allowed"))
329 })?;
330
331 match raw::set_expire(self.key_inner, exp_time) {
332 raw::Status::Ok => VALKEY_OK,
333
334 raw::Status::Err => Err(ValkeyError::Str("Error while setting key expire")),
337 }
338 }
339
340 pub fn remove_expire(&self) -> ValkeyResult {
342 match raw::set_expire(self.key_inner, REDISMODULE_NO_EXPIRE.into()) {
343 raw::Status::Ok => VALKEY_OK,
344
345 raw::Status::Err => Err(ValkeyError::Str("Error while removing key expire")),
348 }
349 }
350
351 pub fn write(&self, val: &str) -> ValkeyResult {
352 let val_str = ValkeyString::create(NonNull::new(self.ctx), val);
353 match raw::string_set(self.key_inner, val_str.inner) {
354 raw::Status::Ok => VALKEY_OK,
355 raw::Status::Err => Err(ValkeyError::Str("Error while setting key")),
356 }
357 }
358
359 pub fn delete(&self) -> ValkeyResult {
363 unsafe { raw::RedisModule_DeleteKey.unwrap()(self.key_inner) };
364 VALKEY_OK
365 }
366
367 pub fn unlink(&self) -> ValkeyResult {
371 unsafe { raw::RedisModule_UnlinkKey.unwrap()(self.key_inner) };
372 VALKEY_OK
373 }
374
375 #[must_use]
379 pub fn key_type(&self) -> raw::KeyType {
380 unsafe { raw::RedisModule_KeyType.unwrap()(self.key_inner) }.into()
381 }
382
383 pub fn open_with_redis_string(
384 ctx: *mut raw::RedisModuleCtx,
385 key: *mut raw::RedisModuleString,
386 ) -> Self {
387 let key_inner = raw::open_key(ctx, key, to_raw_mode(KeyMode::ReadWrite));
388 Self { ctx, key_inner }
389 }
390
391 #[allow(clippy::needless_lifetimes)]
397 pub fn get_value<'a, 'b, T>(
398 &'a self,
399 redis_type: &ValkeyType,
400 ) -> Result<Option<&'b mut T>, ValkeyError> {
401 verify_type(self.key_inner, redis_type)?;
402 let value =
403 unsafe { raw::RedisModule_ModuleTypeGetValue.unwrap()(self.key_inner).cast::<T>() };
404
405 if value.is_null() {
406 return Ok(None);
407 }
408
409 let value = unsafe { &mut *value };
410 Ok(Some(value))
411 }
412
413 pub fn set_value<T>(&self, redis_type: &ValkeyType, value: T) -> Result<(), ValkeyError> {
417 verify_type(self.key_inner, redis_type)?;
418 let value = Box::into_raw(Box::new(value)).cast::<c_void>();
419 let status: raw::Status = unsafe {
420 raw::RedisModule_ModuleTypeSetValue.unwrap()(
421 self.key_inner,
422 *redis_type.raw_type.borrow(),
423 value,
424 )
425 }
426 .into();
427
428 status.into()
429 }
430
431 pub fn trim_stream_by_id(
432 &self,
433 mut id: raw::RedisModuleStreamID,
434 approx: bool,
435 ) -> Result<usize, ValkeyError> {
436 let flags = if approx {
437 raw::REDISMODULE_STREAM_TRIM_APPROX
438 } else {
439 0
440 };
441 let res = unsafe {
442 raw::RedisModule_StreamTrimByID.unwrap()(self.key_inner, flags as i32, &mut id)
443 };
444 if res <= 0 {
445 Err(ValkeyError::Str("Failed trimming the stream"))
446 } else {
447 Ok(res as usize)
448 }
449 }
450}
451
452pub struct HMGetResult<'a, A, B>
455where
456 A: Into<Vec<u8>> + Clone,
457 ValkeyString: Into<B>,
458{
459 fields: &'a [A],
460 values: Vec<Option<ValkeyString>>,
461 phantom: std::marker::PhantomData<B>,
462}
463
464pub struct HMGetIter<'a, A, B>
465where
466 A: Into<Vec<u8>>,
467 ValkeyString: Into<B>,
468{
469 fields_iter: std::slice::Iter<'a, A>,
470 values_iter: std::vec::IntoIter<Option<ValkeyString>>,
471 phantom: std::marker::PhantomData<B>,
472}
473
474impl<'a, A, B> Iterator for HMGetIter<'a, A, B>
475where
476 A: Into<Vec<u8>> + Clone,
477 ValkeyString: Into<B>,
478{
479 type Item = (A, B);
480
481 fn next(&mut self) -> Option<Self::Item> {
482 loop {
483 let a = self.fields_iter.next();
484 let b = self.values_iter.next();
485 match b {
486 None => return None,
487 Some(None) => continue,
488 Some(Some(rs)) => {
489 return Some((
490 a.expect("field and value slices not of same length")
491 .clone(),
492 rs.into(),
493 ))
494 }
495 }
496 }
497 }
498}
499
500impl<'a, A, B> IntoIterator for HMGetResult<'a, A, B>
501where
502 A: Into<Vec<u8>> + Clone,
503 ValkeyString: Into<B>,
504{
505 type Item = (A, B);
506 type IntoIter = HMGetIter<'a, A, B>;
507
508 fn into_iter(self) -> Self::IntoIter {
554 Self::IntoIter {
555 fields_iter: self.fields.iter(),
556 values_iter: self.values.into_iter(),
557 phantom: std::marker::PhantomData,
558 }
559 }
560}
561
562pub struct StringDMA<'a> {
563 key: &'a ValkeyKeyWritable,
564 buffer: &'a mut [u8],
565}
566
567impl<'a> Deref for StringDMA<'a> {
568 type Target = [u8];
569
570 fn deref(&self) -> &Self::Target {
571 self.buffer
572 }
573}
574
575impl<'a> DerefMut for StringDMA<'a> {
576 fn deref_mut(&mut self) -> &mut Self::Target {
577 self.buffer
578 }
579}
580
581impl<'a> StringDMA<'a> {
582 fn new(key: &'a ValkeyKeyWritable) -> Result<StringDMA<'a>, ValkeyError> {
583 let mut length: size_t = 0;
584 let dma = raw::string_dma(key.key_inner, &mut length, raw::KeyMode::WRITE);
585 if dma.is_null() {
586 Err(ValkeyError::Str("Could not read key"))
587 } else {
588 let buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
589 Ok(StringDMA { key, buffer })
590 }
591 }
592
593 pub fn write(&mut self, data: &[u8]) -> Result<&mut Self, ValkeyError> {
594 if self.buffer.len() != data.len() {
595 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, data.len()) {
596 let mut length: size_t = 0;
597 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
598 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
599 } else {
600 return Err(ValkeyError::Str("Failed to truncate string"));
601 }
602 }
603 self.buffer[..data.len()].copy_from_slice(data);
604 Ok(self)
605 }
606
607 pub fn append(&mut self, data: &[u8]) -> Result<&mut Self, ValkeyError> {
608 let current_len = self.buffer.len();
609 let new_len = current_len + data.len();
610 if raw::Status::Ok == raw::string_truncate(self.key.key_inner, new_len) {
611 let mut length: size_t = 0;
612 let dma = raw::string_dma(self.key.key_inner, &mut length, raw::KeyMode::WRITE);
613 self.buffer = unsafe { std::slice::from_raw_parts_mut(dma.cast::<u8>(), length) };
614 } else {
615 return Err(ValkeyError::Str("Failed to truncate string"));
616 }
617 self.buffer[current_len..new_len].copy_from_slice(data);
618 Ok(self)
619 }
620}
621
622impl Drop for ValkeyKeyWritable {
623 fn drop(&mut self) {
625 raw::close_key(self.key_inner);
626 }
627}
628
629fn hash_mget_key<T>(
632 ctx: *mut raw::RedisModuleCtx,
633 key: *mut raw::RedisModuleKey,
634 fields: &[T],
635) -> Result<Vec<Option<ValkeyString>>, ValkeyError>
636where
637 T: Into<Vec<u8>> + Clone,
638{
639 const BATCH_SIZE: usize = 12;
640
641 let mut values = Vec::with_capacity(fields.len());
642 let mut values_raw = [std::ptr::null_mut(); BATCH_SIZE];
643
644 for chunk_fields in fields.chunks(BATCH_SIZE) {
645 let chunk_values = &mut values_raw[..chunk_fields.len()];
646 raw::hash_get_multi(key, chunk_fields, chunk_values)?;
647 values.extend(chunk_values.iter().map(|ptr| {
648 if ptr.is_null() {
649 None
650 } else {
651 Some(ValkeyString::from_redis_module_string(ctx, *ptr))
652 }
653 }));
654 }
655
656 Ok(values)
657}
658
659fn to_raw_mode(mode: KeyMode) -> raw::KeyMode {
660 match mode {
661 KeyMode::Read => raw::KeyMode::READ,
662 KeyMode::ReadWrite => raw::KeyMode::READ | raw::KeyMode::WRITE,
663 }
664}
665
666#[allow(clippy::not_unsafe_ptr_arg_deref)]
670pub fn verify_type(key_inner: *mut raw::RedisModuleKey, redis_type: &ValkeyType) -> ValkeyResult {
671 let key_type: KeyType = unsafe { raw::RedisModule_KeyType.unwrap()(key_inner) }.into();
672
673 if key_type != KeyType::Empty {
674 let raw_type = unsafe { raw::RedisModule_ModuleTypeGetType.unwrap()(key_inner) };
676
677 if raw_type != *redis_type.raw_type.borrow() {
678 return Err(ValkeyError::Str("Existing key has wrong Valkey type"));
679 }
680 }
681
682 VALKEY_OK
683}