1use bitflags::bitflags;
2use std::collections::{BTreeMap, HashMap};
3use std::ffi::CString;
4use std::os::raw::c_void;
5use std::os::raw::{c_char, c_int, c_long, c_longlong};
6use std::ptr::{self, NonNull};
7use std::sync::atomic::{AtomicPtr, Ordering};
8use valkey_module_macros_internals::api;
9
10use crate::key::{KeyFlags, ValkeyKey, ValkeyKeyWritable};
11use crate::logging::ValkeyLogLevel;
12use crate::raw::{ModuleOptions, Version};
13use crate::redisvalue::ValkeyValueKey;
14use crate::{
15 add_info_begin_dict_field, add_info_end_dict_field, add_info_field_double,
16 add_info_field_long_long, add_info_field_str, add_info_field_unsigned_long_long, raw, utils,
17 Status,
18};
19use crate::{add_info_section, ValkeyResult};
20use crate::{ValkeyError, ValkeyString, ValkeyValue};
21use std::ops::Deref;
22
23use std::ffi::CStr;
24
25use self::call_reply::{create_promise_call_reply, CallResult, PromiseCallReply};
26use self::thread_safe::ValkeyLockIndicator;
27
28mod timer;
29
30pub mod auth;
31pub mod blocked;
32pub mod call_reply;
33pub mod client;
34pub mod commands;
35pub mod filter;
36pub mod info;
37pub mod keys_cursor;
38pub mod server_events;
39pub mod thread_safe;
40
/// Builder that accumulates the format-flag characters used when calling a
/// server command from the module.
///
/// The collected flags are turned into a C string by the `build*` methods.
pub struct CallOptionsBuilder {
    /// Flag characters collected so far, in insertion order.
    options: String,
}
44
45impl Default for CallOptionsBuilder {
46 fn default() -> Self {
47 CallOptionsBuilder {
48 options: "v".to_string(),
49 }
50 }
51}
52
/// Finished, NUL-terminated set of call flags produced by
/// `CallOptionsBuilder::build`.
#[derive(Clone)]
pub struct CallOptions {
    /// The flag characters as a C string, ready to be passed by pointer.
    options: CString,
}
57
/// Finished set of call flags produced by
/// `CallOptionsBuilder::build_blocking` (which appends the `K` flag).
///
/// Only compiled when one of the listed minimum-compatibility features is
/// enabled.
#[cfg(any(
    feature = "min-valkey-compatibility-version-8-0",
    feature = "min-redis-compatibility-version-7-2"
))]
#[derive(Clone)]
pub struct BlockingCallOptions {
    /// The flag characters as a C string, ready to be passed by pointer.
    options: CString,
}
66
/// Reply-protocol selection accepted by `CallOptionsBuilder::resp`.
#[derive(Copy, Clone)]
pub enum CallOptionResp {
    /// Adds no flag to the call options.
    Resp2,
    /// Adds the `3` flag to the call options.
    Resp3,
    /// Adds the `0` flag to the call options.
    Auto,
}
73
74impl CallOptionsBuilder {
75 pub fn new() -> CallOptionsBuilder {
76 Self::default()
77 }
78
79 fn add_flag(&mut self, flag: &str) {
80 self.options.push_str(flag);
81 }
82
83 pub fn no_writes(mut self) -> CallOptionsBuilder {
85 self.add_flag("W");
86 self
87 }
88
89 pub fn script_mode(mut self) -> CallOptionsBuilder {
94 self.add_flag("S");
95 self
96 }
97
98 pub fn verify_acl(mut self) -> CallOptionsBuilder {
101 self.add_flag("C");
102 self
103 }
104
105 pub fn verify_oom(mut self) -> CallOptionsBuilder {
107 self.add_flag("M");
108 self
109 }
110
111 pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
114 self.add_flag("E");
115 self
116 }
117
118 pub fn replicate(mut self) -> CallOptionsBuilder {
120 self.add_flag("!");
121 self
122 }
123
124 pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
126 match resp {
127 CallOptionResp::Auto => self.add_flag("0"),
128 CallOptionResp::Resp2 => (),
129 CallOptionResp::Resp3 => self.add_flag("3"),
130 }
131 self
132 }
133
134 pub fn build(self) -> CallOptions {
136 CallOptions {
137 options: CString::new(self.options).unwrap(), }
139 }
140
141 #[cfg(all(any(
145 feature = "min-valkey-compatibility-version-8-0",
146 feature = "min-redis-compatibility-version-7-2"
147 )))]
148 pub fn build_blocking(mut self) -> BlockingCallOptions {
149 self.add_flag("K");
150 BlockingCallOptions {
151 options: CString::new(self.options).unwrap(), }
153 }
154}
155
156pub struct DetachedContext {
160 pub(crate) ctx: AtomicPtr<raw::RedisModuleCtx>,
161}
162
163impl DetachedContext {
164 pub const fn new() -> Self {
165 DetachedContext {
166 ctx: AtomicPtr::new(ptr::null_mut()),
167 }
168 }
169}
170
171impl Default for DetachedContext {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177pub struct DetachedContextGuard {
184 pub(crate) ctx: Context,
185}
186
187unsafe impl ValkeyLockIndicator for DetachedContextGuard {}
188
189impl Drop for DetachedContextGuard {
190 fn drop(&mut self) {
191 unsafe {
192 raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
193 };
194 }
195}
196
197impl Deref for DetachedContextGuard {
198 type Target = Context;
199
200 fn deref(&self) -> &Self::Target {
201 &self.ctx
202 }
203}
204
205impl DetachedContext {
206 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
207 let c = self.ctx.load(Ordering::Relaxed);
208 crate::logging::log_internal(c, level, message);
209 }
210
211 pub fn log_debug(&self, message: &str) {
212 self.log(ValkeyLogLevel::Debug, message);
213 }
214
215 pub fn log_notice(&self, message: &str) {
216 self.log(ValkeyLogLevel::Notice, message);
217 }
218
219 pub fn log_verbose(&self, message: &str) {
220 self.log(ValkeyLogLevel::Verbose, message);
221 }
222
223 pub fn log_warning(&self, message: &str) {
224 self.log(ValkeyLogLevel::Warning, message);
225 }
226
227 pub fn set_context(&self, ctx: &Context) -> Result<(), ValkeyError> {
228 let c = self.ctx.load(Ordering::Relaxed);
229 if !c.is_null() {
230 return Err(ValkeyError::Str("Detached context is already set"));
231 }
232 let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
233 self.ctx.store(ctx, Ordering::Relaxed);
234 Ok(())
235 }
236
237 pub fn lock(&self) -> DetachedContextGuard {
242 let c = self.ctx.load(Ordering::Relaxed);
243 unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
244 let ctx = Context::new(c);
245 DetachedContextGuard { ctx }
246 }
247}
248
249unsafe impl Send for DetachedContext {}
250unsafe impl Sync for DetachedContext {}
251
252#[derive(Debug)]
255pub struct Context {
256 pub ctx: *mut raw::RedisModuleCtx,
257}
258
259#[derive(Debug)]
265pub struct ContextUserScope<'ctx> {
266 ctx: &'ctx Context,
267 user: *mut raw::RedisModuleUser,
268}
269
270impl<'ctx> Drop for ContextUserScope<'ctx> {
271 fn drop(&mut self) {
272 self.ctx.deautenticate_user();
273 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(self.user) };
274 }
275}
276
277impl<'ctx> ContextUserScope<'ctx> {
278 fn new(ctx: &'ctx Context, user: *mut raw::RedisModuleUser) -> ContextUserScope<'ctx> {
279 ContextUserScope { ctx, user }
280 }
281}
282
283pub struct StrCallArgs<'a> {
284 is_owner: bool,
285 args: Vec<*mut raw::RedisModuleString>,
286 phantom: std::marker::PhantomData<&'a raw::RedisModuleString>,
288}
289
290impl<'a> Drop for StrCallArgs<'a> {
291 fn drop(&mut self) {
292 if self.is_owner {
293 self.args.iter_mut().for_each(|v| unsafe {
294 raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), *v)
295 });
296 }
297 }
298}
299
300impl<'a, T: AsRef<[u8]> + ?Sized> From<&'a [&T]> for StrCallArgs<'a> {
301 fn from(vals: &'a [&T]) -> Self {
302 StrCallArgs {
303 is_owner: true,
304 args: vals
305 .iter()
306 .map(|v| ValkeyString::create_from_slice(std::ptr::null_mut(), v.as_ref()).take())
307 .collect(),
308 phantom: std::marker::PhantomData,
309 }
310 }
311}
312
313impl<'a> From<&'a [&ValkeyString]> for StrCallArgs<'a> {
314 fn from(vals: &'a [&ValkeyString]) -> Self {
315 StrCallArgs {
316 is_owner: false,
317 args: vals.iter().map(|v| v.inner).collect(),
318 phantom: std::marker::PhantomData,
319 }
320 }
321}
322
323impl<'a, const SIZE: usize, T: ?Sized> From<&'a [&T; SIZE]> for StrCallArgs<'a>
324where
325 for<'b> &'a [&'b T]: Into<StrCallArgs<'a>>,
326{
327 fn from(vals: &'a [&T; SIZE]) -> Self {
328 vals.as_ref().into()
329 }
330}
331
332impl<'a> StrCallArgs<'a> {
333 pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
334 &mut self.args
335 }
336}
337
338impl Context {
339 pub const fn new(ctx: *mut raw::RedisModuleCtx) -> Self {
340 Self { ctx }
341 }
342
343 #[must_use]
344 pub const fn dummy() -> Self {
345 Self {
346 ctx: ptr::null_mut(),
347 }
348 }
349
350 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
351 crate::logging::log_internal(self.ctx, level, message);
352 }
353
354 pub fn log_debug(&self, message: &str) {
355 self.log(ValkeyLogLevel::Debug, message);
356 }
357
358 pub fn log_notice(&self, message: &str) {
359 self.log(ValkeyLogLevel::Notice, message);
360 }
361
362 pub fn log_verbose(&self, message: &str) {
363 self.log(ValkeyLogLevel::Verbose, message);
364 }
365
366 pub fn log_warning(&self, message: &str) {
367 self.log(ValkeyLogLevel::Warning, message);
368 }
369
370 pub fn auto_memory(&self) {
374 unsafe {
375 raw::RedisModule_AutoMemory.unwrap()(self.ctx);
376 }
377 }
378
379 #[must_use]
383 pub fn is_keys_position_request(&self) -> bool {
384 if cfg!(test) {
386 return false;
387 }
388
389 (unsafe { raw::RedisModule_IsKeysPositionRequest.unwrap()(self.ctx) }) != 0
390 }
391
392 pub fn key_at_pos(&self, pos: i32) {
396 unsafe {
399 raw::RedisModule_KeyAtPos.unwrap()(self.ctx, pos as c_int);
400 }
401 }
402
403 fn call_internal<
404 'ctx,
405 'a,
406 T: Into<StrCallArgs<'a>>,
407 R: From<PromiseCallReply<'static, 'ctx>>,
408 >(
409 &'ctx self,
410 command: &str,
411 fmt: *const c_char,
412 args: T,
413 ) -> R {
414 let mut call_args: StrCallArgs = args.into();
415 let final_args = call_args.args_mut();
416
417 let cmd = CString::new(command).unwrap();
418 let reply: *mut raw::RedisModuleCallReply = unsafe {
419 let p_call = raw::RedisModule_Call.unwrap();
420 p_call(
421 self.ctx,
422 cmd.as_ptr(),
423 fmt,
424 final_args.as_mut_ptr(),
425 final_args.len(),
426 )
427 };
428 let promise = create_promise_call_reply(self, NonNull::new(reply));
429 R::from(promise)
430 }
431
432 pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> ValkeyResult {
433 self.call_internal::<_, CallResult>(command, raw::FMT, args)
434 .map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))
435 }
436
437 pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<CallResult<'static>>>(
441 &self,
442 command: &str,
443 options: &CallOptions,
444 args: T,
445 ) -> R {
446 let res: CallResult<'static> =
447 self.call_internal(command, options.options.as_ptr() as *const c_char, args);
448 R::from(res)
449 }
450
451 #[cfg(all(any(
453 feature = "min-valkey-compatibility-version-8-0",
454 feature = "min-redis-compatibility-version-7-2"
455 )))]
456 pub fn call_blocking<
457 'ctx,
458 'a,
459 T: Into<StrCallArgs<'a>>,
460 R: From<PromiseCallReply<'static, 'ctx>>,
461 >(
462 &'ctx self,
463 command: &str,
464 options: &BlockingCallOptions,
465 args: T,
466 ) -> R {
467 self.call_internal(command, options.options.as_ptr() as *const c_char, args)
468 }
469
470 #[must_use]
471 pub fn str_as_legal_resp_string(s: &str) -> CString {
472 CString::new(
473 s.chars()
474 .map(|c| match c {
475 '\r' | '\n' | '\0' => b' ',
476 _ => c as u8,
477 })
478 .collect::<Vec<_>>(),
479 )
480 .unwrap()
481 }
482
483 #[allow(clippy::must_use_candidate)]
484 pub fn reply_simple_string(&self, s: &str) -> raw::Status {
485 let msg = Self::str_as_legal_resp_string(s);
486 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
487 }
488
489 #[allow(clippy::must_use_candidate)]
490 pub fn reply_error_string(&self, s: &str) -> raw::Status {
491 let msg = Self::str_as_legal_resp_string(s);
492 unsafe { raw::RedisModule_ReplyWithError.unwrap()(self.ctx, msg.as_ptr()).into() }
493 }
494
495 #[cfg(feature = "min-valkey-compatibility-version-8-0")]
496 pub fn add_acl_category(&self, s: &str) -> raw::Status {
497 let acl_flags = Self::str_as_legal_resp_string(s);
498 unsafe { raw::RedisModule_AddACLCategory.unwrap()(self.ctx, acl_flags.as_ptr()).into() }
499 }
500
501 #[cfg(all(any(
502 feature = "min-redis-compatibility-version-7-2",
503 feature = "min-valkey-compatibility-version-8-0"
504 ),))]
505 pub fn set_acl_category(
506 &self,
507 command_name: *const c_char,
508 acl_flags: *const c_char,
509 ) -> raw::Status {
510 unsafe {
511 let command = raw::RedisModule_GetCommand.unwrap()(self.ctx, command_name);
512 raw::RedisModule_SetCommandACLCategories.unwrap()(command, acl_flags).into()
513 }
514 }
515
516 pub fn reply_with_key(&self, result: ValkeyValueKey) -> raw::Status {
517 match result {
518 ValkeyValueKey::Integer(i) => raw::reply_with_long_long(self.ctx, i),
519 ValkeyValueKey::String(s) => {
520 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
521 }
522 ValkeyValueKey::BulkString(b) => {
523 raw::reply_with_string_buffer(self.ctx, b.as_ptr().cast::<c_char>(), b.len())
524 }
525 ValkeyValueKey::BulkValkeyString(s) => raw::reply_with_string(self.ctx, s.inner),
526 ValkeyValueKey::Bool(b) => raw::reply_with_bool(self.ctx, b.into()),
527 }
528 }
529
530 #[allow(clippy::must_use_candidate)]
534 pub fn reply(&self, result: ValkeyResult) -> raw::Status {
535 match result {
536 Ok(ValkeyValue::Bool(v)) => raw::reply_with_bool(self.ctx, v.into()),
537 Ok(ValkeyValue::Integer(v)) => raw::reply_with_long_long(self.ctx, v),
538 Ok(ValkeyValue::Float(v)) => raw::reply_with_double(self.ctx, v),
539 Ok(ValkeyValue::SimpleStringStatic(s)) => {
540 let msg = CString::new(s).unwrap();
541 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
542 }
543
544 Ok(ValkeyValue::SimpleString(s)) => {
545 let msg = CString::new(s).unwrap();
546 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
547 }
548
549 Ok(ValkeyValue::BulkString(s)) => {
550 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
551 }
552
553 Ok(ValkeyValue::BigNumber(s)) => {
554 raw::reply_with_big_number(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
555 }
556
557 Ok(ValkeyValue::VerbatimString((format, data))) => raw::reply_with_verbatim_string(
558 self.ctx,
559 data.as_ptr().cast(),
560 data.len(),
561 format.0.as_ptr().cast(),
562 ),
563
564 Ok(ValkeyValue::BulkValkeyString(s)) => raw::reply_with_string(self.ctx, s.inner),
565
566 Ok(ValkeyValue::StringBuffer(s)) => {
567 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
568 }
569
570 Ok(ValkeyValue::Array(array)) => {
571 raw::reply_with_array(self.ctx, array.len() as c_long);
572
573 for elem in array {
574 self.reply(Ok(elem));
575 }
576
577 raw::Status::Ok
578 }
579
580 Ok(ValkeyValue::Map(map)) => {
581 raw::reply_with_map(self.ctx, map.len() as c_long);
582
583 for (key, value) in map {
584 self.reply_with_key(key);
585 self.reply(Ok(value));
586 }
587
588 raw::Status::Ok
589 }
590
591 Ok(ValkeyValue::OrderedMap(map)) => {
592 raw::reply_with_map(self.ctx, map.len() as c_long);
593
594 for (key, value) in map {
595 self.reply_with_key(key);
596 self.reply(Ok(value));
597 }
598
599 raw::Status::Ok
600 }
601
602 Ok(ValkeyValue::Set(set)) => {
603 raw::reply_with_set(self.ctx, set.len() as c_long);
604 set.into_iter().for_each(|e| {
605 self.reply_with_key(e);
606 });
607
608 raw::Status::Ok
609 }
610
611 Ok(ValkeyValue::OrderedSet(set)) => {
612 raw::reply_with_set(self.ctx, set.len() as c_long);
613 set.into_iter().for_each(|e| {
614 self.reply_with_key(e);
615 });
616
617 raw::Status::Ok
618 }
619
620 Ok(ValkeyValue::Null) => raw::reply_with_null(self.ctx),
621
622 Ok(ValkeyValue::NoReply) => raw::Status::Ok,
623
624 Ok(ValkeyValue::StaticError(s)) => self.reply_error_string(s),
625
626 Err(ValkeyError::WrongArity) => unsafe {
627 if self.is_keys_position_request() {
628 raw::Status::Err
630 } else {
631 raw::RedisModule_WrongArity.unwrap()(self.ctx).into()
632 }
633 },
634
635 Err(ValkeyError::WrongType) => {
636 self.reply_error_string(ValkeyError::WrongType.to_string().as_str())
637 }
638
639 Err(ValkeyError::String(s)) => self.reply_error_string(s.as_str()),
640
641 Err(ValkeyError::Str(s)) => self.reply_error_string(s),
642 }
643 }
644
645 #[must_use]
646 pub fn open_key(&self, key: &ValkeyString) -> ValkeyKey {
647 ValkeyKey::open(self.ctx, key)
648 }
649
650 #[must_use]
651 pub fn open_key_with_flags(&self, key: &ValkeyString, flags: KeyFlags) -> ValkeyKey {
652 ValkeyKey::open_with_flags(self.ctx, key, flags)
653 }
654
655 #[must_use]
656 pub fn open_key_writable(&self, key: &ValkeyString) -> ValkeyKeyWritable {
657 ValkeyKeyWritable::open(self.ctx, key)
658 }
659
660 #[must_use]
661 pub fn open_key_writable_with_flags(
662 &self,
663 key: &ValkeyString,
664 flags: KeyFlags,
665 ) -> ValkeyKeyWritable {
666 ValkeyKeyWritable::open_with_flags(self.ctx, key, flags)
667 }
668
669 pub fn replicate_verbatim(&self) {
670 raw::replicate_verbatim(self.ctx);
671 }
672
673 pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
675 raw::replicate(self.ctx, command, args);
676 }
677
678 #[must_use]
679 pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> ValkeyString {
680 ValkeyString::create(NonNull::new(self.ctx), s)
681 }
682
683 #[must_use]
684 pub const fn get_raw(&self) -> *mut raw::RedisModuleCtx {
685 self.ctx
686 }
687
688 pub unsafe fn export_shared_api(
692 &self,
693 func: *const ::std::os::raw::c_void,
694 name: *const ::std::os::raw::c_char,
695 ) {
696 raw::export_shared_api(self.ctx, func, name);
697 }
698
699 #[allow(clippy::must_use_candidate)]
703 pub fn notify_keyspace_event(
704 &self,
705 event_type: raw::NotifyEvent,
706 event: &str,
707 keyname: &ValkeyString,
708 ) -> raw::Status {
709 unsafe { raw::notify_keyspace_event(self.ctx, event_type, event, keyname) }
710 }
711
712 pub fn current_command_name(&self) -> Result<String, ValkeyError> {
713 unsafe {
714 match raw::RedisModule_GetCurrentCommandName {
715 Some(cmd) => Ok(CStr::from_ptr(cmd(self.ctx)).to_str().unwrap().to_string()),
716 None => Err(ValkeyError::Str(
717 "API RedisModule_GetCurrentCommandName is not available",
718 )),
719 }
720 }
721 }
722
723 pub fn get_server_version(&self) -> Result<Version, ValkeyError> {
726 self.get_server_version_internal(false)
727 }
728
729 pub fn get_server_version_rm_call(&self) -> Result<Version, ValkeyError> {
731 self.get_server_version_internal(true)
732 }
733
734 pub fn version_from_info(info: ValkeyValue) -> Result<Version, ValkeyError> {
735 if let ValkeyValue::SimpleString(info_str) = info {
736 if let Some(ver) = utils::get_regexp_captures(
737 info_str.as_str(),
738 r"(?m)\bredis_version:([0-9]+)\.([0-9]+)\.([0-9]+)\b",
739 ) {
740 return Ok(Version {
741 major: ver[0].parse::<c_int>().unwrap(),
742 minor: ver[1].parse::<c_int>().unwrap(),
743 patch: ver[2].parse::<c_int>().unwrap(),
744 });
745 }
746 }
747 Err(ValkeyError::Str("Error getting redis_version"))
748 }
749
750 #[allow(clippy::not_unsafe_ptr_arg_deref)]
751 fn get_server_version_internal(&self, force_use_rm_call: bool) -> Result<Version, ValkeyError> {
752 match unsafe { raw::RedisModule_GetServerVersion } {
753 Some(api) if !force_use_rm_call => {
754 Ok(Version::from(unsafe { api() }))
756 }
757 _ => {
758 if let Ok(info) = self.call("info", &["server"]) {
760 Self::version_from_info(info)
761 } else {
762 Err(ValkeyError::Str("Error calling \"info server\""))
763 }
764 }
765 }
766 }
767 pub fn set_module_options(&self, options: ModuleOptions) {
768 unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
769 }
770
771 pub fn get_flags(&self) -> ContextFlags {
777 ContextFlags::from_bits_truncate(unsafe {
778 raw::RedisModule_GetContextFlags.unwrap()(self.ctx)
779 })
780 }
781
782 pub fn get_current_user(&self) -> ValkeyString {
784 let user = unsafe { raw::RedisModule_GetCurrentUserName.unwrap()(self.ctx) };
785 ValkeyString::from_redis_module_string(ptr::null_mut(), user)
786 }
787
788 pub fn authenticate_user(
793 &self,
794 user_name: &ValkeyString,
795 ) -> Result<ContextUserScope<'_>, ValkeyError> {
796 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
797 if user.is_null() {
798 return Err(ValkeyError::Str("User does not exists or disabled"));
799 }
800 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
801 Ok(ContextUserScope::new(self, user))
802 }
803
804 fn deautenticate_user(&self) {
805 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
806 }
807
808 pub fn acl_check_key_permission(
812 &self,
813 user_name: &ValkeyString,
814 key_name: &ValkeyString,
815 permissions: &AclPermissions,
816 ) -> Result<(), ValkeyError> {
817 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
818 if user.is_null() {
819 return Err(ValkeyError::Str("User does not exists or disabled"));
820 }
821 let acl_permission_result: raw::Status = unsafe {
822 raw::RedisModule_ACLCheckKeyPermissions.unwrap()(
823 user,
824 key_name.inner,
825 permissions.bits(),
826 )
827 }
828 .into();
829 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(user) };
830 let acl_permission_result: Result<(), &str> = acl_permission_result.into();
831 acl_permission_result
832 .map_err(|_e| ValkeyError::Str("User does not have permissions on key"))
833 }
834
835 api!(
836 [RedisModule_AddPostNotificationJob],
837 pub fn add_post_notification_job<F: FnOnce(&Context) + 'static>(
850 &self,
851 callback: F,
852 ) -> Status {
853 let callback = Box::into_raw(Box::new(Some(callback)));
854 unsafe {
855 RedisModule_AddPostNotificationJob(
856 self.ctx,
857 Some(post_notification_job::<F>),
858 callback as *mut c_void,
859 Some(post_notification_job_free_callback::<F>),
860 )
861 }
862 .into()
863 }
864 );
865
866 api!(
867 [RedisModule_AvoidReplicaTraffic],
868 pub fn avoid_replication_traffic(&self) -> bool {
884 unsafe { RedisModule_AvoidReplicaTraffic() == 1 }
885 }
886 );
887}
888
889extern "C" fn post_notification_job_free_callback<F: FnOnce(&Context)>(pd: *mut c_void) {
890 unsafe {
891 drop(Box::from_raw(pd as *mut Option<F>));
892 };
893}
894
895extern "C" fn post_notification_job<F: FnOnce(&Context)>(
896 ctx: *mut raw::RedisModuleCtx,
897 pd: *mut c_void,
898) {
899 let callback = unsafe { &mut *(pd as *mut Option<F>) };
900 let ctx = Context::new(ctx);
901 callback.take().map_or_else(
902 || {
903 ctx.log(
904 ValkeyLogLevel::Warning,
905 "Got a None callback on post notification job.",
906 )
907 },
908 |callback| {
909 callback(&ctx);
910 },
911 );
912}
913
914unsafe impl ValkeyLockIndicator for Context {}
915
916bitflags! {
917 #[derive(Debug)]
920 pub struct AclPermissions : c_int {
921 const ACCESS = raw::REDISMODULE_CMD_KEY_ACCESS as c_int;
923
924 const INSERT = raw::REDISMODULE_CMD_KEY_INSERT as c_int;
926
927 const DELETE = raw::REDISMODULE_CMD_KEY_DELETE as c_int;
929
930 const UPDATE = raw::REDISMODULE_CMD_KEY_UPDATE as c_int;
932 }
933}
934
/// A single scalar value that can be stored in an info field, either directly
/// in a section or inside a dictionary.
#[derive(Debug, Clone)]
pub enum InfoContextBuilderFieldBottomLevelValue {
    /// A string value.
    String(String),
    /// A signed 64-bit integer value.
    I64(i64),
    /// An unsigned 64-bit integer value.
    U64(u64),
    /// A 64-bit floating-point value.
    F64(f64),
}

impl From<String> for InfoContextBuilderFieldBottomLevelValue {
    /// Wraps an owned string as a `String` value.
    fn from(value: String) -> Self {
        Self::String(value)
    }
}

impl From<&str> for InfoContextBuilderFieldBottomLevelValue {
    /// Copies the string slice into a `String` value.
    fn from(value: &str) -> Self {
        Self::String(value.to_owned())
    }
}

impl From<i64> for InfoContextBuilderFieldBottomLevelValue {
    /// Wraps a signed integer as an `I64` value.
    fn from(value: i64) -> Self {
        Self::I64(value)
    }
}

impl From<u64> for InfoContextBuilderFieldBottomLevelValue {
    /// Wraps an unsigned integer as a `U64` value.
    fn from(value: u64) -> Self {
        Self::U64(value)
    }
}

impl From<f64> for InfoContextBuilderFieldBottomLevelValue {
    /// Wraps a floating-point number as an `F64` value, so that every
    /// variant of the enum has a matching conversion.
    fn from(value: f64) -> Self {
        Self::F64(value)
    }
}
971
972#[derive(Debug, Clone)]
973pub enum InfoContextBuilderFieldTopLevelValue {
974 Value(InfoContextBuilderFieldBottomLevelValue),
976 Dictionary {
1000 name: String,
1001 fields: InfoContextFieldBottomLevelData,
1002 },
1003}
1004
1005impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<T>
1006 for InfoContextBuilderFieldTopLevelValue
1007{
1008 fn from(value: T) -> Self {
1009 Self::Value(value.into())
1010 }
1011}
1012
1013#[derive(Debug)]
1016pub struct InfoContextBuilderDictionaryBuilder<'a> {
1017 info_section_builder: InfoContextBuilderSectionBuilder<'a>,
1019 name: String,
1021 fields: InfoContextFieldBottomLevelData,
1023}
1024
1025impl<'a> InfoContextBuilderDictionaryBuilder<'a> {
1026 pub fn field<F: Into<InfoContextBuilderFieldBottomLevelValue>>(
1028 mut self,
1029 name: &str,
1030 value: F,
1031 ) -> ValkeyResult<Self> {
1032 if self.fields.iter().any(|k| k.0 .0 == name) {
1033 return Err(ValkeyError::String(format!(
1034 "Found duplicate key '{name}' in the info dictionary '{}'",
1035 self.name
1036 )));
1037 }
1038
1039 self.fields.push((name.to_owned(), value.into()).into());
1040 Ok(self)
1041 }
1042
1043 pub fn build_dictionary(self) -> ValkeyResult<InfoContextBuilderSectionBuilder<'a>> {
1045 let name = self.name;
1046 let name_ref = name.clone();
1047 self.info_section_builder.field(
1048 &name_ref,
1049 InfoContextBuilderFieldTopLevelValue::Dictionary {
1050 name,
1051 fields: self.fields.to_owned(),
1052 },
1053 )
1054 }
1055}
1056
1057#[derive(Debug)]
1059pub struct InfoContextBuilderSectionBuilder<'a> {
1060 info_builder: InfoContextBuilder<'a>,
1062 name: String,
1064 fields: InfoContextFieldTopLevelData,
1066}
1067
1068impl<'a> InfoContextBuilderSectionBuilder<'a> {
1069 pub fn field<F: Into<InfoContextBuilderFieldTopLevelValue>>(
1071 mut self,
1072 name: &str,
1073 value: F,
1074 ) -> ValkeyResult<Self> {
1075 if self.fields.iter().any(|(k, _)| k == name) {
1076 return Err(ValkeyError::String(format!(
1077 "Found duplicate key '{name}' in the info section '{}'",
1078 self.name
1079 )));
1080 }
1081 self.fields.push((name.to_owned(), value.into()));
1082 Ok(self)
1083 }
1084
1085 pub fn add_dictionary(self, dictionary_name: &str) -> InfoContextBuilderDictionaryBuilder<'a> {
1087 InfoContextBuilderDictionaryBuilder {
1088 info_section_builder: self,
1089 name: dictionary_name.to_owned(),
1090 fields: InfoContextFieldBottomLevelData::default(),
1091 }
1092 }
1093
1094 pub fn build_section(mut self) -> ValkeyResult<InfoContextBuilder<'a>> {
1096 if self
1097 .info_builder
1098 .sections
1099 .iter()
1100 .any(|(k, _)| k == &self.name)
1101 {
1102 return Err(ValkeyError::String(format!(
1103 "Found duplicate section in the Info reply: {}",
1104 self.name
1105 )));
1106 }
1107
1108 self.info_builder
1109 .sections
1110 .push((self.name.clone(), self.fields));
1111
1112 Ok(self.info_builder)
1113 }
1114}
1115
1116#[derive(Debug, Clone)]
1118#[repr(transparent)]
1119pub struct InfoContextBottomLevelFieldData(pub (String, InfoContextBuilderFieldBottomLevelValue));
1120impl Deref for InfoContextBottomLevelFieldData {
1121 type Target = (String, InfoContextBuilderFieldBottomLevelValue);
1122
1123 fn deref(&self) -> &Self::Target {
1124 &self.0
1125 }
1126}
1127impl std::ops::DerefMut for InfoContextBottomLevelFieldData {
1128 fn deref_mut(&mut self) -> &mut Self::Target {
1129 &mut self.0
1130 }
1131}
1132
1133impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<(String, T)>
1134 for InfoContextBottomLevelFieldData
1135{
1136 fn from(value: (String, T)) -> Self {
1137 Self((value.0, value.1.into()))
1138 }
1139}
1140#[derive(Debug, Default, Clone)]
1143#[repr(transparent)]
1144pub struct InfoContextFieldBottomLevelData(pub Vec<InfoContextBottomLevelFieldData>);
1145impl Deref for InfoContextFieldBottomLevelData {
1146 type Target = Vec<InfoContextBottomLevelFieldData>;
1147
1148 fn deref(&self) -> &Self::Target {
1149 &self.0
1150 }
1151}
1152impl std::ops::DerefMut for InfoContextFieldBottomLevelData {
1153 fn deref_mut(&mut self) -> &mut Self::Target {
1154 &mut self.0
1155 }
1156}
1157
1158pub type InfoContextFieldTopLevelData = Vec<(String, InfoContextBuilderFieldTopLevelValue)>;
1161pub type OneInfoSectionData = (String, InfoContextFieldTopLevelData);
1163pub type InfoContextTreeData = Vec<OneInfoSectionData>;
1165
1166impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<BTreeMap<String, T>>
1167 for InfoContextFieldBottomLevelData
1168{
1169 fn from(value: BTreeMap<String, T>) -> Self {
1170 Self(
1171 value
1172 .into_iter()
1173 .map(|e| (e.0, e.1.into()).into())
1174 .collect(),
1175 )
1176 }
1177}
1178
1179impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<HashMap<String, T>>
1180 for InfoContextFieldBottomLevelData
1181{
1182 fn from(value: HashMap<String, T>) -> Self {
1183 Self(
1184 value
1185 .into_iter()
1186 .map(|e| (e.0, e.1.into()).into())
1187 .collect(),
1188 )
1189 }
1190}
1191
1192#[derive(Debug)]
1193pub struct InfoContextBuilder<'a> {
1194 context: &'a InfoContext,
1195 sections: InfoContextTreeData,
1196}
1197impl<'a> InfoContextBuilder<'a> {
1198 fn add_bottom_level_field(
1199 &self,
1200 key: &str,
1201 value: &InfoContextBuilderFieldBottomLevelValue,
1202 ) -> ValkeyResult<()> {
1203 use InfoContextBuilderFieldBottomLevelValue as BottomLevel;
1204
1205 match value {
1206 BottomLevel::String(string) => add_info_field_str(self.context.ctx, key, string),
1207 BottomLevel::I64(number) => add_info_field_long_long(self.context.ctx, key, *number),
1208 BottomLevel::U64(number) => {
1209 add_info_field_unsigned_long_long(self.context.ctx, key, *number)
1210 }
1211 BottomLevel::F64(number) => add_info_field_double(self.context.ctx, key, *number),
1212 }
1213 .into()
1214 }
1215 fn add_top_level_fields(&self, fields: &InfoContextFieldTopLevelData) -> ValkeyResult<()> {
1218 use InfoContextBuilderFieldTopLevelValue as TopLevel;
1219
1220 fields.iter().try_for_each(|(key, value)| match value {
1221 TopLevel::Value(bottom_level) => self.add_bottom_level_field(key, bottom_level),
1222 TopLevel::Dictionary { name, fields } => {
1223 std::convert::Into::<ValkeyResult<()>>::into(add_info_begin_dict_field(
1224 self.context.ctx,
1225 name,
1226 ))?;
1227 fields
1228 .iter()
1229 .try_for_each(|f| self.add_bottom_level_field(&f.0 .0, &f.0 .1))?;
1230 add_info_end_dict_field(self.context.ctx).into()
1231 }
1232 })
1233 }
1234
1235 fn finalise_data(&self) -> ValkeyResult<()> {
1236 self.sections
1237 .iter()
1238 .try_for_each(|(section_name, section_fields)| -> ValkeyResult<()> {
1239 if add_info_section(self.context.ctx, Some(section_name)) == Status::Ok {
1240 self.add_top_level_fields(section_fields)
1241 } else {
1242 Ok(())
1244 }
1245 })
1246 }
1247
1248 pub fn build_info(self) -> ValkeyResult<&'a InfoContext> {
1250 self.finalise_data().map(|_| self.context)
1251 }
1252
1253 pub fn add_section(self, name: &'a str) -> InfoContextBuilderSectionBuilder {
1255 InfoContextBuilderSectionBuilder {
1256 info_builder: self,
1257 name: name.to_owned(),
1258 fields: InfoContextFieldTopLevelData::new(),
1259 }
1260 }
1261
1262 pub(crate) fn add_section_unchecked(mut self, section: OneInfoSectionData) -> Self {
1265 self.sections.push(section);
1266 self
1267 }
1268}
1269
1270impl<'a> From<&'a InfoContext> for InfoContextBuilder<'a> {
1271 fn from(context: &'a InfoContext) -> Self {
1272 Self {
1273 context,
1274 sections: InfoContextTreeData::new(),
1275 }
1276 }
1277}
1278
1279#[derive(Debug)]
1280pub struct InfoContext {
1281 pub ctx: *mut raw::RedisModuleInfoCtx,
1282}
1283
1284impl InfoContext {
1285 pub const fn new(ctx: *mut raw::RedisModuleInfoCtx) -> Self {
1286 Self { ctx }
1287 }
1288
1289 pub fn builder(&self) -> InfoContextBuilder<'_> {
1291 InfoContextBuilder::from(self)
1292 }
1293
1294 pub fn build_one_section<T: Into<OneInfoSectionData>>(&self, data: T) -> ValkeyResult<()> {
1296 self.builder()
1297 .add_section_unchecked(data.into())
1298 .build_info()?;
1299 Ok(())
1300 }
1301
1302 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1303 pub fn add_info_section(&self, name: Option<&str>) -> Status {
1306 add_info_section(self.ctx, name)
1307 }
1308
1309 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1310 pub fn add_info_field_str(&self, name: &str, content: &str) -> Status {
1314 add_info_field_str(self.ctx, name, content)
1315 }
1316
1317 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1318 pub fn add_info_field_long_long(&self, name: &str, value: c_longlong) -> Status {
1322 add_info_field_long_long(self.ctx, name, value)
1323 }
1324}
1325
1326bitflags! {
1327 pub struct ContextFlags : c_int {
1328 const LUA = raw::REDISMODULE_CTX_FLAGS_LUA as c_int;
1330
1331 const MULTI = raw::REDISMODULE_CTX_FLAGS_MULTI as c_int;
1333
1334 const MASTER = raw::REDISMODULE_CTX_FLAGS_MASTER as c_int;
1336
1337 const SLAVE = raw::REDISMODULE_CTX_FLAGS_SLAVE as c_int;
1339
1340 const READONLY = raw::REDISMODULE_CTX_FLAGS_READONLY as c_int;
1342
1343 const CLUSTER = raw::REDISMODULE_CTX_FLAGS_CLUSTER as c_int;
1345
1346 const AOF = raw::REDISMODULE_CTX_FLAGS_AOF as c_int;
1348
1349 const RDB = raw::REDISMODULE_CTX_FLAGS_RDB as c_int;
1351
1352 const MAXMEMORY = raw::REDISMODULE_CTX_FLAGS_MAXMEMORY as c_int;
1354
1355 const EVICTED = raw::REDISMODULE_CTX_FLAGS_EVICT as c_int;
1357
1358 const OOM = raw::REDISMODULE_CTX_FLAGS_OOM as c_int;
1360
1361 const OOM_WARNING = raw::REDISMODULE_CTX_FLAGS_OOM_WARNING as c_int;
1363
1364 const REPLICATED = raw::REDISMODULE_CTX_FLAGS_REPLICATED as c_int;
1366
1367 const LOADING = raw::REDISMODULE_CTX_FLAGS_LOADING as c_int;
1369
1370 const REPLICA_IS_STALE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE as c_int;
1372
1373 const REPLICA_IS_CONNECTING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING as c_int;
1375
1376 const REPLICA_IS_TRANSFERRING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING as c_int;
1378
1379 const REPLICA_IS_ONLINE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE as c_int;
1381
1382 const ACTIVE_CHILD = raw::REDISMODULE_CTX_FLAGS_ACTIVE_CHILD as c_int;
1384
1385 const IS_CHILD = raw::REDISMODULE_CTX_FLAGS_IS_CHILD as c_int;
1387
1388 const MULTI_DIRTY = raw::REDISMODULE_CTX_FLAGS_MULTI_DIRTY as c_int;
1390
1391 const DENY_BLOCKING = raw::REDISMODULE_CTX_FLAGS_DENY_BLOCKING as c_int;
1394
1395 const FLAGS_RESP3 = raw::REDISMODULE_CTX_FLAGS_RESP3 as c_int;
1397
1398 const ASYNC_LOADING = raw::REDISMODULE_CTX_FLAGS_ASYNC_LOADING as c_int;
1400 }
1401}