1use bitflags::bitflags;
2use std::collections::{BTreeMap, HashMap};
3use std::ffi::CString;
4use std::os::raw::c_void;
5use std::os::raw::{c_char, c_int, c_long, c_longlong};
6use std::ptr::{self, NonNull};
7use std::sync::atomic::{AtomicPtr, Ordering};
8use valkey_module_macros_internals::api;
9
10use crate::key::{KeyFlags, ValkeyKey, ValkeyKeyWritable};
11use crate::logging::ValkeyLogLevel;
12use crate::raw::{ModuleOptions, Version};
13use crate::redisvalue::ValkeyValueKey;
14use crate::{
15 add_info_begin_dict_field, add_info_end_dict_field, add_info_field_double,
16 add_info_field_long_long, add_info_field_str, add_info_field_unsigned_long_long, raw, utils,
17 Status,
18};
19use crate::{add_info_section, ValkeyResult};
20use crate::{ValkeyError, ValkeyString, ValkeyValue};
21use std::ops::Deref;
22
23use std::ffi::CStr;
24
25use self::call_reply::{create_promise_call_reply, CallResult, PromiseCallReply};
26use self::thread_safe::ValkeyLockIndicator;
27
28mod timer;
29
30pub mod blocked;
31pub mod call_reply;
32pub mod client;
33pub mod commands;
34pub mod info;
35pub mod keys_cursor;
36pub mod server_events;
37pub mod thread_safe;
38
39pub struct CallOptionsBuilder {
40 options: String,
41}
42
43impl Default for CallOptionsBuilder {
44 fn default() -> Self {
45 CallOptionsBuilder {
46 options: "v".to_string(),
47 }
48 }
49}
50
51#[derive(Clone)]
52pub struct CallOptions {
53 options: CString,
54}
55
56#[derive(Clone)]
57#[cfg(all(any(
58 feature = "min-valkey-compatibility-version-8-0",
59 feature = "min-redis-compatibility-version-7-2"
60)))]
61pub struct BlockingCallOptions {
62 options: CString,
63}
64
65#[derive(Copy, Clone)]
66pub enum CallOptionResp {
67 Resp2,
68 Resp3,
69 Auto,
70}
71
72impl CallOptionsBuilder {
73 pub fn new() -> CallOptionsBuilder {
74 Self::default()
75 }
76
77 fn add_flag(&mut self, flag: &str) {
78 self.options.push_str(flag);
79 }
80
81 pub fn no_writes(mut self) -> CallOptionsBuilder {
83 self.add_flag("W");
84 self
85 }
86
87 pub fn script_mode(mut self) -> CallOptionsBuilder {
92 self.add_flag("S");
93 self
94 }
95
96 pub fn verify_acl(mut self) -> CallOptionsBuilder {
99 self.add_flag("C");
100 self
101 }
102
103 pub fn verify_oom(mut self) -> CallOptionsBuilder {
105 self.add_flag("M");
106 self
107 }
108
109 pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
112 self.add_flag("E");
113 self
114 }
115
116 pub fn replicate(mut self) -> CallOptionsBuilder {
118 self.add_flag("!");
119 self
120 }
121
122 pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
124 match resp {
125 CallOptionResp::Auto => self.add_flag("0"),
126 CallOptionResp::Resp2 => (),
127 CallOptionResp::Resp3 => self.add_flag("3"),
128 }
129 self
130 }
131
132 pub fn build(self) -> CallOptions {
134 CallOptions {
135 options: CString::new(self.options).unwrap(), }
137 }
138
139 #[cfg(all(any(
143 feature = "min-valkey-compatibility-version-8-0",
144 feature = "min-redis-compatibility-version-7-2"
145 )))]
146 pub fn build_blocking(mut self) -> BlockingCallOptions {
147 self.add_flag("K");
148 BlockingCallOptions {
149 options: CString::new(self.options).unwrap(), }
151 }
152}
153
154pub struct DetachedContext {
158 pub(crate) ctx: AtomicPtr<raw::RedisModuleCtx>,
159}
160
161impl DetachedContext {
162 pub const fn new() -> Self {
163 DetachedContext {
164 ctx: AtomicPtr::new(ptr::null_mut()),
165 }
166 }
167}
168
169impl Default for DetachedContext {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
175pub struct DetachedContextGuard {
182 pub(crate) ctx: Context,
183}
184
185unsafe impl ValkeyLockIndicator for DetachedContextGuard {}
186
187impl Drop for DetachedContextGuard {
188 fn drop(&mut self) {
189 unsafe {
190 raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
191 };
192 }
193}
194
195impl Deref for DetachedContextGuard {
196 type Target = Context;
197
198 fn deref(&self) -> &Self::Target {
199 &self.ctx
200 }
201}
202
203impl DetachedContext {
204 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
205 let c = self.ctx.load(Ordering::Relaxed);
206 crate::logging::log_internal(c, level, message);
207 }
208
209 pub fn log_debug(&self, message: &str) {
210 self.log(ValkeyLogLevel::Debug, message);
211 }
212
213 pub fn log_notice(&self, message: &str) {
214 self.log(ValkeyLogLevel::Notice, message);
215 }
216
217 pub fn log_verbose(&self, message: &str) {
218 self.log(ValkeyLogLevel::Verbose, message);
219 }
220
221 pub fn log_warning(&self, message: &str) {
222 self.log(ValkeyLogLevel::Warning, message);
223 }
224
225 pub fn set_context(&self, ctx: &Context) -> Result<(), ValkeyError> {
226 let c = self.ctx.load(Ordering::Relaxed);
227 if !c.is_null() {
228 return Err(ValkeyError::Str("Detached context is already set"));
229 }
230 let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
231 self.ctx.store(ctx, Ordering::Relaxed);
232 Ok(())
233 }
234
235 pub fn lock(&self) -> DetachedContextGuard {
240 let c = self.ctx.load(Ordering::Relaxed);
241 unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
242 let ctx = Context::new(c);
243 DetachedContextGuard { ctx }
244 }
245}
246
247unsafe impl Send for DetachedContext {}
248unsafe impl Sync for DetachedContext {}
249
250#[derive(Debug)]
253pub struct Context {
254 pub ctx: *mut raw::RedisModuleCtx,
255}
256
257#[derive(Debug)]
263pub struct ContextUserScope<'ctx> {
264 ctx: &'ctx Context,
265 user: *mut raw::RedisModuleUser,
266}
267
268impl<'ctx> Drop for ContextUserScope<'ctx> {
269 fn drop(&mut self) {
270 self.ctx.deautenticate_user();
271 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(self.user) };
272 }
273}
274
275impl<'ctx> ContextUserScope<'ctx> {
276 fn new(ctx: &'ctx Context, user: *mut raw::RedisModuleUser) -> ContextUserScope<'ctx> {
277 ContextUserScope { ctx, user }
278 }
279}
280
281pub struct StrCallArgs<'a> {
282 is_owner: bool,
283 args: Vec<*mut raw::RedisModuleString>,
284 phantom: std::marker::PhantomData<&'a raw::RedisModuleString>,
286}
287
288impl<'a> Drop for StrCallArgs<'a> {
289 fn drop(&mut self) {
290 if self.is_owner {
291 self.args.iter_mut().for_each(|v| unsafe {
292 raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), *v)
293 });
294 }
295 }
296}
297
298impl<'a, T: AsRef<[u8]> + ?Sized> From<&'a [&T]> for StrCallArgs<'a> {
299 fn from(vals: &'a [&T]) -> Self {
300 StrCallArgs {
301 is_owner: true,
302 args: vals
303 .iter()
304 .map(|v| ValkeyString::create_from_slice(std::ptr::null_mut(), v.as_ref()).take())
305 .collect(),
306 phantom: std::marker::PhantomData,
307 }
308 }
309}
310
311impl<'a> From<&'a [&ValkeyString]> for StrCallArgs<'a> {
312 fn from(vals: &'a [&ValkeyString]) -> Self {
313 StrCallArgs {
314 is_owner: false,
315 args: vals.iter().map(|v| v.inner).collect(),
316 phantom: std::marker::PhantomData,
317 }
318 }
319}
320
321impl<'a, const SIZE: usize, T: ?Sized> From<&'a [&T; SIZE]> for StrCallArgs<'a>
322where
323 for<'b> &'a [&'b T]: Into<StrCallArgs<'a>>,
324{
325 fn from(vals: &'a [&T; SIZE]) -> Self {
326 vals.as_ref().into()
327 }
328}
329
330impl<'a> StrCallArgs<'a> {
331 pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
332 &mut self.args
333 }
334}
335
336impl Context {
337 pub const fn new(ctx: *mut raw::RedisModuleCtx) -> Self {
338 Self { ctx }
339 }
340
341 #[must_use]
342 pub const fn dummy() -> Self {
343 Self {
344 ctx: ptr::null_mut(),
345 }
346 }
347
348 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
349 crate::logging::log_internal(self.ctx, level, message);
350 }
351
352 pub fn log_debug(&self, message: &str) {
353 self.log(ValkeyLogLevel::Debug, message);
354 }
355
356 pub fn log_notice(&self, message: &str) {
357 self.log(ValkeyLogLevel::Notice, message);
358 }
359
360 pub fn log_verbose(&self, message: &str) {
361 self.log(ValkeyLogLevel::Verbose, message);
362 }
363
364 pub fn log_warning(&self, message: &str) {
365 self.log(ValkeyLogLevel::Warning, message);
366 }
367
368 pub fn auto_memory(&self) {
372 unsafe {
373 raw::RedisModule_AutoMemory.unwrap()(self.ctx);
374 }
375 }
376
377 #[must_use]
381 pub fn is_keys_position_request(&self) -> bool {
382 if cfg!(test) {
384 return false;
385 }
386
387 (unsafe { raw::RedisModule_IsKeysPositionRequest.unwrap()(self.ctx) }) != 0
388 }
389
390 pub fn key_at_pos(&self, pos: i32) {
394 unsafe {
397 raw::RedisModule_KeyAtPos.unwrap()(self.ctx, pos as c_int);
398 }
399 }
400
401 fn call_internal<
402 'ctx,
403 'a,
404 T: Into<StrCallArgs<'a>>,
405 R: From<PromiseCallReply<'static, 'ctx>>,
406 >(
407 &'ctx self,
408 command: &str,
409 fmt: *const c_char,
410 args: T,
411 ) -> R {
412 let mut call_args: StrCallArgs = args.into();
413 let final_args = call_args.args_mut();
414
415 let cmd = CString::new(command).unwrap();
416 let reply: *mut raw::RedisModuleCallReply = unsafe {
417 let p_call = raw::RedisModule_Call.unwrap();
418 p_call(
419 self.ctx,
420 cmd.as_ptr(),
421 fmt,
422 final_args.as_mut_ptr(),
423 final_args.len(),
424 )
425 };
426 let promise = create_promise_call_reply(self, NonNull::new(reply));
427 R::from(promise)
428 }
429
430 pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> ValkeyResult {
431 self.call_internal::<_, CallResult>(command, raw::FMT, args)
432 .map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))
433 }
434
435 pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<CallResult<'static>>>(
439 &self,
440 command: &str,
441 options: &CallOptions,
442 args: T,
443 ) -> R {
444 let res: CallResult<'static> =
445 self.call_internal(command, options.options.as_ptr() as *const c_char, args);
446 R::from(res)
447 }
448
449 #[cfg(all(any(
451 feature = "min-valkey-compatibility-version-8-0",
452 feature = "min-redis-compatibility-version-7-2"
453 )))]
454 pub fn call_blocking<
455 'ctx,
456 'a,
457 T: Into<StrCallArgs<'a>>,
458 R: From<PromiseCallReply<'static, 'ctx>>,
459 >(
460 &'ctx self,
461 command: &str,
462 options: &BlockingCallOptions,
463 args: T,
464 ) -> R {
465 self.call_internal(command, options.options.as_ptr() as *const c_char, args)
466 }
467
468 #[must_use]
469 pub fn str_as_legal_resp_string(s: &str) -> CString {
470 CString::new(
471 s.chars()
472 .map(|c| match c {
473 '\r' | '\n' | '\0' => b' ',
474 _ => c as u8,
475 })
476 .collect::<Vec<_>>(),
477 )
478 .unwrap()
479 }
480
481 #[allow(clippy::must_use_candidate)]
482 pub fn reply_simple_string(&self, s: &str) -> raw::Status {
483 let msg = Self::str_as_legal_resp_string(s);
484 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
485 }
486
487 #[allow(clippy::must_use_candidate)]
488 pub fn reply_error_string(&self, s: &str) -> raw::Status {
489 let msg = Self::str_as_legal_resp_string(s);
490 unsafe { raw::RedisModule_ReplyWithError.unwrap()(self.ctx, msg.as_ptr()).into() }
491 }
492
493 #[cfg(feature = "min-valkey-compatibility-version-8-0")]
494 pub fn add_acl_category(&self, s: &str) -> raw::Status {
495 let acl_flags = Self::str_as_legal_resp_string(s);
496 unsafe { raw::RedisModule_AddACLCategory.unwrap()(self.ctx, acl_flags.as_ptr()).into() }
497 }
498
499 #[cfg(all(any(
500 feature = "min-redis-compatibility-version-7-2",
501 feature = "min-valkey-compatibility-version-8-0"
502 ),))]
503 pub fn set_acl_category(
504 &self,
505 command_name: *const c_char,
506 acl_flags: *const c_char,
507 ) -> raw::Status {
508 unsafe {
509 let command = raw::RedisModule_GetCommand.unwrap()(self.ctx, command_name);
510 raw::RedisModule_SetCommandACLCategories.unwrap()(command, acl_flags).into()
511 }
512 }
513
514 pub fn reply_with_key(&self, result: ValkeyValueKey) -> raw::Status {
515 match result {
516 ValkeyValueKey::Integer(i) => raw::reply_with_long_long(self.ctx, i),
517 ValkeyValueKey::String(s) => {
518 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
519 }
520 ValkeyValueKey::BulkString(b) => {
521 raw::reply_with_string_buffer(self.ctx, b.as_ptr().cast::<c_char>(), b.len())
522 }
523 ValkeyValueKey::BulkValkeyString(s) => raw::reply_with_string(self.ctx, s.inner),
524 ValkeyValueKey::Bool(b) => raw::reply_with_bool(self.ctx, b.into()),
525 }
526 }
527
528 #[allow(clippy::must_use_candidate)]
532 pub fn reply(&self, result: ValkeyResult) -> raw::Status {
533 match result {
534 Ok(ValkeyValue::Bool(v)) => raw::reply_with_bool(self.ctx, v.into()),
535 Ok(ValkeyValue::Integer(v)) => raw::reply_with_long_long(self.ctx, v),
536 Ok(ValkeyValue::Float(v)) => raw::reply_with_double(self.ctx, v),
537 Ok(ValkeyValue::SimpleStringStatic(s)) => {
538 let msg = CString::new(s).unwrap();
539 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
540 }
541
542 Ok(ValkeyValue::SimpleString(s)) => {
543 let msg = CString::new(s).unwrap();
544 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
545 }
546
547 Ok(ValkeyValue::BulkString(s)) => {
548 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
549 }
550
551 Ok(ValkeyValue::BigNumber(s)) => {
552 raw::reply_with_big_number(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
553 }
554
555 Ok(ValkeyValue::VerbatimString((format, data))) => raw::reply_with_verbatim_string(
556 self.ctx,
557 data.as_ptr().cast(),
558 data.len(),
559 format.0.as_ptr().cast(),
560 ),
561
562 Ok(ValkeyValue::BulkValkeyString(s)) => raw::reply_with_string(self.ctx, s.inner),
563
564 Ok(ValkeyValue::StringBuffer(s)) => {
565 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
566 }
567
568 Ok(ValkeyValue::Array(array)) => {
569 raw::reply_with_array(self.ctx, array.len() as c_long);
570
571 for elem in array {
572 self.reply(Ok(elem));
573 }
574
575 raw::Status::Ok
576 }
577
578 Ok(ValkeyValue::Map(map)) => {
579 raw::reply_with_map(self.ctx, map.len() as c_long);
580
581 for (key, value) in map {
582 self.reply_with_key(key);
583 self.reply(Ok(value));
584 }
585
586 raw::Status::Ok
587 }
588
589 Ok(ValkeyValue::OrderedMap(map)) => {
590 raw::reply_with_map(self.ctx, map.len() as c_long);
591
592 for (key, value) in map {
593 self.reply_with_key(key);
594 self.reply(Ok(value));
595 }
596
597 raw::Status::Ok
598 }
599
600 Ok(ValkeyValue::Set(set)) => {
601 raw::reply_with_set(self.ctx, set.len() as c_long);
602 set.into_iter().for_each(|e| {
603 self.reply_with_key(e);
604 });
605
606 raw::Status::Ok
607 }
608
609 Ok(ValkeyValue::OrderedSet(set)) => {
610 raw::reply_with_set(self.ctx, set.len() as c_long);
611 set.into_iter().for_each(|e| {
612 self.reply_with_key(e);
613 });
614
615 raw::Status::Ok
616 }
617
618 Ok(ValkeyValue::Null) => raw::reply_with_null(self.ctx),
619
620 Ok(ValkeyValue::NoReply) => raw::Status::Ok,
621
622 Ok(ValkeyValue::StaticError(s)) => self.reply_error_string(s),
623
624 Err(ValkeyError::WrongArity) => unsafe {
625 if self.is_keys_position_request() {
626 raw::Status::Err
628 } else {
629 raw::RedisModule_WrongArity.unwrap()(self.ctx).into()
630 }
631 },
632
633 Err(ValkeyError::WrongType) => {
634 self.reply_error_string(ValkeyError::WrongType.to_string().as_str())
635 }
636
637 Err(ValkeyError::String(s)) => self.reply_error_string(s.as_str()),
638
639 Err(ValkeyError::Str(s)) => self.reply_error_string(s),
640 }
641 }
642
643 #[must_use]
644 pub fn open_key(&self, key: &ValkeyString) -> ValkeyKey {
645 ValkeyKey::open(self.ctx, key)
646 }
647
648 #[must_use]
649 pub fn open_key_with_flags(&self, key: &ValkeyString, flags: KeyFlags) -> ValkeyKey {
650 ValkeyKey::open_with_flags(self.ctx, key, flags)
651 }
652
653 #[must_use]
654 pub fn open_key_writable(&self, key: &ValkeyString) -> ValkeyKeyWritable {
655 ValkeyKeyWritable::open(self.ctx, key)
656 }
657
658 #[must_use]
659 pub fn open_key_writable_with_flags(
660 &self,
661 key: &ValkeyString,
662 flags: KeyFlags,
663 ) -> ValkeyKeyWritable {
664 ValkeyKeyWritable::open_with_flags(self.ctx, key, flags)
665 }
666
667 pub fn replicate_verbatim(&self) {
668 raw::replicate_verbatim(self.ctx);
669 }
670
671 pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
673 raw::replicate(self.ctx, command, args);
674 }
675
676 #[must_use]
677 pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> ValkeyString {
678 ValkeyString::create(NonNull::new(self.ctx), s)
679 }
680
681 #[must_use]
682 pub const fn get_raw(&self) -> *mut raw::RedisModuleCtx {
683 self.ctx
684 }
685
686 pub unsafe fn export_shared_api(
690 &self,
691 func: *const ::std::os::raw::c_void,
692 name: *const ::std::os::raw::c_char,
693 ) {
694 raw::export_shared_api(self.ctx, func, name);
695 }
696
697 #[allow(clippy::must_use_candidate)]
701 pub fn notify_keyspace_event(
702 &self,
703 event_type: raw::NotifyEvent,
704 event: &str,
705 keyname: &ValkeyString,
706 ) -> raw::Status {
707 unsafe { raw::notify_keyspace_event(self.ctx, event_type, event, keyname) }
708 }
709
710 pub fn current_command_name(&self) -> Result<String, ValkeyError> {
711 unsafe {
712 match raw::RedisModule_GetCurrentCommandName {
713 Some(cmd) => Ok(CStr::from_ptr(cmd(self.ctx)).to_str().unwrap().to_string()),
714 None => Err(ValkeyError::Str(
715 "API RedisModule_GetCurrentCommandName is not available",
716 )),
717 }
718 }
719 }
720
721 pub fn get_server_version(&self) -> Result<Version, ValkeyError> {
724 self.get_server_version_internal(false)
725 }
726
727 pub fn get_server_version_rm_call(&self) -> Result<Version, ValkeyError> {
729 self.get_server_version_internal(true)
730 }
731
732 pub fn version_from_info(info: ValkeyValue) -> Result<Version, ValkeyError> {
733 if let ValkeyValue::SimpleString(info_str) = info {
734 if let Some(ver) = utils::get_regexp_captures(
735 info_str.as_str(),
736 r"(?m)\bredis_version:([0-9]+)\.([0-9]+)\.([0-9]+)\b",
737 ) {
738 return Ok(Version {
739 major: ver[0].parse::<c_int>().unwrap(),
740 minor: ver[1].parse::<c_int>().unwrap(),
741 patch: ver[2].parse::<c_int>().unwrap(),
742 });
743 }
744 }
745 Err(ValkeyError::Str("Error getting redis_version"))
746 }
747
748 #[allow(clippy::not_unsafe_ptr_arg_deref)]
749 fn get_server_version_internal(&self, force_use_rm_call: bool) -> Result<Version, ValkeyError> {
750 match unsafe { raw::RedisModule_GetServerVersion } {
751 Some(api) if !force_use_rm_call => {
752 Ok(Version::from(unsafe { api() }))
754 }
755 _ => {
756 if let Ok(info) = self.call("info", &["server"]) {
758 Self::version_from_info(info)
759 } else {
760 Err(ValkeyError::Str("Error calling \"info server\""))
761 }
762 }
763 }
764 }
765 pub fn set_module_options(&self, options: ModuleOptions) {
766 unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
767 }
768
769 pub fn get_flags(&self) -> ContextFlags {
775 ContextFlags::from_bits_truncate(unsafe {
776 raw::RedisModule_GetContextFlags.unwrap()(self.ctx)
777 })
778 }
779
780 pub fn get_current_user(&self) -> ValkeyString {
782 let user = unsafe { raw::RedisModule_GetCurrentUserName.unwrap()(self.ctx) };
783 ValkeyString::from_redis_module_string(ptr::null_mut(), user)
784 }
785
786 pub fn authenticate_user(
791 &self,
792 user_name: &ValkeyString,
793 ) -> Result<ContextUserScope<'_>, ValkeyError> {
794 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
795 if user.is_null() {
796 return Err(ValkeyError::Str("User does not exists or disabled"));
797 }
798 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
799 Ok(ContextUserScope::new(self, user))
800 }
801
802 fn deautenticate_user(&self) {
803 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
804 }
805
806 pub fn acl_check_key_permission(
810 &self,
811 user_name: &ValkeyString,
812 key_name: &ValkeyString,
813 permissions: &AclPermissions,
814 ) -> Result<(), ValkeyError> {
815 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
816 if user.is_null() {
817 return Err(ValkeyError::Str("User does not exists or disabled"));
818 }
819 let acl_permission_result: raw::Status = unsafe {
820 raw::RedisModule_ACLCheckKeyPermissions.unwrap()(
821 user,
822 key_name.inner,
823 permissions.bits(),
824 )
825 }
826 .into();
827 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(user) };
828 let acl_permission_result: Result<(), &str> = acl_permission_result.into();
829 acl_permission_result
830 .map_err(|_e| ValkeyError::Str("User does not have permissions on key"))
831 }
832
833 api!(
834 [RedisModule_AddPostNotificationJob],
835 pub fn add_post_notification_job<F: FnOnce(&Context) + 'static>(
848 &self,
849 callback: F,
850 ) -> Status {
851 let callback = Box::into_raw(Box::new(Some(callback)));
852 unsafe {
853 RedisModule_AddPostNotificationJob(
854 self.ctx,
855 Some(post_notification_job::<F>),
856 callback as *mut c_void,
857 Some(post_notification_job_free_callback::<F>),
858 )
859 }
860 .into()
861 }
862 );
863
864 api!(
865 [RedisModule_AvoidReplicaTraffic],
866 pub fn avoid_replication_traffic(&self) -> bool {
882 unsafe { RedisModule_AvoidReplicaTraffic() == 1 }
883 }
884 );
885}
886
887extern "C" fn post_notification_job_free_callback<F: FnOnce(&Context)>(pd: *mut c_void) {
888 unsafe {
889 drop(Box::from_raw(pd as *mut Option<F>));
890 };
891}
892
893extern "C" fn post_notification_job<F: FnOnce(&Context)>(
894 ctx: *mut raw::RedisModuleCtx,
895 pd: *mut c_void,
896) {
897 let callback = unsafe { &mut *(pd as *mut Option<F>) };
898 let ctx = Context::new(ctx);
899 callback.take().map_or_else(
900 || {
901 ctx.log(
902 ValkeyLogLevel::Warning,
903 "Got a None callback on post notification job.",
904 )
905 },
906 |callback| {
907 callback(&ctx);
908 },
909 );
910}
911
912unsafe impl ValkeyLockIndicator for Context {}
913
914bitflags! {
915 #[derive(Debug)]
918 pub struct AclPermissions : c_int {
919 const ACCESS = raw::REDISMODULE_CMD_KEY_ACCESS as c_int;
921
922 const INSERT = raw::REDISMODULE_CMD_KEY_INSERT as c_int;
924
925 const DELETE = raw::REDISMODULE_CMD_KEY_DELETE as c_int;
927
928 const UPDATE = raw::REDISMODULE_CMD_KEY_UPDATE as c_int;
930 }
931}
932
933#[derive(Debug, Clone)]
935pub enum InfoContextBuilderFieldBottomLevelValue {
936 String(String),
938 I64(i64),
940 U64(u64),
942 F64(f64),
944}
945
946impl From<String> for InfoContextBuilderFieldBottomLevelValue {
947 fn from(value: String) -> Self {
948 Self::String(value)
949 }
950}
951
952impl From<&str> for InfoContextBuilderFieldBottomLevelValue {
953 fn from(value: &str) -> Self {
954 Self::String(value.to_owned())
955 }
956}
957
958impl From<i64> for InfoContextBuilderFieldBottomLevelValue {
959 fn from(value: i64) -> Self {
960 Self::I64(value)
961 }
962}
963
964impl From<u64> for InfoContextBuilderFieldBottomLevelValue {
965 fn from(value: u64) -> Self {
966 Self::U64(value)
967 }
968}
969
970#[derive(Debug, Clone)]
971pub enum InfoContextBuilderFieldTopLevelValue {
972 Value(InfoContextBuilderFieldBottomLevelValue),
974 Dictionary {
998 name: String,
999 fields: InfoContextFieldBottomLevelData,
1000 },
1001}
1002
1003impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<T>
1004 for InfoContextBuilderFieldTopLevelValue
1005{
1006 fn from(value: T) -> Self {
1007 Self::Value(value.into())
1008 }
1009}
1010
1011#[derive(Debug)]
1014pub struct InfoContextBuilderDictionaryBuilder<'a> {
1015 info_section_builder: InfoContextBuilderSectionBuilder<'a>,
1017 name: String,
1019 fields: InfoContextFieldBottomLevelData,
1021}
1022
1023impl<'a> InfoContextBuilderDictionaryBuilder<'a> {
1024 pub fn field<F: Into<InfoContextBuilderFieldBottomLevelValue>>(
1026 mut self,
1027 name: &str,
1028 value: F,
1029 ) -> ValkeyResult<Self> {
1030 if self.fields.iter().any(|k| k.0 .0 == name) {
1031 return Err(ValkeyError::String(format!(
1032 "Found duplicate key '{name}' in the info dictionary '{}'",
1033 self.name
1034 )));
1035 }
1036
1037 self.fields.push((name.to_owned(), value.into()).into());
1038 Ok(self)
1039 }
1040
1041 pub fn build_dictionary(self) -> ValkeyResult<InfoContextBuilderSectionBuilder<'a>> {
1043 let name = self.name;
1044 let name_ref = name.clone();
1045 self.info_section_builder.field(
1046 &name_ref,
1047 InfoContextBuilderFieldTopLevelValue::Dictionary {
1048 name,
1049 fields: self.fields.to_owned(),
1050 },
1051 )
1052 }
1053}
1054
1055#[derive(Debug)]
1057pub struct InfoContextBuilderSectionBuilder<'a> {
1058 info_builder: InfoContextBuilder<'a>,
1060 name: String,
1062 fields: InfoContextFieldTopLevelData,
1064}
1065
1066impl<'a> InfoContextBuilderSectionBuilder<'a> {
1067 pub fn field<F: Into<InfoContextBuilderFieldTopLevelValue>>(
1069 mut self,
1070 name: &str,
1071 value: F,
1072 ) -> ValkeyResult<Self> {
1073 if self.fields.iter().any(|(k, _)| k == name) {
1074 return Err(ValkeyError::String(format!(
1075 "Found duplicate key '{name}' in the info section '{}'",
1076 self.name
1077 )));
1078 }
1079 self.fields.push((name.to_owned(), value.into()));
1080 Ok(self)
1081 }
1082
1083 pub fn add_dictionary(self, dictionary_name: &str) -> InfoContextBuilderDictionaryBuilder<'a> {
1085 InfoContextBuilderDictionaryBuilder {
1086 info_section_builder: self,
1087 name: dictionary_name.to_owned(),
1088 fields: InfoContextFieldBottomLevelData::default(),
1089 }
1090 }
1091
1092 pub fn build_section(mut self) -> ValkeyResult<InfoContextBuilder<'a>> {
1094 if self
1095 .info_builder
1096 .sections
1097 .iter()
1098 .any(|(k, _)| k == &self.name)
1099 {
1100 return Err(ValkeyError::String(format!(
1101 "Found duplicate section in the Info reply: {}",
1102 self.name
1103 )));
1104 }
1105
1106 self.info_builder
1107 .sections
1108 .push((self.name.clone(), self.fields));
1109
1110 Ok(self.info_builder)
1111 }
1112}
1113
1114#[derive(Debug, Clone)]
1116#[repr(transparent)]
1117pub struct InfoContextBottomLevelFieldData(pub (String, InfoContextBuilderFieldBottomLevelValue));
1118impl Deref for InfoContextBottomLevelFieldData {
1119 type Target = (String, InfoContextBuilderFieldBottomLevelValue);
1120
1121 fn deref(&self) -> &Self::Target {
1122 &self.0
1123 }
1124}
1125impl std::ops::DerefMut for InfoContextBottomLevelFieldData {
1126 fn deref_mut(&mut self) -> &mut Self::Target {
1127 &mut self.0
1128 }
1129}
1130
1131impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<(String, T)>
1132 for InfoContextBottomLevelFieldData
1133{
1134 fn from(value: (String, T)) -> Self {
1135 Self((value.0, value.1.into()))
1136 }
1137}
1138#[derive(Debug, Default, Clone)]
1141#[repr(transparent)]
1142pub struct InfoContextFieldBottomLevelData(pub Vec<InfoContextBottomLevelFieldData>);
1143impl Deref for InfoContextFieldBottomLevelData {
1144 type Target = Vec<InfoContextBottomLevelFieldData>;
1145
1146 fn deref(&self) -> &Self::Target {
1147 &self.0
1148 }
1149}
1150impl std::ops::DerefMut for InfoContextFieldBottomLevelData {
1151 fn deref_mut(&mut self) -> &mut Self::Target {
1152 &mut self.0
1153 }
1154}
1155
1156pub type InfoContextFieldTopLevelData = Vec<(String, InfoContextBuilderFieldTopLevelValue)>;
1159pub type OneInfoSectionData = (String, InfoContextFieldTopLevelData);
1161pub type InfoContextTreeData = Vec<OneInfoSectionData>;
1163
1164impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<BTreeMap<String, T>>
1165 for InfoContextFieldBottomLevelData
1166{
1167 fn from(value: BTreeMap<String, T>) -> Self {
1168 Self(
1169 value
1170 .into_iter()
1171 .map(|e| (e.0, e.1.into()).into())
1172 .collect(),
1173 )
1174 }
1175}
1176
1177impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<HashMap<String, T>>
1178 for InfoContextFieldBottomLevelData
1179{
1180 fn from(value: HashMap<String, T>) -> Self {
1181 Self(
1182 value
1183 .into_iter()
1184 .map(|e| (e.0, e.1.into()).into())
1185 .collect(),
1186 )
1187 }
1188}
1189
1190#[derive(Debug)]
1191pub struct InfoContextBuilder<'a> {
1192 context: &'a InfoContext,
1193 sections: InfoContextTreeData,
1194}
1195impl<'a> InfoContextBuilder<'a> {
1196 fn add_bottom_level_field(
1197 &self,
1198 key: &str,
1199 value: &InfoContextBuilderFieldBottomLevelValue,
1200 ) -> ValkeyResult<()> {
1201 use InfoContextBuilderFieldBottomLevelValue as BottomLevel;
1202
1203 match value {
1204 BottomLevel::String(string) => add_info_field_str(self.context.ctx, key, string),
1205 BottomLevel::I64(number) => add_info_field_long_long(self.context.ctx, key, *number),
1206 BottomLevel::U64(number) => {
1207 add_info_field_unsigned_long_long(self.context.ctx, key, *number)
1208 }
1209 BottomLevel::F64(number) => add_info_field_double(self.context.ctx, key, *number),
1210 }
1211 .into()
1212 }
1213 fn add_top_level_fields(&self, fields: &InfoContextFieldTopLevelData) -> ValkeyResult<()> {
1216 use InfoContextBuilderFieldTopLevelValue as TopLevel;
1217
1218 fields.iter().try_for_each(|(key, value)| match value {
1219 TopLevel::Value(bottom_level) => self.add_bottom_level_field(key, bottom_level),
1220 TopLevel::Dictionary { name, fields } => {
1221 std::convert::Into::<ValkeyResult<()>>::into(add_info_begin_dict_field(
1222 self.context.ctx,
1223 name,
1224 ))?;
1225 fields
1226 .iter()
1227 .try_for_each(|f| self.add_bottom_level_field(&f.0 .0, &f.0 .1))?;
1228 add_info_end_dict_field(self.context.ctx).into()
1229 }
1230 })
1231 }
1232
1233 fn finalise_data(&self) -> ValkeyResult<()> {
1234 self.sections
1235 .iter()
1236 .try_for_each(|(section_name, section_fields)| -> ValkeyResult<()> {
1237 if add_info_section(self.context.ctx, Some(section_name)) == Status::Ok {
1238 self.add_top_level_fields(section_fields)
1239 } else {
1240 Ok(())
1242 }
1243 })
1244 }
1245
1246 pub fn build_info(self) -> ValkeyResult<&'a InfoContext> {
1248 self.finalise_data().map(|_| self.context)
1249 }
1250
1251 pub fn add_section(self, name: &'a str) -> InfoContextBuilderSectionBuilder {
1253 InfoContextBuilderSectionBuilder {
1254 info_builder: self,
1255 name: name.to_owned(),
1256 fields: InfoContextFieldTopLevelData::new(),
1257 }
1258 }
1259
1260 pub(crate) fn add_section_unchecked(mut self, section: OneInfoSectionData) -> Self {
1263 self.sections.push(section);
1264 self
1265 }
1266}
1267
1268impl<'a> From<&'a InfoContext> for InfoContextBuilder<'a> {
1269 fn from(context: &'a InfoContext) -> Self {
1270 Self {
1271 context,
1272 sections: InfoContextTreeData::new(),
1273 }
1274 }
1275}
1276
1277#[derive(Debug)]
1278pub struct InfoContext {
1279 pub ctx: *mut raw::RedisModuleInfoCtx,
1280}
1281
1282impl InfoContext {
1283 pub const fn new(ctx: *mut raw::RedisModuleInfoCtx) -> Self {
1284 Self { ctx }
1285 }
1286
1287 pub fn builder(&self) -> InfoContextBuilder<'_> {
1289 InfoContextBuilder::from(self)
1290 }
1291
1292 pub fn build_one_section<T: Into<OneInfoSectionData>>(&self, data: T) -> ValkeyResult<()> {
1294 self.builder()
1295 .add_section_unchecked(data.into())
1296 .build_info()?;
1297 Ok(())
1298 }
1299
1300 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1301 pub fn add_info_section(&self, name: Option<&str>) -> Status {
1304 add_info_section(self.ctx, name)
1305 }
1306
1307 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1308 pub fn add_info_field_str(&self, name: &str, content: &str) -> Status {
1312 add_info_field_str(self.ctx, name, content)
1313 }
1314
1315 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1316 pub fn add_info_field_long_long(&self, name: &str, value: c_longlong) -> Status {
1320 add_info_field_long_long(self.ctx, name, value)
1321 }
1322}
1323
1324bitflags! {
1325 pub struct ContextFlags : c_int {
1326 const LUA = raw::REDISMODULE_CTX_FLAGS_LUA as c_int;
1328
1329 const MULTI = raw::REDISMODULE_CTX_FLAGS_MULTI as c_int;
1331
1332 const MASTER = raw::REDISMODULE_CTX_FLAGS_MASTER as c_int;
1334
1335 const SLAVE = raw::REDISMODULE_CTX_FLAGS_SLAVE as c_int;
1337
1338 const READONLY = raw::REDISMODULE_CTX_FLAGS_READONLY as c_int;
1340
1341 const CLUSTER = raw::REDISMODULE_CTX_FLAGS_CLUSTER as c_int;
1343
1344 const AOF = raw::REDISMODULE_CTX_FLAGS_AOF as c_int;
1346
1347 const RDB = raw::REDISMODULE_CTX_FLAGS_RDB as c_int;
1349
1350 const MAXMEMORY = raw::REDISMODULE_CTX_FLAGS_MAXMEMORY as c_int;
1352
1353 const EVICTED = raw::REDISMODULE_CTX_FLAGS_EVICT as c_int;
1355
1356 const OOM = raw::REDISMODULE_CTX_FLAGS_OOM as c_int;
1358
1359 const OOM_WARNING = raw::REDISMODULE_CTX_FLAGS_OOM_WARNING as c_int;
1361
1362 const REPLICATED = raw::REDISMODULE_CTX_FLAGS_REPLICATED as c_int;
1364
1365 const LOADING = raw::REDISMODULE_CTX_FLAGS_LOADING as c_int;
1367
1368 const REPLICA_IS_STALE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE as c_int;
1370
1371 const REPLICA_IS_CONNECTING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING as c_int;
1373
1374 const REPLICA_IS_TRANSFERRING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING as c_int;
1376
1377 const REPLICA_IS_ONLINE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE as c_int;
1379
1380 const ACTIVE_CHILD = raw::REDISMODULE_CTX_FLAGS_ACTIVE_CHILD as c_int;
1382
1383 const IS_CHILD = raw::REDISMODULE_CTX_FLAGS_IS_CHILD as c_int;
1385
1386 const MULTI_DIRTY = raw::REDISMODULE_CTX_FLAGS_MULTI_DIRTY as c_int;
1388
1389 const DENY_BLOCKING = raw::REDISMODULE_CTX_FLAGS_DENY_BLOCKING as c_int;
1392
1393 const FLAGS_RESP3 = raw::REDISMODULE_CTX_FLAGS_RESP3 as c_int;
1395
1396 const ASYNC_LOADING = raw::REDISMODULE_CTX_FLAGS_ASYNC_LOADING as c_int;
1398 }
1399}