1use bitflags::bitflags;
2use std::collections::{BTreeMap, HashMap};
3use std::ffi::CString;
4use std::os::raw::c_void;
5use std::os::raw::{c_char, c_int, c_long, c_longlong};
6use std::ptr::{self, NonNull};
7use std::sync::atomic::{AtomicPtr, Ordering};
8use valkey_module_macros_internals::api;
9
10use crate::key::{KeyFlags, ValkeyKey, ValkeyKeyWritable};
11use crate::logging::ValkeyLogLevel;
12use crate::raw::{ModuleOptions, Version};
13use crate::redisvalue::ValkeyValueKey;
14use crate::{
15 add_info_begin_dict_field, add_info_end_dict_field, add_info_field_double,
16 add_info_field_long_long, add_info_field_str, add_info_field_unsigned_long_long, raw, utils,
17 Status,
18};
19use crate::{add_info_section, ValkeyResult};
20use crate::{ValkeyError, ValkeyString, ValkeyValue};
21use std::ops::Deref;
22
23use std::ffi::CStr;
24
25use self::call_reply::{create_promise_call_reply, CallResult, PromiseCallReply};
26use self::thread_safe::ValkeyLockIndicator;
27
28mod timer;
29
30pub mod blocked;
31pub mod call_reply;
32pub mod client;
33pub mod commands;
34pub mod filter;
35pub mod info;
36pub mod keys_cursor;
37pub mod server_events;
38pub mod thread_safe;
39
/// Builder that accumulates the flag characters of a module-call format string.
///
/// The builder methods append one flag each; `build` / `build_blocking` turn the
/// collected string into a C string wrapper.
pub struct CallOptionsBuilder {
    /// Flag characters collected so far.
    options: String,
}
43
44impl Default for CallOptionsBuilder {
45 fn default() -> Self {
46 CallOptionsBuilder {
47 options: "v".to_string(),
48 }
49 }
50}
51
/// Finished call flags, stored as a NUL-terminated C string.
///
/// Produced by `CallOptionsBuilder::build`.
#[derive(Clone)]
pub struct CallOptions {
    /// The format-flag string handed to the call API as a `*const c_char`.
    options: CString,
}
56
/// Finished call flags for blocking calls, stored as a NUL-terminated C string.
///
/// Produced by `CallOptionsBuilder::build_blocking`, which appends the `K` flag
/// before building. Only compiled when one of the listed compatibility features
/// is enabled.
#[derive(Clone)]
#[cfg(all(any(
    feature = "min-valkey-compatibility-version-8-0",
    feature = "min-redis-compatibility-version-7-2"
)))]
pub struct BlockingCallOptions {
    /// The format-flag string handed to the call API as a `*const c_char`.
    options: CString,
}
65
/// Reply-protocol selection accepted by `CallOptionsBuilder::resp`.
#[derive(Copy, Clone)]
pub enum CallOptionResp {
    /// Adds no flag to the format string.
    Resp2,
    /// Adds the `3` flag to the format string.
    Resp3,
    /// Adds the `0` flag to the format string.
    Auto,
}
72
73impl CallOptionsBuilder {
74 pub fn new() -> CallOptionsBuilder {
75 Self::default()
76 }
77
78 fn add_flag(&mut self, flag: &str) {
79 self.options.push_str(flag);
80 }
81
82 pub fn no_writes(mut self) -> CallOptionsBuilder {
84 self.add_flag("W");
85 self
86 }
87
88 pub fn script_mode(mut self) -> CallOptionsBuilder {
93 self.add_flag("S");
94 self
95 }
96
97 pub fn verify_acl(mut self) -> CallOptionsBuilder {
100 self.add_flag("C");
101 self
102 }
103
104 pub fn verify_oom(mut self) -> CallOptionsBuilder {
106 self.add_flag("M");
107 self
108 }
109
110 pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
113 self.add_flag("E");
114 self
115 }
116
117 pub fn replicate(mut self) -> CallOptionsBuilder {
119 self.add_flag("!");
120 self
121 }
122
123 pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
125 match resp {
126 CallOptionResp::Auto => self.add_flag("0"),
127 CallOptionResp::Resp2 => (),
128 CallOptionResp::Resp3 => self.add_flag("3"),
129 }
130 self
131 }
132
133 pub fn build(self) -> CallOptions {
135 CallOptions {
136 options: CString::new(self.options).unwrap(), }
138 }
139
140 #[cfg(all(any(
144 feature = "min-valkey-compatibility-version-8-0",
145 feature = "min-redis-compatibility-version-7-2"
146 )))]
147 pub fn build_blocking(mut self) -> BlockingCallOptions {
148 self.add_flag("K");
149 BlockingCallOptions {
150 options: CString::new(self.options).unwrap(), }
152 }
153}
154
/// Holder for a detached thread-safe module context pointer.
///
/// The pointer starts out null (see `new`) and is filled in by `set_context`.
pub struct DetachedContext {
    /// Raw context pointer; null until `set_context` succeeds.
    pub(crate) ctx: AtomicPtr<raw::RedisModuleCtx>,
}
161
162impl DetachedContext {
163 pub const fn new() -> Self {
164 DetachedContext {
165 ctx: AtomicPtr::new(ptr::null_mut()),
166 }
167 }
168}
169
170impl Default for DetachedContext {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
/// Guard returned by `DetachedContext::lock`.
///
/// Dereferences to the wrapped [`Context`]; dropping the guard calls
/// `RedisModule_ThreadSafeContextUnlock` on it.
pub struct DetachedContextGuard {
    /// The context that was locked when the guard was created.
    pub(crate) ctx: Context,
}
185
// NOTE(review): presumably this marks the guard as proof that the server lock
// is held (the guard is only created by `DetachedContext::lock`) — confirm
// against the safety contract of `ValkeyLockIndicator`.
unsafe impl ValkeyLockIndicator for DetachedContextGuard {}
187
188impl Drop for DetachedContextGuard {
189 fn drop(&mut self) {
190 unsafe {
191 raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
192 };
193 }
194}
195
196impl Deref for DetachedContextGuard {
197 type Target = Context;
198
199 fn deref(&self) -> &Self::Target {
200 &self.ctx
201 }
202}
203
204impl DetachedContext {
205 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
206 let c = self.ctx.load(Ordering::Relaxed);
207 crate::logging::log_internal(c, level, message);
208 }
209
210 pub fn log_debug(&self, message: &str) {
211 self.log(ValkeyLogLevel::Debug, message);
212 }
213
214 pub fn log_notice(&self, message: &str) {
215 self.log(ValkeyLogLevel::Notice, message);
216 }
217
218 pub fn log_verbose(&self, message: &str) {
219 self.log(ValkeyLogLevel::Verbose, message);
220 }
221
222 pub fn log_warning(&self, message: &str) {
223 self.log(ValkeyLogLevel::Warning, message);
224 }
225
226 pub fn set_context(&self, ctx: &Context) -> Result<(), ValkeyError> {
227 let c = self.ctx.load(Ordering::Relaxed);
228 if !c.is_null() {
229 return Err(ValkeyError::Str("Detached context is already set"));
230 }
231 let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
232 self.ctx.store(ctx, Ordering::Relaxed);
233 Ok(())
234 }
235
236 pub fn lock(&self) -> DetachedContextGuard {
241 let c = self.ctx.load(Ordering::Relaxed);
242 unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
243 let ctx = Context::new(c);
244 DetachedContextGuard { ctx }
245 }
246}
247
// NOTE(review): the only field is an `AtomicPtr`; these impls additionally
// assume the detached context behind the pointer may be used from any
// thread — confirm against the module API's threading rules.
unsafe impl Send for DetachedContext {}
unsafe impl Sync for DetachedContext {}
250
/// Thin wrapper around a raw `RedisModuleCtx` pointer.
///
/// The pointer may be null (see `Context::dummy`).
#[derive(Debug)]
pub struct Context {
    /// The raw module context pointer handed to the module API calls.
    pub ctx: *mut raw::RedisModuleCtx,
}
257
/// Scope object returned by `Context::authenticate_user`.
///
/// When dropped it resets the context user to null and frees the module user
/// it holds.
#[derive(Debug)]
pub struct ContextUserScope<'ctx> {
    /// Context whose user was set for the lifetime of this scope.
    ctx: &'ctx Context,
    /// Module user that is freed when the scope is dropped.
    user: *mut raw::RedisModuleUser,
}
268
269impl<'ctx> Drop for ContextUserScope<'ctx> {
270 fn drop(&mut self) {
271 self.ctx.deautenticate_user();
272 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(self.user) };
273 }
274}
275
276impl<'ctx> ContextUserScope<'ctx> {
277 fn new(ctx: &'ctx Context, user: *mut raw::RedisModuleUser) -> ContextUserScope<'ctx> {
278 ContextUserScope { ctx, user }
279 }
280}
281
/// Argument list for a module call, as raw module-string pointers.
pub struct StrCallArgs<'a> {
    /// When true, every pointer in `args` is freed when this value is dropped.
    is_owner: bool,
    /// Raw argument pointers passed to the call API.
    args: Vec<*mut raw::RedisModuleString>,
    /// Carries the `'a` lifetime; no data is stored.
    phantom: std::marker::PhantomData<&'a raw::RedisModuleString>,
}
288
289impl<'a> Drop for StrCallArgs<'a> {
290 fn drop(&mut self) {
291 if self.is_owner {
292 self.args.iter_mut().for_each(|v| unsafe {
293 raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), *v)
294 });
295 }
296 }
297}
298
299impl<'a, T: AsRef<[u8]> + ?Sized> From<&'a [&T]> for StrCallArgs<'a> {
300 fn from(vals: &'a [&T]) -> Self {
301 StrCallArgs {
302 is_owner: true,
303 args: vals
304 .iter()
305 .map(|v| ValkeyString::create_from_slice(std::ptr::null_mut(), v.as_ref()).take())
306 .collect(),
307 phantom: std::marker::PhantomData,
308 }
309 }
310}
311
312impl<'a> From<&'a [&ValkeyString]> for StrCallArgs<'a> {
313 fn from(vals: &'a [&ValkeyString]) -> Self {
314 StrCallArgs {
315 is_owner: false,
316 args: vals.iter().map(|v| v.inner).collect(),
317 phantom: std::marker::PhantomData,
318 }
319 }
320}
321
322impl<'a, const SIZE: usize, T: ?Sized> From<&'a [&T; SIZE]> for StrCallArgs<'a>
323where
324 for<'b> &'a [&'b T]: Into<StrCallArgs<'a>>,
325{
326 fn from(vals: &'a [&T; SIZE]) -> Self {
327 vals.as_ref().into()
328 }
329}
330
331impl<'a> StrCallArgs<'a> {
332 pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
333 &mut self.args
334 }
335}
336
337impl Context {
338 pub const fn new(ctx: *mut raw::RedisModuleCtx) -> Self {
339 Self { ctx }
340 }
341
342 #[must_use]
343 pub const fn dummy() -> Self {
344 Self {
345 ctx: ptr::null_mut(),
346 }
347 }
348
349 pub fn log(&self, level: ValkeyLogLevel, message: &str) {
350 crate::logging::log_internal(self.ctx, level, message);
351 }
352
353 pub fn log_debug(&self, message: &str) {
354 self.log(ValkeyLogLevel::Debug, message);
355 }
356
357 pub fn log_notice(&self, message: &str) {
358 self.log(ValkeyLogLevel::Notice, message);
359 }
360
361 pub fn log_verbose(&self, message: &str) {
362 self.log(ValkeyLogLevel::Verbose, message);
363 }
364
365 pub fn log_warning(&self, message: &str) {
366 self.log(ValkeyLogLevel::Warning, message);
367 }
368
369 pub fn auto_memory(&self) {
373 unsafe {
374 raw::RedisModule_AutoMemory.unwrap()(self.ctx);
375 }
376 }
377
378 #[must_use]
382 pub fn is_keys_position_request(&self) -> bool {
383 if cfg!(test) {
385 return false;
386 }
387
388 (unsafe { raw::RedisModule_IsKeysPositionRequest.unwrap()(self.ctx) }) != 0
389 }
390
391 pub fn key_at_pos(&self, pos: i32) {
395 unsafe {
398 raw::RedisModule_KeyAtPos.unwrap()(self.ctx, pos as c_int);
399 }
400 }
401
402 fn call_internal<
403 'ctx,
404 'a,
405 T: Into<StrCallArgs<'a>>,
406 R: From<PromiseCallReply<'static, 'ctx>>,
407 >(
408 &'ctx self,
409 command: &str,
410 fmt: *const c_char,
411 args: T,
412 ) -> R {
413 let mut call_args: StrCallArgs = args.into();
414 let final_args = call_args.args_mut();
415
416 let cmd = CString::new(command).unwrap();
417 let reply: *mut raw::RedisModuleCallReply = unsafe {
418 let p_call = raw::RedisModule_Call.unwrap();
419 p_call(
420 self.ctx,
421 cmd.as_ptr(),
422 fmt,
423 final_args.as_mut_ptr(),
424 final_args.len(),
425 )
426 };
427 let promise = create_promise_call_reply(self, NonNull::new(reply));
428 R::from(promise)
429 }
430
431 pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> ValkeyResult {
432 self.call_internal::<_, CallResult>(command, raw::FMT, args)
433 .map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))
434 }
435
436 pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<CallResult<'static>>>(
440 &self,
441 command: &str,
442 options: &CallOptions,
443 args: T,
444 ) -> R {
445 let res: CallResult<'static> =
446 self.call_internal(command, options.options.as_ptr() as *const c_char, args);
447 R::from(res)
448 }
449
450 #[cfg(all(any(
452 feature = "min-valkey-compatibility-version-8-0",
453 feature = "min-redis-compatibility-version-7-2"
454 )))]
455 pub fn call_blocking<
456 'ctx,
457 'a,
458 T: Into<StrCallArgs<'a>>,
459 R: From<PromiseCallReply<'static, 'ctx>>,
460 >(
461 &'ctx self,
462 command: &str,
463 options: &BlockingCallOptions,
464 args: T,
465 ) -> R {
466 self.call_internal(command, options.options.as_ptr() as *const c_char, args)
467 }
468
469 #[must_use]
470 pub fn str_as_legal_resp_string(s: &str) -> CString {
471 CString::new(
472 s.chars()
473 .map(|c| match c {
474 '\r' | '\n' | '\0' => b' ',
475 _ => c as u8,
476 })
477 .collect::<Vec<_>>(),
478 )
479 .unwrap()
480 }
481
482 #[allow(clippy::must_use_candidate)]
483 pub fn reply_simple_string(&self, s: &str) -> raw::Status {
484 let msg = Self::str_as_legal_resp_string(s);
485 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
486 }
487
488 #[allow(clippy::must_use_candidate)]
489 pub fn reply_error_string(&self, s: &str) -> raw::Status {
490 let msg = Self::str_as_legal_resp_string(s);
491 unsafe { raw::RedisModule_ReplyWithError.unwrap()(self.ctx, msg.as_ptr()).into() }
492 }
493
494 #[cfg(feature = "min-valkey-compatibility-version-8-0")]
495 pub fn add_acl_category(&self, s: &str) -> raw::Status {
496 let acl_flags = Self::str_as_legal_resp_string(s);
497 unsafe { raw::RedisModule_AddACLCategory.unwrap()(self.ctx, acl_flags.as_ptr()).into() }
498 }
499
500 #[cfg(all(any(
501 feature = "min-redis-compatibility-version-7-2",
502 feature = "min-valkey-compatibility-version-8-0"
503 ),))]
504 pub fn set_acl_category(
505 &self,
506 command_name: *const c_char,
507 acl_flags: *const c_char,
508 ) -> raw::Status {
509 unsafe {
510 let command = raw::RedisModule_GetCommand.unwrap()(self.ctx, command_name);
511 raw::RedisModule_SetCommandACLCategories.unwrap()(command, acl_flags).into()
512 }
513 }
514
515 pub fn reply_with_key(&self, result: ValkeyValueKey) -> raw::Status {
516 match result {
517 ValkeyValueKey::Integer(i) => raw::reply_with_long_long(self.ctx, i),
518 ValkeyValueKey::String(s) => {
519 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
520 }
521 ValkeyValueKey::BulkString(b) => {
522 raw::reply_with_string_buffer(self.ctx, b.as_ptr().cast::<c_char>(), b.len())
523 }
524 ValkeyValueKey::BulkValkeyString(s) => raw::reply_with_string(self.ctx, s.inner),
525 ValkeyValueKey::Bool(b) => raw::reply_with_bool(self.ctx, b.into()),
526 }
527 }
528
529 #[allow(clippy::must_use_candidate)]
533 pub fn reply(&self, result: ValkeyResult) -> raw::Status {
534 match result {
535 Ok(ValkeyValue::Bool(v)) => raw::reply_with_bool(self.ctx, v.into()),
536 Ok(ValkeyValue::Integer(v)) => raw::reply_with_long_long(self.ctx, v),
537 Ok(ValkeyValue::Float(v)) => raw::reply_with_double(self.ctx, v),
538 Ok(ValkeyValue::SimpleStringStatic(s)) => {
539 let msg = CString::new(s).unwrap();
540 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
541 }
542
543 Ok(ValkeyValue::SimpleString(s)) => {
544 let msg = CString::new(s).unwrap();
545 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
546 }
547
548 Ok(ValkeyValue::BulkString(s)) => {
549 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
550 }
551
552 Ok(ValkeyValue::BigNumber(s)) => {
553 raw::reply_with_big_number(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
554 }
555
556 Ok(ValkeyValue::VerbatimString((format, data))) => raw::reply_with_verbatim_string(
557 self.ctx,
558 data.as_ptr().cast(),
559 data.len(),
560 format.0.as_ptr().cast(),
561 ),
562
563 Ok(ValkeyValue::BulkValkeyString(s)) => raw::reply_with_string(self.ctx, s.inner),
564
565 Ok(ValkeyValue::StringBuffer(s)) => {
566 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
567 }
568
569 Ok(ValkeyValue::Array(array)) => {
570 raw::reply_with_array(self.ctx, array.len() as c_long);
571
572 for elem in array {
573 self.reply(Ok(elem));
574 }
575
576 raw::Status::Ok
577 }
578
579 Ok(ValkeyValue::Map(map)) => {
580 raw::reply_with_map(self.ctx, map.len() as c_long);
581
582 for (key, value) in map {
583 self.reply_with_key(key);
584 self.reply(Ok(value));
585 }
586
587 raw::Status::Ok
588 }
589
590 Ok(ValkeyValue::OrderedMap(map)) => {
591 raw::reply_with_map(self.ctx, map.len() as c_long);
592
593 for (key, value) in map {
594 self.reply_with_key(key);
595 self.reply(Ok(value));
596 }
597
598 raw::Status::Ok
599 }
600
601 Ok(ValkeyValue::Set(set)) => {
602 raw::reply_with_set(self.ctx, set.len() as c_long);
603 set.into_iter().for_each(|e| {
604 self.reply_with_key(e);
605 });
606
607 raw::Status::Ok
608 }
609
610 Ok(ValkeyValue::OrderedSet(set)) => {
611 raw::reply_with_set(self.ctx, set.len() as c_long);
612 set.into_iter().for_each(|e| {
613 self.reply_with_key(e);
614 });
615
616 raw::Status::Ok
617 }
618
619 Ok(ValkeyValue::Null) => raw::reply_with_null(self.ctx),
620
621 Ok(ValkeyValue::NoReply) => raw::Status::Ok,
622
623 Ok(ValkeyValue::StaticError(s)) => self.reply_error_string(s),
624
625 Err(ValkeyError::WrongArity) => unsafe {
626 if self.is_keys_position_request() {
627 raw::Status::Err
629 } else {
630 raw::RedisModule_WrongArity.unwrap()(self.ctx).into()
631 }
632 },
633
634 Err(ValkeyError::WrongType) => {
635 self.reply_error_string(ValkeyError::WrongType.to_string().as_str())
636 }
637
638 Err(ValkeyError::String(s)) => self.reply_error_string(s.as_str()),
639
640 Err(ValkeyError::Str(s)) => self.reply_error_string(s),
641 }
642 }
643
644 #[must_use]
645 pub fn open_key(&self, key: &ValkeyString) -> ValkeyKey {
646 ValkeyKey::open(self.ctx, key)
647 }
648
649 #[must_use]
650 pub fn open_key_with_flags(&self, key: &ValkeyString, flags: KeyFlags) -> ValkeyKey {
651 ValkeyKey::open_with_flags(self.ctx, key, flags)
652 }
653
654 #[must_use]
655 pub fn open_key_writable(&self, key: &ValkeyString) -> ValkeyKeyWritable {
656 ValkeyKeyWritable::open(self.ctx, key)
657 }
658
659 #[must_use]
660 pub fn open_key_writable_with_flags(
661 &self,
662 key: &ValkeyString,
663 flags: KeyFlags,
664 ) -> ValkeyKeyWritable {
665 ValkeyKeyWritable::open_with_flags(self.ctx, key, flags)
666 }
667
668 pub fn replicate_verbatim(&self) {
669 raw::replicate_verbatim(self.ctx);
670 }
671
672 pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
674 raw::replicate(self.ctx, command, args);
675 }
676
677 #[must_use]
678 pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> ValkeyString {
679 ValkeyString::create(NonNull::new(self.ctx), s)
680 }
681
682 #[must_use]
683 pub const fn get_raw(&self) -> *mut raw::RedisModuleCtx {
684 self.ctx
685 }
686
687 pub unsafe fn export_shared_api(
691 &self,
692 func: *const ::std::os::raw::c_void,
693 name: *const ::std::os::raw::c_char,
694 ) {
695 raw::export_shared_api(self.ctx, func, name);
696 }
697
698 #[allow(clippy::must_use_candidate)]
702 pub fn notify_keyspace_event(
703 &self,
704 event_type: raw::NotifyEvent,
705 event: &str,
706 keyname: &ValkeyString,
707 ) -> raw::Status {
708 unsafe { raw::notify_keyspace_event(self.ctx, event_type, event, keyname) }
709 }
710
711 pub fn current_command_name(&self) -> Result<String, ValkeyError> {
712 unsafe {
713 match raw::RedisModule_GetCurrentCommandName {
714 Some(cmd) => Ok(CStr::from_ptr(cmd(self.ctx)).to_str().unwrap().to_string()),
715 None => Err(ValkeyError::Str(
716 "API RedisModule_GetCurrentCommandName is not available",
717 )),
718 }
719 }
720 }
721
722 pub fn get_server_version(&self) -> Result<Version, ValkeyError> {
725 self.get_server_version_internal(false)
726 }
727
728 pub fn get_server_version_rm_call(&self) -> Result<Version, ValkeyError> {
730 self.get_server_version_internal(true)
731 }
732
733 pub fn version_from_info(info: ValkeyValue) -> Result<Version, ValkeyError> {
734 if let ValkeyValue::SimpleString(info_str) = info {
735 if let Some(ver) = utils::get_regexp_captures(
736 info_str.as_str(),
737 r"(?m)\bredis_version:([0-9]+)\.([0-9]+)\.([0-9]+)\b",
738 ) {
739 return Ok(Version {
740 major: ver[0].parse::<c_int>().unwrap(),
741 minor: ver[1].parse::<c_int>().unwrap(),
742 patch: ver[2].parse::<c_int>().unwrap(),
743 });
744 }
745 }
746 Err(ValkeyError::Str("Error getting redis_version"))
747 }
748
749 #[allow(clippy::not_unsafe_ptr_arg_deref)]
750 fn get_server_version_internal(&self, force_use_rm_call: bool) -> Result<Version, ValkeyError> {
751 match unsafe { raw::RedisModule_GetServerVersion } {
752 Some(api) if !force_use_rm_call => {
753 Ok(Version::from(unsafe { api() }))
755 }
756 _ => {
757 if let Ok(info) = self.call("info", &["server"]) {
759 Self::version_from_info(info)
760 } else {
761 Err(ValkeyError::Str("Error calling \"info server\""))
762 }
763 }
764 }
765 }
766 pub fn set_module_options(&self, options: ModuleOptions) {
767 unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
768 }
769
770 pub fn get_flags(&self) -> ContextFlags {
776 ContextFlags::from_bits_truncate(unsafe {
777 raw::RedisModule_GetContextFlags.unwrap()(self.ctx)
778 })
779 }
780
781 pub fn get_current_user(&self) -> ValkeyString {
783 let user = unsafe { raw::RedisModule_GetCurrentUserName.unwrap()(self.ctx) };
784 ValkeyString::from_redis_module_string(ptr::null_mut(), user)
785 }
786
787 pub fn authenticate_user(
792 &self,
793 user_name: &ValkeyString,
794 ) -> Result<ContextUserScope<'_>, ValkeyError> {
795 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
796 if user.is_null() {
797 return Err(ValkeyError::Str("User does not exists or disabled"));
798 }
799 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
800 Ok(ContextUserScope::new(self, user))
801 }
802
803 fn deautenticate_user(&self) {
804 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
805 }
806
807 pub fn acl_check_key_permission(
811 &self,
812 user_name: &ValkeyString,
813 key_name: &ValkeyString,
814 permissions: &AclPermissions,
815 ) -> Result<(), ValkeyError> {
816 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
817 if user.is_null() {
818 return Err(ValkeyError::Str("User does not exists or disabled"));
819 }
820 let acl_permission_result: raw::Status = unsafe {
821 raw::RedisModule_ACLCheckKeyPermissions.unwrap()(
822 user,
823 key_name.inner,
824 permissions.bits(),
825 )
826 }
827 .into();
828 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(user) };
829 let acl_permission_result: Result<(), &str> = acl_permission_result.into();
830 acl_permission_result
831 .map_err(|_e| ValkeyError::Str("User does not have permissions on key"))
832 }
833
834 api!(
835 [RedisModule_AddPostNotificationJob],
836 pub fn add_post_notification_job<F: FnOnce(&Context) + 'static>(
849 &self,
850 callback: F,
851 ) -> Status {
852 let callback = Box::into_raw(Box::new(Some(callback)));
853 unsafe {
854 RedisModule_AddPostNotificationJob(
855 self.ctx,
856 Some(post_notification_job::<F>),
857 callback as *mut c_void,
858 Some(post_notification_job_free_callback::<F>),
859 )
860 }
861 .into()
862 }
863 );
864
865 api!(
866 [RedisModule_AvoidReplicaTraffic],
867 pub fn avoid_replication_traffic(&self) -> bool {
883 unsafe { RedisModule_AvoidReplicaTraffic() == 1 }
884 }
885 );
886}
887
888extern "C" fn post_notification_job_free_callback<F: FnOnce(&Context)>(pd: *mut c_void) {
889 unsafe {
890 drop(Box::from_raw(pd as *mut Option<F>));
891 };
892}
893
894extern "C" fn post_notification_job<F: FnOnce(&Context)>(
895 ctx: *mut raw::RedisModuleCtx,
896 pd: *mut c_void,
897) {
898 let callback = unsafe { &mut *(pd as *mut Option<F>) };
899 let ctx = Context::new(ctx);
900 callback.take().map_or_else(
901 || {
902 ctx.log(
903 ValkeyLogLevel::Warning,
904 "Got a None callback on post notification job.",
905 )
906 },
907 |callback| {
908 callback(&ctx);
909 },
910 );
911}
912
// NOTE(review): presumably a `Context` is only handed out while the server
// lock is held, which is what this marker asserts — confirm against the
// safety contract of `ValkeyLockIndicator`.
unsafe impl ValkeyLockIndicator for Context {}
914
bitflags! {
    /// Key permissions that can be checked with
    /// `Context::acl_check_key_permission`.
    #[derive(Debug)]
    pub struct AclPermissions : c_int {
        /// Mirrors `REDISMODULE_CMD_KEY_ACCESS`.
        const ACCESS = raw::REDISMODULE_CMD_KEY_ACCESS as c_int;

        /// Mirrors `REDISMODULE_CMD_KEY_INSERT`.
        const INSERT = raw::REDISMODULE_CMD_KEY_INSERT as c_int;

        /// Mirrors `REDISMODULE_CMD_KEY_DELETE`.
        const DELETE = raw::REDISMODULE_CMD_KEY_DELETE as c_int;

        /// Mirrors `REDISMODULE_CMD_KEY_UPDATE`.
        const UPDATE = raw::REDISMODULE_CMD_KEY_UPDATE as c_int;
    }
}
933
/// A single scalar value that can be stored in an info field.
#[derive(Debug, Clone)]
pub enum InfoContextBuilderFieldBottomLevelValue {
    /// A string value.
    String(String),
    /// A signed 64-bit integer value.
    I64(i64),
    /// An unsigned 64-bit integer value.
    U64(u64),
    /// A double-precision floating point value.
    F64(f64),
}
946
947impl From<String> for InfoContextBuilderFieldBottomLevelValue {
948 fn from(value: String) -> Self {
949 Self::String(value)
950 }
951}
952
953impl From<&str> for InfoContextBuilderFieldBottomLevelValue {
954 fn from(value: &str) -> Self {
955 Self::String(value.to_owned())
956 }
957}
958
959impl From<i64> for InfoContextBuilderFieldBottomLevelValue {
960 fn from(value: i64) -> Self {
961 Self::I64(value)
962 }
963}
964
965impl From<u64> for InfoContextBuilderFieldBottomLevelValue {
966 fn from(value: u64) -> Self {
967 Self::U64(value)
968 }
969}
970
/// A value stored directly in an info section: either a scalar or a named
/// dictionary of scalars.
#[derive(Debug, Clone)]
pub enum InfoContextBuilderFieldTopLevelValue {
    /// A single scalar value.
    Value(InfoContextBuilderFieldBottomLevelValue),
    /// A named group of scalar fields.
    Dictionary {
        /// Name passed to the dictionary-begin call when the data is emitted.
        name: String,
        /// The scalar fields contained in the dictionary.
        fields: InfoContextFieldBottomLevelData,
    },
}
1003
1004impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<T>
1005 for InfoContextBuilderFieldTopLevelValue
1006{
1007 fn from(value: T) -> Self {
1008 Self::Value(value.into())
1009 }
1010}
1011
/// Builder for one dictionary inside an info section.
///
/// Created by `InfoContextBuilderSectionBuilder::add_dictionary` and turned
/// back into the section builder by `build_dictionary`.
#[derive(Debug)]
pub struct InfoContextBuilderDictionaryBuilder<'a> {
    /// The section builder this dictionary will be added to.
    info_section_builder: InfoContextBuilderSectionBuilder<'a>,
    /// Name of the dictionary.
    name: String,
    /// Fields collected so far.
    fields: InfoContextFieldBottomLevelData,
}
1023
1024impl<'a> InfoContextBuilderDictionaryBuilder<'a> {
1025 pub fn field<F: Into<InfoContextBuilderFieldBottomLevelValue>>(
1027 mut self,
1028 name: &str,
1029 value: F,
1030 ) -> ValkeyResult<Self> {
1031 if self.fields.iter().any(|k| k.0 .0 == name) {
1032 return Err(ValkeyError::String(format!(
1033 "Found duplicate key '{name}' in the info dictionary '{}'",
1034 self.name
1035 )));
1036 }
1037
1038 self.fields.push((name.to_owned(), value.into()).into());
1039 Ok(self)
1040 }
1041
1042 pub fn build_dictionary(self) -> ValkeyResult<InfoContextBuilderSectionBuilder<'a>> {
1044 let name = self.name;
1045 let name_ref = name.clone();
1046 self.info_section_builder.field(
1047 &name_ref,
1048 InfoContextBuilderFieldTopLevelValue::Dictionary {
1049 name,
1050 fields: self.fields.to_owned(),
1051 },
1052 )
1053 }
1054}
1055
/// Builder for one section of the info reply.
///
/// Created by `InfoContextBuilder::add_section` and turned back into the info
/// builder by `build_section`.
#[derive(Debug)]
pub struct InfoContextBuilderSectionBuilder<'a> {
    /// The info builder this section will be added to.
    info_builder: InfoContextBuilder<'a>,
    /// Name of the section.
    name: String,
    /// Fields collected so far.
    fields: InfoContextFieldTopLevelData,
}
1066
1067impl<'a> InfoContextBuilderSectionBuilder<'a> {
1068 pub fn field<F: Into<InfoContextBuilderFieldTopLevelValue>>(
1070 mut self,
1071 name: &str,
1072 value: F,
1073 ) -> ValkeyResult<Self> {
1074 if self.fields.iter().any(|(k, _)| k == name) {
1075 return Err(ValkeyError::String(format!(
1076 "Found duplicate key '{name}' in the info section '{}'",
1077 self.name
1078 )));
1079 }
1080 self.fields.push((name.to_owned(), value.into()));
1081 Ok(self)
1082 }
1083
1084 pub fn add_dictionary(self, dictionary_name: &str) -> InfoContextBuilderDictionaryBuilder<'a> {
1086 InfoContextBuilderDictionaryBuilder {
1087 info_section_builder: self,
1088 name: dictionary_name.to_owned(),
1089 fields: InfoContextFieldBottomLevelData::default(),
1090 }
1091 }
1092
1093 pub fn build_section(mut self) -> ValkeyResult<InfoContextBuilder<'a>> {
1095 if self
1096 .info_builder
1097 .sections
1098 .iter()
1099 .any(|(k, _)| k == &self.name)
1100 {
1101 return Err(ValkeyError::String(format!(
1102 "Found duplicate section in the Info reply: {}",
1103 self.name
1104 )));
1105 }
1106
1107 self.info_builder
1108 .sections
1109 .push((self.name.clone(), self.fields));
1110
1111 Ok(self.info_builder)
1112 }
1113}
1114
/// One named scalar field: a `(name, value)` pair.
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct InfoContextBottomLevelFieldData(pub (String, InfoContextBuilderFieldBottomLevelValue));
1119impl Deref for InfoContextBottomLevelFieldData {
1120 type Target = (String, InfoContextBuilderFieldBottomLevelValue);
1121
1122 fn deref(&self) -> &Self::Target {
1123 &self.0
1124 }
1125}
1126impl std::ops::DerefMut for InfoContextBottomLevelFieldData {
1127 fn deref_mut(&mut self) -> &mut Self::Target {
1128 &mut self.0
1129 }
1130}
1131
1132impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<(String, T)>
1133 for InfoContextBottomLevelFieldData
1134{
1135 fn from(value: (String, T)) -> Self {
1136 Self((value.0, value.1.into()))
1137 }
1138}
/// An ordered list of named scalar fields, e.g. the contents of a dictionary.
#[derive(Debug, Default, Clone)]
#[repr(transparent)]
pub struct InfoContextFieldBottomLevelData(pub Vec<InfoContextBottomLevelFieldData>);
1144impl Deref for InfoContextFieldBottomLevelData {
1145 type Target = Vec<InfoContextBottomLevelFieldData>;
1146
1147 fn deref(&self) -> &Self::Target {
1148 &self.0
1149 }
1150}
1151impl std::ops::DerefMut for InfoContextFieldBottomLevelData {
1152 fn deref_mut(&mut self) -> &mut Self::Target {
1153 &mut self.0
1154 }
1155}
1156
/// The fields of one section: `(field name, value)` pairs in insertion order.
pub type InfoContextFieldTopLevelData = Vec<(String, InfoContextBuilderFieldTopLevelValue)>;
/// One complete section: its name together with its fields.
pub type OneInfoSectionData = (String, InfoContextFieldTopLevelData);
/// All sections collected for an info reply.
pub type InfoContextTreeData = Vec<OneInfoSectionData>;
1164
1165impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<BTreeMap<String, T>>
1166 for InfoContextFieldBottomLevelData
1167{
1168 fn from(value: BTreeMap<String, T>) -> Self {
1169 Self(
1170 value
1171 .into_iter()
1172 .map(|e| (e.0, e.1.into()).into())
1173 .collect(),
1174 )
1175 }
1176}
1177
1178impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<HashMap<String, T>>
1179 for InfoContextFieldBottomLevelData
1180{
1181 fn from(value: HashMap<String, T>) -> Self {
1182 Self(
1183 value
1184 .into_iter()
1185 .map(|e| (e.0, e.1.into()).into())
1186 .collect(),
1187 )
1188 }
1189}
1190
/// Builder that collects info sections and writes them to an [`InfoContext`]
/// when `build_info` is called.
#[derive(Debug)]
pub struct InfoContextBuilder<'a> {
    /// The info context the collected data is written to.
    context: &'a InfoContext,
    /// Sections collected so far, in insertion order.
    sections: InfoContextTreeData,
}
1196impl<'a> InfoContextBuilder<'a> {
1197 fn add_bottom_level_field(
1198 &self,
1199 key: &str,
1200 value: &InfoContextBuilderFieldBottomLevelValue,
1201 ) -> ValkeyResult<()> {
1202 use InfoContextBuilderFieldBottomLevelValue as BottomLevel;
1203
1204 match value {
1205 BottomLevel::String(string) => add_info_field_str(self.context.ctx, key, string),
1206 BottomLevel::I64(number) => add_info_field_long_long(self.context.ctx, key, *number),
1207 BottomLevel::U64(number) => {
1208 add_info_field_unsigned_long_long(self.context.ctx, key, *number)
1209 }
1210 BottomLevel::F64(number) => add_info_field_double(self.context.ctx, key, *number),
1211 }
1212 .into()
1213 }
1214 fn add_top_level_fields(&self, fields: &InfoContextFieldTopLevelData) -> ValkeyResult<()> {
1217 use InfoContextBuilderFieldTopLevelValue as TopLevel;
1218
1219 fields.iter().try_for_each(|(key, value)| match value {
1220 TopLevel::Value(bottom_level) => self.add_bottom_level_field(key, bottom_level),
1221 TopLevel::Dictionary { name, fields } => {
1222 std::convert::Into::<ValkeyResult<()>>::into(add_info_begin_dict_field(
1223 self.context.ctx,
1224 name,
1225 ))?;
1226 fields
1227 .iter()
1228 .try_for_each(|f| self.add_bottom_level_field(&f.0 .0, &f.0 .1))?;
1229 add_info_end_dict_field(self.context.ctx).into()
1230 }
1231 })
1232 }
1233
1234 fn finalise_data(&self) -> ValkeyResult<()> {
1235 self.sections
1236 .iter()
1237 .try_for_each(|(section_name, section_fields)| -> ValkeyResult<()> {
1238 if add_info_section(self.context.ctx, Some(section_name)) == Status::Ok {
1239 self.add_top_level_fields(section_fields)
1240 } else {
1241 Ok(())
1243 }
1244 })
1245 }
1246
1247 pub fn build_info(self) -> ValkeyResult<&'a InfoContext> {
1249 self.finalise_data().map(|_| self.context)
1250 }
1251
1252 pub fn add_section(self, name: &'a str) -> InfoContextBuilderSectionBuilder {
1254 InfoContextBuilderSectionBuilder {
1255 info_builder: self,
1256 name: name.to_owned(),
1257 fields: InfoContextFieldTopLevelData::new(),
1258 }
1259 }
1260
1261 pub(crate) fn add_section_unchecked(mut self, section: OneInfoSectionData) -> Self {
1264 self.sections.push(section);
1265 self
1266 }
1267}
1268
1269impl<'a> From<&'a InfoContext> for InfoContextBuilder<'a> {
1270 fn from(context: &'a InfoContext) -> Self {
1271 Self {
1272 context,
1273 sections: InfoContextTreeData::new(),
1274 }
1275 }
1276}
1277
/// Thin wrapper around a raw `RedisModuleInfoCtx` pointer.
#[derive(Debug)]
pub struct InfoContext {
    /// The raw info context pointer handed to the add-info functions.
    pub ctx: *mut raw::RedisModuleInfoCtx,
}
1282
1283impl InfoContext {
1284 pub const fn new(ctx: *mut raw::RedisModuleInfoCtx) -> Self {
1285 Self { ctx }
1286 }
1287
1288 pub fn builder(&self) -> InfoContextBuilder<'_> {
1290 InfoContextBuilder::from(self)
1291 }
1292
1293 pub fn build_one_section<T: Into<OneInfoSectionData>>(&self, data: T) -> ValkeyResult<()> {
1295 self.builder()
1296 .add_section_unchecked(data.into())
1297 .build_info()?;
1298 Ok(())
1299 }
1300
1301 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1302 pub fn add_info_section(&self, name: Option<&str>) -> Status {
1305 add_info_section(self.ctx, name)
1306 }
1307
1308 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1309 pub fn add_info_field_str(&self, name: &str, content: &str) -> Status {
1313 add_info_field_str(self.ctx, name, content)
1314 }
1315
1316 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1317 pub fn add_info_field_long_long(&self, name: &str, value: c_longlong) -> Status {
1321 add_info_field_long_long(self.ctx, name, value)
1322 }
1323}
1324
bitflags! {
    /// Flags returned by `Context::get_flags`; each constant mirrors the raw
    /// `REDISMODULE_CTX_FLAGS_*` value of the same name.
    pub struct ContextFlags : c_int {
        /// Mirrors `REDISMODULE_CTX_FLAGS_LUA`.
        const LUA = raw::REDISMODULE_CTX_FLAGS_LUA as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_MULTI`.
        const MULTI = raw::REDISMODULE_CTX_FLAGS_MULTI as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_MASTER`.
        const MASTER = raw::REDISMODULE_CTX_FLAGS_MASTER as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_SLAVE`.
        const SLAVE = raw::REDISMODULE_CTX_FLAGS_SLAVE as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_READONLY`.
        const READONLY = raw::REDISMODULE_CTX_FLAGS_READONLY as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_CLUSTER`.
        const CLUSTER = raw::REDISMODULE_CTX_FLAGS_CLUSTER as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_AOF`.
        const AOF = raw::REDISMODULE_CTX_FLAGS_AOF as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_RDB`.
        const RDB = raw::REDISMODULE_CTX_FLAGS_RDB as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_MAXMEMORY`.
        const MAXMEMORY = raw::REDISMODULE_CTX_FLAGS_MAXMEMORY as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_EVICT`.
        const EVICTED = raw::REDISMODULE_CTX_FLAGS_EVICT as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_OOM`.
        const OOM = raw::REDISMODULE_CTX_FLAGS_OOM as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_OOM_WARNING`.
        const OOM_WARNING = raw::REDISMODULE_CTX_FLAGS_OOM_WARNING as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_REPLICATED`.
        const REPLICATED = raw::REDISMODULE_CTX_FLAGS_REPLICATED as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_LOADING`.
        const LOADING = raw::REDISMODULE_CTX_FLAGS_LOADING as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE`.
        const REPLICA_IS_STALE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING`.
        const REPLICA_IS_CONNECTING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING`.
        const REPLICA_IS_TRANSFERRING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE`.
        const REPLICA_IS_ONLINE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_ACTIVE_CHILD`.
        const ACTIVE_CHILD = raw::REDISMODULE_CTX_FLAGS_ACTIVE_CHILD as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_IS_CHILD`.
        const IS_CHILD = raw::REDISMODULE_CTX_FLAGS_IS_CHILD as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_MULTI_DIRTY`.
        const MULTI_DIRTY = raw::REDISMODULE_CTX_FLAGS_MULTI_DIRTY as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_DENY_BLOCKING`.
        const DENY_BLOCKING = raw::REDISMODULE_CTX_FLAGS_DENY_BLOCKING as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_RESP3`.
        const FLAGS_RESP3 = raw::REDISMODULE_CTX_FLAGS_RESP3 as c_int;

        /// Mirrors `REDISMODULE_CTX_FLAGS_ASYNC_LOADING`.
        const ASYNC_LOADING = raw::REDISMODULE_CTX_FLAGS_ASYNC_LOADING as c_int;
    }
}