1use bitflags::bitflags;
2use redis_module_macros_internals::api;
3use std::collections::{BTreeMap, HashMap};
4use std::ffi::CString;
5use std::os::raw::c_void;
6use std::os::raw::{c_char, c_int, c_long, c_longlong};
7use std::ptr::{self, NonNull};
8use std::sync::atomic::{AtomicPtr, Ordering};
9
10use crate::key::{KeyFlags, RedisKey, RedisKeyWritable};
11use crate::logging::RedisLogLevel;
12use crate::raw::{ModuleOptions, Version};
13use crate::redisvalue::RedisValueKey;
14use crate::{
15 add_info_begin_dict_field, add_info_end_dict_field, add_info_field_double,
16 add_info_field_long_long, add_info_field_str, add_info_field_unsigned_long_long, raw, utils,
17 Status,
18};
19use crate::{add_info_section, RedisResult};
20use crate::{RedisError, RedisString, RedisValue};
21use std::ops::Deref;
22
23use std::ffi::CStr;
24
25use self::call_reply::{create_promise_call_reply, CallResult, PromiseCallReply};
26use self::thread_safe::RedisLockIndicator;
27
28mod timer;
29
30pub mod blocked;
31pub mod call_reply;
32pub mod commands;
33pub mod info;
34pub mod keys_cursor;
35pub mod server_events;
36pub mod thread_safe;
37
38pub struct CallOptionsBuilder {
39 options: String,
40}
41
42impl Default for CallOptionsBuilder {
43 fn default() -> Self {
44 CallOptionsBuilder {
45 options: "v".to_string(),
46 }
47 }
48}
49
50#[derive(Clone)]
51pub struct CallOptions {
52 options: CString,
53}
54
55#[derive(Clone)]
56#[cfg(feature = "min-redis-compatibility-version-7-2")]
57pub struct BlockingCallOptions {
58 options: CString,
59}
60
61#[derive(Copy, Clone)]
62pub enum CallOptionResp {
63 Resp2,
64 Resp3,
65 Auto,
66}
67
68impl CallOptionsBuilder {
69 pub fn new() -> CallOptionsBuilder {
70 Self::default()
71 }
72
73 fn add_flag(&mut self, flag: &str) {
74 self.options.push_str(flag);
75 }
76
77 pub fn no_writes(mut self) -> CallOptionsBuilder {
79 self.add_flag("W");
80 self
81 }
82
83 pub fn script_mode(mut self) -> CallOptionsBuilder {
88 self.add_flag("S");
89 self
90 }
91
92 pub fn verify_acl(mut self) -> CallOptionsBuilder {
95 self.add_flag("C");
96 self
97 }
98
99 pub fn verify_oom(mut self) -> CallOptionsBuilder {
101 self.add_flag("M");
102 self
103 }
104
105 pub fn errors_as_replies(mut self) -> CallOptionsBuilder {
108 self.add_flag("E");
109 self
110 }
111
112 pub fn replicate(mut self) -> CallOptionsBuilder {
114 self.add_flag("!");
115 self
116 }
117
118 pub fn resp(mut self, resp: CallOptionResp) -> CallOptionsBuilder {
120 match resp {
121 CallOptionResp::Auto => self.add_flag("0"),
122 CallOptionResp::Resp2 => (),
123 CallOptionResp::Resp3 => self.add_flag("3"),
124 }
125 self
126 }
127
128 pub fn build(self) -> CallOptions {
130 CallOptions {
131 options: CString::new(self.options).unwrap(), }
133 }
134
135 #[cfg(feature = "min-redis-compatibility-version-7-2")]
139 pub fn build_blocking(mut self) -> BlockingCallOptions {
140 self.add_flag("K");
141 BlockingCallOptions {
142 options: CString::new(self.options).unwrap(), }
144 }
145}
146
147pub struct DetachedContext {
151 pub(crate) ctx: AtomicPtr<raw::RedisModuleCtx>,
152}
153
154impl DetachedContext {
155 pub const fn new() -> Self {
156 DetachedContext {
157 ctx: AtomicPtr::new(ptr::null_mut()),
158 }
159 }
160}
161
162impl Default for DetachedContext {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168pub struct DetachedContextGuard {
175 pub(crate) ctx: Context,
176}
177
178unsafe impl RedisLockIndicator for DetachedContextGuard {}
179
180impl Drop for DetachedContextGuard {
181 fn drop(&mut self) {
182 unsafe {
183 raw::RedisModule_ThreadSafeContextUnlock.unwrap()(self.ctx.ctx);
184 };
185 }
186}
187
188impl Deref for DetachedContextGuard {
189 type Target = Context;
190
191 fn deref(&self) -> &Self::Target {
192 &self.ctx
193 }
194}
195
196impl DetachedContext {
197 pub fn log(&self, level: RedisLogLevel, message: &str) {
198 let c = self.ctx.load(Ordering::Relaxed);
199 crate::logging::log_internal(c, level, message);
200 }
201
202 pub fn log_debug(&self, message: &str) {
203 self.log(RedisLogLevel::Debug, message);
204 }
205
206 pub fn log_notice(&self, message: &str) {
207 self.log(RedisLogLevel::Notice, message);
208 }
209
210 pub fn log_verbose(&self, message: &str) {
211 self.log(RedisLogLevel::Verbose, message);
212 }
213
214 pub fn log_warning(&self, message: &str) {
215 self.log(RedisLogLevel::Warning, message);
216 }
217
218 pub fn set_context(&self, ctx: &Context) -> Result<(), RedisError> {
219 let c = self.ctx.load(Ordering::Relaxed);
220 if !c.is_null() {
221 return Err(RedisError::Str("Detached context is already set"));
222 }
223 let ctx = unsafe { raw::RedisModule_GetDetachedThreadSafeContext.unwrap()(ctx.ctx) };
224 self.ctx.store(ctx, Ordering::Relaxed);
225 Ok(())
226 }
227
228 pub fn lock(&self) -> DetachedContextGuard {
233 let c = self.ctx.load(Ordering::Relaxed);
234 unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(c) };
235 let ctx = Context::new(c);
236 DetachedContextGuard { ctx }
237 }
238}
239
240unsafe impl Send for DetachedContext {}
241unsafe impl Sync for DetachedContext {}
242
243#[derive(Debug)]
246pub struct Context {
247 pub ctx: *mut raw::RedisModuleCtx,
248}
249
250#[derive(Debug)]
256pub struct ContextUserScope<'ctx> {
257 ctx: &'ctx Context,
258 user: *mut raw::RedisModuleUser,
259}
260
261impl<'ctx> Drop for ContextUserScope<'ctx> {
262 fn drop(&mut self) {
263 self.ctx.deautenticate_user();
264 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(self.user) };
265 }
266}
267
268impl<'ctx> ContextUserScope<'ctx> {
269 fn new(ctx: &'ctx Context, user: *mut raw::RedisModuleUser) -> ContextUserScope<'ctx> {
270 ContextUserScope { ctx, user }
271 }
272}
273
274pub struct StrCallArgs<'a> {
275 is_owner: bool,
276 args: Vec<*mut raw::RedisModuleString>,
277 phantom: std::marker::PhantomData<&'a raw::RedisModuleString>,
279}
280
281impl<'a> Drop for StrCallArgs<'a> {
282 fn drop(&mut self) {
283 if self.is_owner {
284 self.args.iter_mut().for_each(|v| unsafe {
285 raw::RedisModule_FreeString.unwrap()(std::ptr::null_mut(), *v)
286 });
287 }
288 }
289}
290
291impl<'a, T: AsRef<[u8]> + ?Sized> From<&'a [&T]> for StrCallArgs<'a> {
292 fn from(vals: &'a [&T]) -> Self {
293 StrCallArgs {
294 is_owner: true,
295 args: vals
296 .iter()
297 .map(|v| RedisString::create_from_slice(std::ptr::null_mut(), v.as_ref()).take())
298 .collect(),
299 phantom: std::marker::PhantomData,
300 }
301 }
302}
303
304impl<'a> From<&'a [&RedisString]> for StrCallArgs<'a> {
305 fn from(vals: &'a [&RedisString]) -> Self {
306 StrCallArgs {
307 is_owner: false,
308 args: vals.iter().map(|v| v.inner).collect(),
309 phantom: std::marker::PhantomData,
310 }
311 }
312}
313
314impl<'a, const SIZE: usize, T: ?Sized> From<&'a [&T; SIZE]> for StrCallArgs<'a>
315where
316 for<'b> &'a [&'b T]: Into<StrCallArgs<'a>>,
317{
318 fn from(vals: &'a [&T; SIZE]) -> Self {
319 vals.as_ref().into()
320 }
321}
322
323impl<'a> StrCallArgs<'a> {
324 pub(crate) fn args_mut(&mut self) -> &mut [*mut raw::RedisModuleString] {
325 &mut self.args
326 }
327}
328
329impl Context {
330 pub const fn new(ctx: *mut raw::RedisModuleCtx) -> Self {
331 Self { ctx }
332 }
333
334 #[must_use]
335 pub const fn dummy() -> Self {
336 Self {
337 ctx: ptr::null_mut(),
338 }
339 }
340
341 pub fn log(&self, level: RedisLogLevel, message: &str) {
342 crate::logging::log_internal(self.ctx, level, message);
343 }
344
345 pub fn log_debug(&self, message: &str) {
346 self.log(RedisLogLevel::Debug, message);
347 }
348
349 pub fn log_notice(&self, message: &str) {
350 self.log(RedisLogLevel::Notice, message);
351 }
352
353 pub fn log_verbose(&self, message: &str) {
354 self.log(RedisLogLevel::Verbose, message);
355 }
356
357 pub fn log_warning(&self, message: &str) {
358 self.log(RedisLogLevel::Warning, message);
359 }
360
361 pub fn auto_memory(&self) {
365 unsafe {
366 raw::RedisModule_AutoMemory.unwrap()(self.ctx);
367 }
368 }
369
370 #[must_use]
374 pub fn is_keys_position_request(&self) -> bool {
375 if cfg!(test) {
377 return false;
378 }
379
380 (unsafe { raw::RedisModule_IsKeysPositionRequest.unwrap()(self.ctx) }) != 0
381 }
382
383 pub fn key_at_pos(&self, pos: i32) {
387 unsafe {
390 raw::RedisModule_KeyAtPos.unwrap()(self.ctx, pos as c_int);
391 }
392 }
393
394 fn call_internal<
395 'ctx,
396 'a,
397 T: Into<StrCallArgs<'a>>,
398 R: From<PromiseCallReply<'static, 'ctx>>,
399 >(
400 &'ctx self,
401 command: &str,
402 fmt: *const c_char,
403 args: T,
404 ) -> R {
405 let mut call_args: StrCallArgs = args.into();
406 let final_args = call_args.args_mut();
407
408 let cmd = CString::new(command).unwrap();
409 let reply: *mut raw::RedisModuleCallReply = unsafe {
410 let p_call = raw::RedisModule_Call.unwrap();
411 p_call(
412 self.ctx,
413 cmd.as_ptr(),
414 fmt,
415 final_args.as_mut_ptr(),
416 final_args.len(),
417 )
418 };
419 let promise = create_promise_call_reply(self, NonNull::new(reply));
420 R::from(promise)
421 }
422
423 pub fn call<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) -> RedisResult {
424 self.call_internal::<_, CallResult>(command, raw::FMT, args)
425 .map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))
426 }
427
428 pub fn call_ext<'a, T: Into<StrCallArgs<'a>>, R: From<CallResult<'static>>>(
432 &self,
433 command: &str,
434 options: &CallOptions,
435 args: T,
436 ) -> R {
437 let res: CallResult<'static> =
438 self.call_internal(command, options.options.as_ptr() as *const c_char, args);
439 R::from(res)
440 }
441
442 #[cfg(feature = "min-redis-compatibility-version-7-2")]
444 pub fn call_blocking<
445 'ctx,
446 'a,
447 T: Into<StrCallArgs<'a>>,
448 R: From<PromiseCallReply<'static, 'ctx>>,
449 >(
450 &'ctx self,
451 command: &str,
452 options: &BlockingCallOptions,
453 args: T,
454 ) -> R {
455 self.call_internal(command, options.options.as_ptr() as *const c_char, args)
456 }
457
458 #[must_use]
459 pub fn str_as_legal_resp_string(s: &str) -> CString {
460 CString::new(
461 s.chars()
462 .map(|c| match c {
463 '\r' | '\n' | '\0' => b' ',
464 _ => c as u8,
465 })
466 .collect::<Vec<_>>(),
467 )
468 .unwrap()
469 }
470
471 #[allow(clippy::must_use_candidate)]
472 pub fn reply_simple_string(&self, s: &str) -> raw::Status {
473 let msg = Self::str_as_legal_resp_string(s);
474 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
475 }
476
477 #[allow(clippy::must_use_candidate)]
478 pub fn reply_error_string(&self, s: &str) -> raw::Status {
479 let msg = Self::str_as_legal_resp_string(s);
480 unsafe { raw::RedisModule_ReplyWithError.unwrap()(self.ctx, msg.as_ptr()).into() }
481 }
482
483 pub fn reply_with_key(&self, result: RedisValueKey) -> raw::Status {
484 match result {
485 RedisValueKey::Integer(i) => raw::reply_with_long_long(self.ctx, i),
486 RedisValueKey::String(s) => {
487 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
488 }
489 RedisValueKey::BulkString(b) => {
490 raw::reply_with_string_buffer(self.ctx, b.as_ptr().cast::<c_char>(), b.len())
491 }
492 RedisValueKey::BulkRedisString(s) => raw::reply_with_string(self.ctx, s.inner),
493 RedisValueKey::Bool(b) => raw::reply_with_bool(self.ctx, b.into()),
494 }
495 }
496
497 #[allow(clippy::must_use_candidate)]
501 pub fn reply(&self, result: RedisResult) -> raw::Status {
502 match result {
503 Ok(RedisValue::Bool(v)) => raw::reply_with_bool(self.ctx, v.into()),
504 Ok(RedisValue::Integer(v)) => raw::reply_with_long_long(self.ctx, v),
505 Ok(RedisValue::Float(v)) => raw::reply_with_double(self.ctx, v),
506 Ok(RedisValue::SimpleStringStatic(s)) => {
507 let msg = CString::new(s).unwrap();
508 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
509 }
510
511 Ok(RedisValue::SimpleString(s)) => {
512 let msg = CString::new(s).unwrap();
513 raw::reply_with_simple_string(self.ctx, msg.as_ptr())
514 }
515
516 Ok(RedisValue::BulkString(s)) => {
517 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
518 }
519
520 Ok(RedisValue::BigNumber(s)) => {
521 raw::reply_with_big_number(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
522 }
523
524 Ok(RedisValue::VerbatimString((format, data))) => raw::reply_with_verbatim_string(
525 self.ctx,
526 data.as_ptr().cast(),
527 data.len(),
528 format.0.as_ptr().cast(),
529 ),
530
531 Ok(RedisValue::BulkRedisString(s)) => raw::reply_with_string(self.ctx, s.inner),
532
533 Ok(RedisValue::StringBuffer(s)) => {
534 raw::reply_with_string_buffer(self.ctx, s.as_ptr().cast::<c_char>(), s.len())
535 }
536
537 Ok(RedisValue::Array(array)) => {
538 raw::reply_with_array(self.ctx, array.len() as c_long);
539
540 for elem in array {
541 self.reply(Ok(elem));
542 }
543
544 raw::Status::Ok
545 }
546
547 Ok(RedisValue::Map(map)) => {
548 raw::reply_with_map(self.ctx, map.len() as c_long);
549
550 for (key, value) in map {
551 self.reply_with_key(key);
552 self.reply(Ok(value));
553 }
554
555 raw::Status::Ok
556 }
557
558 Ok(RedisValue::OrderedMap(map)) => {
559 raw::reply_with_map(self.ctx, map.len() as c_long);
560
561 for (key, value) in map {
562 self.reply_with_key(key);
563 self.reply(Ok(value));
564 }
565
566 raw::Status::Ok
567 }
568
569 Ok(RedisValue::Set(set)) => {
570 raw::reply_with_set(self.ctx, set.len() as c_long);
571 set.into_iter().for_each(|e| {
572 self.reply_with_key(e);
573 });
574
575 raw::Status::Ok
576 }
577
578 Ok(RedisValue::OrderedSet(set)) => {
579 raw::reply_with_set(self.ctx, set.len() as c_long);
580 set.into_iter().for_each(|e| {
581 self.reply_with_key(e);
582 });
583
584 raw::Status::Ok
585 }
586
587 Ok(RedisValue::Null) => raw::reply_with_null(self.ctx),
588
589 Ok(RedisValue::NoReply) => raw::Status::Ok,
590
591 Ok(RedisValue::StaticError(s)) => self.reply_error_string(s),
592
593 Err(RedisError::WrongArity) => unsafe {
594 if self.is_keys_position_request() {
595 raw::Status::Err
597 } else {
598 raw::RedisModule_WrongArity.unwrap()(self.ctx).into()
599 }
600 },
601
602 Err(RedisError::WrongType) => {
603 self.reply_error_string(RedisError::WrongType.to_string().as_str())
604 }
605
606 Err(RedisError::String(s)) => self.reply_error_string(s.as_str()),
607
608 Err(RedisError::Str(s)) => self.reply_error_string(s),
609 }
610 }
611
612 #[must_use]
613 pub fn open_key(&self, key: &RedisString) -> RedisKey {
614 RedisKey::open(self.ctx, key)
615 }
616
617 #[must_use]
618 pub fn open_key_with_flags(&self, key: &RedisString, flags: KeyFlags) -> RedisKey {
619 RedisKey::open_with_flags(self.ctx, key, flags)
620 }
621
622 #[must_use]
623 pub fn open_key_writable(&self, key: &RedisString) -> RedisKeyWritable {
624 RedisKeyWritable::open(self.ctx, key)
625 }
626
627 #[must_use]
628 pub fn open_key_writable_with_flags(
629 &self,
630 key: &RedisString,
631 flags: KeyFlags,
632 ) -> RedisKeyWritable {
633 RedisKeyWritable::open_with_flags(self.ctx, key, flags)
634 }
635
636 pub fn replicate_verbatim(&self) {
637 raw::replicate_verbatim(self.ctx);
638 }
639
640 pub fn replicate<'a, T: Into<StrCallArgs<'a>>>(&self, command: &str, args: T) {
642 raw::replicate(self.ctx, command, args);
643 }
644
645 #[must_use]
646 pub fn create_string<T: Into<Vec<u8>>>(&self, s: T) -> RedisString {
647 RedisString::create(NonNull::new(self.ctx), s)
648 }
649
650 #[must_use]
651 pub const fn get_raw(&self) -> *mut raw::RedisModuleCtx {
652 self.ctx
653 }
654
655 pub unsafe fn export_shared_api(
659 &self,
660 func: *const ::std::os::raw::c_void,
661 name: *const ::std::os::raw::c_char,
662 ) {
663 raw::export_shared_api(self.ctx, func, name);
664 }
665
666 #[allow(clippy::must_use_candidate)]
670 pub fn notify_keyspace_event(
671 &self,
672 event_type: raw::NotifyEvent,
673 event: &str,
674 keyname: &RedisString,
675 ) -> raw::Status {
676 unsafe { raw::notify_keyspace_event(self.ctx, event_type, event, keyname) }
677 }
678
679 pub fn current_command_name(&self) -> Result<String, RedisError> {
680 unsafe {
681 match raw::RedisModule_GetCurrentCommandName {
682 Some(cmd) => Ok(CStr::from_ptr(cmd(self.ctx)).to_str().unwrap().to_string()),
683 None => Err(RedisError::Str(
684 "API RedisModule_GetCurrentCommandName is not available",
685 )),
686 }
687 }
688 }
689
690 pub fn get_redis_version(&self) -> Result<Version, RedisError> {
693 self.get_redis_version_internal(false)
694 }
695
696 pub fn get_redis_version_rm_call(&self) -> Result<Version, RedisError> {
698 self.get_redis_version_internal(true)
699 }
700
701 pub fn version_from_info(info: RedisValue) -> Result<Version, RedisError> {
702 if let RedisValue::SimpleString(info_str) = info {
703 if let Some(ver) = utils::get_regexp_captures(
704 info_str.as_str(),
705 r"(?m)\bredis_version:([0-9]+)\.([0-9]+)\.([0-9]+)\b",
706 ) {
707 return Ok(Version {
708 major: ver[0].parse::<c_int>().unwrap(),
709 minor: ver[1].parse::<c_int>().unwrap(),
710 patch: ver[2].parse::<c_int>().unwrap(),
711 });
712 }
713 }
714 Err(RedisError::Str("Error getting redis_version"))
715 }
716
717 #[allow(clippy::not_unsafe_ptr_arg_deref)]
718 fn get_redis_version_internal(&self, force_use_rm_call: bool) -> Result<Version, RedisError> {
719 match unsafe { raw::RedisModule_GetServerVersion } {
720 Some(api) if !force_use_rm_call => {
721 Ok(Version::from(unsafe { api() }))
723 }
724 _ => {
725 if let Ok(info) = self.call("info", &["server"]) {
727 Self::version_from_info(info)
728 } else {
729 Err(RedisError::Str("Error calling \"info server\""))
730 }
731 }
732 }
733 }
734 pub fn set_module_options(&self, options: ModuleOptions) {
735 unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
736 }
737
738 pub fn get_flags(&self) -> ContextFlags {
744 ContextFlags::from_bits_truncate(unsafe {
745 raw::RedisModule_GetContextFlags.unwrap()(self.ctx)
746 })
747 }
748
749 pub fn get_current_user(&self) -> RedisString {
751 let user = unsafe { raw::RedisModule_GetCurrentUserName.unwrap()(self.ctx) };
752 RedisString::from_redis_module_string(ptr::null_mut(), user)
753 }
754
755 pub fn authenticate_user(
760 &self,
761 user_name: &RedisString,
762 ) -> Result<ContextUserScope<'_>, RedisError> {
763 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
764 if user.is_null() {
765 return Err(RedisError::Str("User does not exists or disabled"));
766 }
767 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, user) };
768 Ok(ContextUserScope::new(self, user))
769 }
770
771 fn deautenticate_user(&self) {
772 unsafe { raw::RedisModule_SetContextUser.unwrap()(self.ctx, ptr::null_mut()) };
773 }
774
775 pub fn acl_check_key_permission(
779 &self,
780 user_name: &RedisString,
781 key_name: &RedisString,
782 permissions: &AclPermissions,
783 ) -> Result<(), RedisError> {
784 let user = unsafe { raw::RedisModule_GetModuleUserFromUserName.unwrap()(user_name.inner) };
785 if user.is_null() {
786 return Err(RedisError::Str("User does not exists or disabled"));
787 }
788 let acl_permission_result: raw::Status = unsafe {
789 raw::RedisModule_ACLCheckKeyPermissions.unwrap()(
790 user,
791 key_name.inner,
792 permissions.bits(),
793 )
794 }
795 .into();
796 unsafe { raw::RedisModule_FreeModuleUser.unwrap()(user) };
797 let acl_permission_result: Result<(), &str> = acl_permission_result.into();
798 acl_permission_result.map_err(|_e| RedisError::Str("User does not have permissions on key"))
799 }
800
801 api!(
802 [RedisModule_AddPostNotificationJob],
803 pub fn add_post_notification_job<F: FnOnce(&Context) + 'static>(
816 &self,
817 callback: F,
818 ) -> Status {
819 let callback = Box::into_raw(Box::new(Some(callback)));
820 unsafe {
821 RedisModule_AddPostNotificationJob(
822 self.ctx,
823 Some(post_notification_job::<F>),
824 callback as *mut c_void,
825 Some(post_notification_job_free_callback::<F>),
826 )
827 }
828 .into()
829 }
830 );
831
832 api!(
833 [RedisModule_AvoidReplicaTraffic],
834 pub fn avoid_replication_traffic(&self) -> bool {
850 unsafe { RedisModule_AvoidReplicaTraffic() == 1 }
851 }
852 );
853}
854
855extern "C" fn post_notification_job_free_callback<F: FnOnce(&Context)>(pd: *mut c_void) {
856 unsafe { Box::from_raw(pd as *mut Option<F>) };
857}
858
859extern "C" fn post_notification_job<F: FnOnce(&Context)>(
860 ctx: *mut raw::RedisModuleCtx,
861 pd: *mut c_void,
862) {
863 let callback = unsafe { &mut *(pd as *mut Option<F>) };
864 let ctx = Context::new(ctx);
865 callback.take().map_or_else(
866 || {
867 ctx.log(
868 RedisLogLevel::Warning,
869 "Got a None callback on post notification job.",
870 )
871 },
872 |callback| {
873 callback(&ctx);
874 },
875 );
876}
877
878unsafe impl RedisLockIndicator for Context {}
879
880bitflags! {
881 #[derive(Debug)]
884 pub struct AclPermissions : c_int {
885 const ACCESS = raw::REDISMODULE_CMD_KEY_ACCESS as c_int;
887
888 const INSERT = raw::REDISMODULE_CMD_KEY_INSERT as c_int;
890
891 const DELETE = raw::REDISMODULE_CMD_KEY_DELETE as c_int;
893
894 const UPDATE = raw::REDISMODULE_CMD_KEY_UPDATE as c_int;
896 }
897}
898
899#[derive(Debug, Clone)]
901pub enum InfoContextBuilderFieldBottomLevelValue {
902 String(String),
904 I64(i64),
906 U64(u64),
908 F64(f64),
910}
911
912impl From<String> for InfoContextBuilderFieldBottomLevelValue {
913 fn from(value: String) -> Self {
914 Self::String(value)
915 }
916}
917
918impl From<&str> for InfoContextBuilderFieldBottomLevelValue {
919 fn from(value: &str) -> Self {
920 Self::String(value.to_owned())
921 }
922}
923
924impl From<i64> for InfoContextBuilderFieldBottomLevelValue {
925 fn from(value: i64) -> Self {
926 Self::I64(value)
927 }
928}
929
930impl From<u64> for InfoContextBuilderFieldBottomLevelValue {
931 fn from(value: u64) -> Self {
932 Self::U64(value)
933 }
934}
935
936#[derive(Debug, Clone)]
937pub enum InfoContextBuilderFieldTopLevelValue {
938 Value(InfoContextBuilderFieldBottomLevelValue),
940 Dictionary {
964 name: String,
965 fields: InfoContextFieldBottomLevelData,
966 },
967}
968
969impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<T>
970 for InfoContextBuilderFieldTopLevelValue
971{
972 fn from(value: T) -> Self {
973 Self::Value(value.into())
974 }
975}
976
977#[derive(Debug)]
980pub struct InfoContextBuilderDictionaryBuilder<'a> {
981 info_section_builder: InfoContextBuilderSectionBuilder<'a>,
983 name: String,
985 fields: InfoContextFieldBottomLevelData,
987}
988
989impl<'a> InfoContextBuilderDictionaryBuilder<'a> {
990 pub fn field<F: Into<InfoContextBuilderFieldBottomLevelValue>>(
992 mut self,
993 name: &str,
994 value: F,
995 ) -> RedisResult<Self> {
996 if self.fields.iter().any(|k| k.0 .0 == name) {
997 return Err(RedisError::String(format!(
998 "Found duplicate key '{name}' in the info dictionary '{}'",
999 self.name
1000 )));
1001 }
1002
1003 self.fields.push((name.to_owned(), value.into()).into());
1004 Ok(self)
1005 }
1006
1007 pub fn build_dictionary(self) -> RedisResult<InfoContextBuilderSectionBuilder<'a>> {
1009 let name = self.name;
1010 let name_ref = name.clone();
1011 self.info_section_builder.field(
1012 &name_ref,
1013 InfoContextBuilderFieldTopLevelValue::Dictionary {
1014 name,
1015 fields: self.fields.to_owned(),
1016 },
1017 )
1018 }
1019}
1020
1021#[derive(Debug)]
1023pub struct InfoContextBuilderSectionBuilder<'a> {
1024 info_builder: InfoContextBuilder<'a>,
1026 name: String,
1028 fields: InfoContextFieldTopLevelData,
1030}
1031
1032impl<'a> InfoContextBuilderSectionBuilder<'a> {
1033 pub fn field<F: Into<InfoContextBuilderFieldTopLevelValue>>(
1035 mut self,
1036 name: &str,
1037 value: F,
1038 ) -> RedisResult<Self> {
1039 if self.fields.iter().any(|(k, _)| k == name) {
1040 return Err(RedisError::String(format!(
1041 "Found duplicate key '{name}' in the info section '{}'",
1042 self.name
1043 )));
1044 }
1045 self.fields.push((name.to_owned(), value.into()));
1046 Ok(self)
1047 }
1048
1049 pub fn add_dictionary(self, dictionary_name: &str) -> InfoContextBuilderDictionaryBuilder<'a> {
1051 InfoContextBuilderDictionaryBuilder {
1052 info_section_builder: self,
1053 name: dictionary_name.to_owned(),
1054 fields: InfoContextFieldBottomLevelData::default(),
1055 }
1056 }
1057
1058 pub fn build_section(mut self) -> RedisResult<InfoContextBuilder<'a>> {
1060 if self
1061 .info_builder
1062 .sections
1063 .iter()
1064 .any(|(k, _)| k == &self.name)
1065 {
1066 return Err(RedisError::String(format!(
1067 "Found duplicate section in the Info reply: {}",
1068 self.name
1069 )));
1070 }
1071
1072 self.info_builder
1073 .sections
1074 .push((self.name.clone(), self.fields));
1075
1076 Ok(self.info_builder)
1077 }
1078}
1079
1080#[derive(Debug, Clone)]
1082#[repr(transparent)]
1083pub struct InfoContextBottomLevelFieldData(pub (String, InfoContextBuilderFieldBottomLevelValue));
1084impl Deref for InfoContextBottomLevelFieldData {
1085 type Target = (String, InfoContextBuilderFieldBottomLevelValue);
1086
1087 fn deref(&self) -> &Self::Target {
1088 &self.0
1089 }
1090}
1091impl std::ops::DerefMut for InfoContextBottomLevelFieldData {
1092 fn deref_mut(&mut self) -> &mut Self::Target {
1093 &mut self.0
1094 }
1095}
1096
1097impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<(String, T)>
1098 for InfoContextBottomLevelFieldData
1099{
1100 fn from(value: (String, T)) -> Self {
1101 Self((value.0, value.1.into()))
1102 }
1103}
1104#[derive(Debug, Default, Clone)]
1107#[repr(transparent)]
1108pub struct InfoContextFieldBottomLevelData(pub Vec<InfoContextBottomLevelFieldData>);
1109impl Deref for InfoContextFieldBottomLevelData {
1110 type Target = Vec<InfoContextBottomLevelFieldData>;
1111
1112 fn deref(&self) -> &Self::Target {
1113 &self.0
1114 }
1115}
1116impl std::ops::DerefMut for InfoContextFieldBottomLevelData {
1117 fn deref_mut(&mut self) -> &mut Self::Target {
1118 &mut self.0
1119 }
1120}
1121
1122pub type InfoContextFieldTopLevelData = Vec<(String, InfoContextBuilderFieldTopLevelValue)>;
1125pub type OneInfoSectionData = (String, InfoContextFieldTopLevelData);
1127pub type InfoContextTreeData = Vec<OneInfoSectionData>;
1129
1130impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<BTreeMap<String, T>>
1131 for InfoContextFieldBottomLevelData
1132{
1133 fn from(value: BTreeMap<String, T>) -> Self {
1134 Self(
1135 value
1136 .into_iter()
1137 .map(|e| (e.0, e.1.into()).into())
1138 .collect(),
1139 )
1140 }
1141}
1142
1143impl<T: Into<InfoContextBuilderFieldBottomLevelValue>> From<HashMap<String, T>>
1144 for InfoContextFieldBottomLevelData
1145{
1146 fn from(value: HashMap<String, T>) -> Self {
1147 Self(
1148 value
1149 .into_iter()
1150 .map(|e| (e.0, e.1.into()).into())
1151 .collect(),
1152 )
1153 }
1154}
1155
1156#[derive(Debug)]
1157pub struct InfoContextBuilder<'a> {
1158 context: &'a InfoContext,
1159 sections: InfoContextTreeData,
1160}
1161impl<'a> InfoContextBuilder<'a> {
1162 fn add_bottom_level_field(
1163 &self,
1164 key: &str,
1165 value: &InfoContextBuilderFieldBottomLevelValue,
1166 ) -> RedisResult<()> {
1167 use InfoContextBuilderFieldBottomLevelValue as BottomLevel;
1168
1169 match value {
1170 BottomLevel::String(string) => add_info_field_str(self.context.ctx, key, string),
1171 BottomLevel::I64(number) => add_info_field_long_long(self.context.ctx, key, *number),
1172 BottomLevel::U64(number) => {
1173 add_info_field_unsigned_long_long(self.context.ctx, key, *number)
1174 }
1175 BottomLevel::F64(number) => add_info_field_double(self.context.ctx, key, *number),
1176 }
1177 .into()
1178 }
1179 fn add_top_level_fields(&self, fields: &InfoContextFieldTopLevelData) -> RedisResult<()> {
1182 use InfoContextBuilderFieldTopLevelValue as TopLevel;
1183
1184 fields.iter().try_for_each(|(key, value)| match value {
1185 TopLevel::Value(bottom_level) => self.add_bottom_level_field(key, bottom_level),
1186 TopLevel::Dictionary { name, fields } => {
1187 std::convert::Into::<RedisResult<()>>::into(add_info_begin_dict_field(
1188 self.context.ctx,
1189 name,
1190 ))?;
1191 fields
1192 .iter()
1193 .try_for_each(|f| self.add_bottom_level_field(&f.0 .0, &f.0 .1))?;
1194 add_info_end_dict_field(self.context.ctx).into()
1195 }
1196 })
1197 }
1198
1199 fn finalise_data(&self) -> RedisResult<()> {
1200 self.sections
1201 .iter()
1202 .try_for_each(|(section_name, section_fields)| -> RedisResult<()> {
1203 if add_info_section(self.context.ctx, Some(section_name)) == Status::Ok {
1204 self.add_top_level_fields(section_fields)
1205 } else {
1206 Ok(())
1208 }
1209 })
1210 }
1211
1212 pub fn build_info(self) -> RedisResult<&'a InfoContext> {
1214 self.finalise_data().map(|_| self.context)
1215 }
1216
1217 pub fn add_section(self, name: &'a str) -> InfoContextBuilderSectionBuilder {
1219 InfoContextBuilderSectionBuilder {
1220 info_builder: self,
1221 name: name.to_owned(),
1222 fields: InfoContextFieldTopLevelData::new(),
1223 }
1224 }
1225
1226 pub(crate) fn add_section_unchecked(mut self, section: OneInfoSectionData) -> Self {
1229 self.sections.push(section);
1230 self
1231 }
1232}
1233
1234impl<'a> From<&'a InfoContext> for InfoContextBuilder<'a> {
1235 fn from(context: &'a InfoContext) -> Self {
1236 Self {
1237 context,
1238 sections: InfoContextTreeData::new(),
1239 }
1240 }
1241}
1242
1243#[derive(Debug)]
1244pub struct InfoContext {
1245 pub ctx: *mut raw::RedisModuleInfoCtx,
1246}
1247
1248impl InfoContext {
1249 pub const fn new(ctx: *mut raw::RedisModuleInfoCtx) -> Self {
1250 Self { ctx }
1251 }
1252
1253 pub fn builder(&self) -> InfoContextBuilder<'_> {
1255 InfoContextBuilder::from(self)
1256 }
1257
1258 pub fn build_one_section<T: Into<OneInfoSectionData>>(&self, data: T) -> RedisResult<()> {
1260 self.builder()
1261 .add_section_unchecked(data.into())
1262 .build_info()?;
1263 Ok(())
1264 }
1265
1266 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1267 pub fn add_info_section(&self, name: Option<&str>) -> Status {
1270 add_info_section(self.ctx, name)
1271 }
1272
1273 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1274 pub fn add_info_field_str(&self, name: &str, content: &str) -> Status {
1278 add_info_field_str(self.ctx, name, content)
1279 }
1280
1281 #[deprecated = "Please use [`InfoContext::builder`] instead."]
1282 pub fn add_info_field_long_long(&self, name: &str, value: c_longlong) -> Status {
1286 add_info_field_long_long(self.ctx, name, value)
1287 }
1288}
1289
1290bitflags! {
1291 pub struct ContextFlags : c_int {
1292 const LUA = raw::REDISMODULE_CTX_FLAGS_LUA as c_int;
1294
1295 const MULTI = raw::REDISMODULE_CTX_FLAGS_MULTI as c_int;
1297
1298 const MASTER = raw::REDISMODULE_CTX_FLAGS_MASTER as c_int;
1300
1301 const SLAVE = raw::REDISMODULE_CTX_FLAGS_SLAVE as c_int;
1303
1304 const READONLY = raw::REDISMODULE_CTX_FLAGS_READONLY as c_int;
1306
1307 const CLUSTER = raw::REDISMODULE_CTX_FLAGS_CLUSTER as c_int;
1309
1310 const AOF = raw::REDISMODULE_CTX_FLAGS_AOF as c_int;
1312
1313 const RDB = raw::REDISMODULE_CTX_FLAGS_RDB as c_int;
1315
1316 const MAXMEMORY = raw::REDISMODULE_CTX_FLAGS_MAXMEMORY as c_int;
1318
1319 const EVICTED = raw::REDISMODULE_CTX_FLAGS_EVICT as c_int;
1321
1322 const OOM = raw::REDISMODULE_CTX_FLAGS_OOM as c_int;
1324
1325 const OOM_WARNING = raw::REDISMODULE_CTX_FLAGS_OOM_WARNING as c_int;
1327
1328 const REPLICATED = raw::REDISMODULE_CTX_FLAGS_REPLICATED as c_int;
1330
1331 const LOADING = raw::REDISMODULE_CTX_FLAGS_LOADING as c_int;
1333
1334 const REPLICA_IS_STALE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_STALE as c_int;
1336
1337 const REPLICA_IS_CONNECTING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_CONNECTING as c_int;
1339
1340 const REPLICA_IS_TRANSFERRING = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_TRANSFERRING as c_int;
1342
1343 const REPLICA_IS_ONLINE = raw::REDISMODULE_CTX_FLAGS_REPLICA_IS_ONLINE as c_int;
1345
1346 const ACTIVE_CHILD = raw::REDISMODULE_CTX_FLAGS_ACTIVE_CHILD as c_int;
1348
1349 const IS_CHILD = raw::REDISMODULE_CTX_FLAGS_IS_CHILD as c_int;
1351
1352 const MULTI_DIRTY = raw::REDISMODULE_CTX_FLAGS_MULTI_DIRTY as c_int;
1354
1355 const DENY_BLOCKING = raw::REDISMODULE_CTX_FLAGS_DENY_BLOCKING as c_int;
1358
1359 const FLAGS_RESP3 = raw::REDISMODULE_CTX_FLAGS_RESP3 as c_int;
1361
1362 const ASYNC_LOADING = raw::REDISMODULE_CTX_FLAGS_ASYNC_LOADING as c_int;
1364 }
1365}